优化client

This commit is contained in:
redkale
2023-02-02 15:02:43 +08:00
parent cfc20c6248
commit 2ef5eb6cd0
5 changed files with 124 additions and 144 deletions

View File

@@ -40,8 +40,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
this.connection = connection; this.connection = connection;
} }
//返回true: array会clear, 返回false: buffer会clear public abstract void decodeMessages(ByteBuffer buffer, ByteArray array);
public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array);
@Override @Override
public final void completed(Integer count, ByteBuffer attachment) { public final void completed(Integer count, ByteBuffer attachment) {
@@ -63,15 +62,20 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
private void decodeResponse(ByteBuffer buffer) { private void decodeResponse(ByteBuffer buffer) {
AsyncConnection channel = connection.channel; AsyncConnection channel = connection.channel;
connection.currRespIterator = null; connection.currRespIterator = null;
if (decodeMessages(buffer, readArray)) { //成功了 decodeMessages(buffer, readArray);
if (!respResults.isEmpty()) { //存在解析结果
connection.currRespIterator = null; connection.currRespIterator = null;
readArray.clear(); readArray.clear();
for (ClientResponse<R, P> cr : respResults) { for (ClientResponse<R, P> cr : respResults) {
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid()); if (cr.isError()) {
if (respFuture != null) { connection.dispose(null);
responseComplete(respFuture, cr.message, cr.exc); } else {
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());
if (respFuture != null) {
responseComplete(respFuture, cr.message, cr.exc);
}
respPool.accept(cr);
} }
respPool.accept(cr);
} }
respResults.clear(); respResults.clear();
@@ -95,7 +99,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
R request = respFuture.request; R request = respFuture.request;
WorkThread workThread = null; WorkThread workThread = null;
try { try {
if (!request.isCompleted()) { if (request != null && !request.isCompleted()) {
if (exc == null) { if (exc == null) {
connection.sendHalfWrite(exc); connection.sendHalfWrite(exc);
//request没有发送完respFuture需要再次接收 //request没有发送完respFuture需要再次接收
@@ -104,8 +108,10 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.sendHalfWrite(exc); connection.sendHalfWrite(exc);
} }
} }
workThread = request.workThread; if (request != null) {
request.workThread = null; workThread = request.workThread;
request.workThread = null;
}
connection.respWaitingCounter.decrement(); connection.respWaitingCounter.decrement();
if (connection.isAuthenticated()) { if (connection.isAuthenticated()) {
connection.client.incrRespDoneCounter(); connection.client.incrRespDoneCounter();
@@ -119,13 +125,17 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} }
if (exc != null) { if (exc != null) {
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.currTraceid(request.traceid); if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
}); });
} else { } else {
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message);
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.currTraceid(request.traceid); if (request != null) {
Traces.currTraceid(request.traceid);
}
((ClientFuture) respFuture).complete(rs); ((ClientFuture) respFuture).complete(rs);
}); });
} }
@@ -169,6 +179,10 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
this.respResults.add(respPool.get().set(request, exc)); this.respResults.add(respPool.get().set(request, exc));
} }
public void occurError(R request, Throwable exc) {
this.respResults.add(new ClientResponse.ClientErrorResponse<>(request, exc));
}
@Override @Override
public String toString() { public String toString() {
return JsonConvert.root().convertTo(this); return JsonConvert.root().convertTo(this);

View File

@@ -21,7 +21,7 @@ import java.io.Serializable;
*/ */
public class ClientResponse<R extends ClientRequest, P> { public class ClientResponse<R extends ClientRequest, P> {
protected R request; protected R request; //服务端返回一个不存在的requestid可能为null
protected P message; protected P message;
@@ -101,4 +101,19 @@ public class ClientResponse<R extends ClientRequest, P> {
return "{\"message\":" + message + "}"; return "{\"message\":" + message + "}";
} }
boolean isError() {
return false;
}
static class ClientErrorResponse<R extends ClientRequest, P> extends ClientResponse<R, P> {
public ClientErrorResponse(R request, Throwable exc) {
super(request, exc);
}
@Override
boolean isError() {
return true;
}
}
} }

View File

@@ -38,21 +38,21 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
} }
@Override @Override
public boolean decodeMessages(ByteBuffer realBuf, ByteArray array) { public void decodeMessages(ByteBuffer realBuf, ByteArray array) {
SncpClientConnection conn = (SncpClientConnection) connection;
ByteBuffer buffer = realBuf; ByteBuffer buffer = realBuf;
boolean hadResult = false;
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
if (halfHeaderBytes != null) { if (halfHeaderBytes != null) {
if (buffer.remaining() + halfHeaderBytes.length() < SncpHeader.HEADER_SIZE) { //buffer不足以读取完整header if (buffer.remaining() + halfHeaderBytes.length() < SncpHeader.HEADER_SIZE) { //buffer不足以读取完整header
halfHeaderBytes.put(buffer); halfHeaderBytes.put(buffer);
return hadResult; return;
} }
halfHeaderBytes.put(buffer, SncpHeader.HEADER_SIZE - halfHeaderBytes.length()); halfHeaderBytes.put(buffer, SncpHeader.HEADER_SIZE - halfHeaderBytes.length());
//读取完整header //读取完整header
SncpClientResult result = new SncpClientResult(); SncpClientResult result = new SncpClientResult();
result.readHeader(halfHeaderBytes); if (!result.readHeader(halfHeaderBytes)) {
occurError(null, null); //request不一定存在
return;
}
halfHeaderBytes = null; halfHeaderBytes = null;
if (result.getBodyLength() < 1) { if (result.getBodyLength() < 1) {
addMessage(findRequest(result.getRequestid()), result); addMessage(findRequest(result.getRequestid()), result);
@@ -62,11 +62,11 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
//还需要读body //还需要读body
lastResult = result; lastResult = result;
} }
if (lastResult != null) { //buffer不够 if (lastResult != null) { //lastResult的body没有读完
if (halfBodyBytes != null) { if (halfBodyBytes != null) {
if (buffer.remaining() + halfBodyBytes.length() < lastResult.getBodyLength()) { //buffer不足以读取完整body if (buffer.remaining() + halfBodyBytes.length() < lastResult.getBodyLength()) { //buffer不足以读取完整body
halfBodyBytes.put(buffer); halfBodyBytes.put(buffer);
return hadResult; return;
} }
halfBodyBytes.put(buffer, lastResult.getBodyLength() - halfHeaderBytes.length()); halfBodyBytes.put(buffer, lastResult.getBodyLength() - halfHeaderBytes.length());
//读取完整body //读取完整body
@@ -76,18 +76,42 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
lastResult = null; lastResult = null;
continue; continue;
} }
if (buffer.remaining() < lastResult.getBodyLength()) { //buffer不足以读取完整body
halfBodyBytes = pollArray();
halfBodyBytes.put(buffer);
return;
}
lastResult.readBody(buffer);
halfBodyBytes = null;
addMessage(findRequest(lastResult.getRequestid()), lastResult);
lastResult = null;
continue;
} }
if (buffer.remaining() < SncpHeader.HEADER_SIZE) { //内容不足以读取完整header if (buffer.remaining() < SncpHeader.HEADER_SIZE) { //buffer不足以读取完整header
halfHeaderBytes = pollArray(); halfHeaderBytes = pollArray();
halfHeaderBytes.put(buffer); halfHeaderBytes.put(buffer);
return hadResult; return;
} }
SncpClientResult result = new SncpClientResult();
SncpClientRequest request = null; if (!result.readHeader(buffer)) {
buffer = realBuf; occurError(null, null); //request不一定存在
return;
}
if (result.getBodyLength() < 1) {
addMessage(findRequest(result.getRequestid()), result);
lastResult = null;
continue;
}
if (buffer.remaining() < result.getBodyLength()) { //buffer不足以读取完整body
lastResult = result;
halfBodyBytes = pollArray();
halfBodyBytes.put(buffer);
return;
}
result.readBody(buffer);
addMessage(findRequest(result.getRequestid()), result);
lastResult = null;
} }
return hadResult;
} }
} }

View File

@@ -5,7 +5,7 @@ package org.redkale.net.sncp;
import java.io.Serializable; import java.io.Serializable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.redkale.util.*; import org.redkale.util.ByteArray;
/** /**
* *
@@ -13,116 +13,41 @@ import org.redkale.util.*;
*/ */
public class SncpClientResult { public class SncpClientResult {
private long seqid; private SncpHeader header;
private Uint128 serviceid;
private int serviceVersion;
private Uint128 actionid;
private byte[] addrBytes;
private int addrPort;
private int bodyLength;
private byte[] bodyContent; private byte[] bodyContent;
private int retcode; protected boolean readHeader(ByteBuffer buffer) {
SncpHeader h = new SncpHeader();
protected void readHeader(ByteBuffer buffer) { boolean rs = h.read(buffer);
this.seqid = buffer.getLong(); //8 this.header = h;
buffer.getChar(); //HEADER_SIZE 2 return rs;
this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16
this.addrBytes = new byte[4];
buffer.get(this.addrBytes); //addr 4
this.addrPort = buffer.getChar(); //port 2
this.bodyLength = buffer.getInt(); //4
this.retcode = buffer.getInt(); //4
} }
protected void readHeader(ByteArray array) { protected boolean readHeader(ByteArray array) {
int offset = 0; SncpHeader h = new SncpHeader();
this.seqid = array.getLong(offset); //8 boolean rs = h.read(array);
offset += 8; this.header = h;
array.getChar(offset); //HEADER_SIZE 2 return rs;
offset += 2; }
this.serviceid = array.getUint128(offset); //16
offset += 16; protected boolean readBody(ByteBuffer buffer) {
this.serviceVersion = array.getInt(offset); //4 byte[] body = new byte[header.getBodyLength()];
offset += 4; buffer.get(body);
this.actionid = array.getUint128(offset); //16 this.bodyContent = body;
offset += 16; return true;
this.addrBytes = array.getBytes(offset, 4); //addr 4
offset += 4;
this.addrPort = array.getChar(offset); //port 2
offset += 2;
this.bodyLength = array.getInt(offset); //4
offset += 4;
this.retcode = array.getInt(offset); //4
} }
public Serializable getRequestid() { public Serializable getRequestid() {
return seqid; return header == null ? null : header.getSeqid();
}
public long getSeqid() {
return seqid;
}
public void setSeqid(long seqid) {
this.seqid = seqid;
}
public Uint128 getServiceid() {
return serviceid;
}
public void setServiceid(Uint128 serviceid) {
this.serviceid = serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public void setServiceVersion(int serviceVersion) {
this.serviceVersion = serviceVersion;
}
public Uint128 getActionid() {
return actionid;
}
public void setActionid(Uint128 actionid) {
this.actionid = actionid;
}
public byte[] getAddrBytes() {
return addrBytes;
}
public void setAddrBytes(byte[] addrBytes) {
this.addrBytes = addrBytes;
}
public int getAddrPort() {
return addrPort;
}
public void setAddrPort(int addrPort) {
this.addrPort = addrPort;
} }
public int getBodyLength() { public int getBodyLength() {
return bodyLength; return header.getBodyLength();
} }
public void setBodyLength(int bodyLength) { public SncpHeader getHeader() {
this.bodyLength = bodyLength; return header;
} }
public byte[] getBodyContent() { public byte[] getBodyContent() {
@@ -133,12 +58,4 @@ public class SncpClientResult {
this.bodyContent = bodyContent; this.bodyContent = bodyContent;
} }
public int getRetcode() {
return retcode;
}
public void setRetcode(int retcode) {
this.retcode = retcode;
}
} }

View File

@@ -5,6 +5,7 @@ package org.redkale.net.sncp;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Objects;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
@@ -34,6 +35,10 @@ public class SncpHeader {
private int retcode; private int retcode;
private long timestamp; //待加入 + 8
private boolean valid;
public SncpHeader() { public SncpHeader() {
} }
@@ -51,9 +56,7 @@ public class SncpHeader {
public boolean read(ByteBuffer buffer) { public boolean read(ByteBuffer buffer) {
this.seqid = buffer.getLong(); //8 this.seqid = buffer.getLong(); //8
if (buffer.getChar() != HEADER_SIZE) { //HEADER_SIZE 2 this.valid = buffer.getChar() != HEADER_SIZE; //2
return false;
}
this.serviceid = Uint128.read(buffer); //16 this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4 this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16 this.actionid = Uint128.read(buffer); //16
@@ -62,16 +65,14 @@ public class SncpHeader {
this.addrPort = buffer.getChar(); //port 2 this.addrPort = buffer.getChar(); //port 2
this.bodyLength = buffer.getInt(); //4 this.bodyLength = buffer.getInt(); //4
this.retcode = buffer.getInt(); //4 this.retcode = buffer.getInt(); //4
return true; return this.valid;
} }
public boolean readHeader(ByteArray array) { public boolean read(ByteArray array) {
int offset = 0; int offset = 0;
this.seqid = array.getLong(offset); //8 this.seqid = array.getLong(offset); //8
offset += 8; offset += 8;
if (array.getChar(offset) != HEADER_SIZE) { //HEADER_SIZE 2 this.valid = array.getChar(offset) != HEADER_SIZE; //2
return false;
}
offset += 2; offset += 2;
this.serviceid = array.getUint128(offset); //16 this.serviceid = array.getUint128(offset); //16
offset += 16; offset += 16;
@@ -86,7 +87,7 @@ public class SncpHeader {
this.bodyLength = array.getInt(offset); //4 this.bodyLength = array.getInt(offset); //4
offset += 4; offset += 4;
this.retcode = array.getInt(offset); //4 this.retcode = array.getInt(offset); //4
return true; return this.valid;
} }
public ByteArray write(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) { public ByteArray write(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) {
@@ -137,6 +138,15 @@ public class SncpHeader {
return new InetSocketAddress((0xff & addrBytes[0]) + "." + (0xff & addrBytes[1]) + "." + (0xff & addrBytes[2]) + "." + (0xff & addrBytes[3]), addrPort); return new InetSocketAddress((0xff & addrBytes[0]) + "." + (0xff & addrBytes[1]) + "." + (0xff & addrBytes[2]) + "." + (0xff & addrBytes[3]), addrPort);
} }
public boolean isValid() {
return valid;
}
//供client端request和response的header判断
public boolean checkValid(SncpHeader other) {
return this.seqid == other.seqid && Objects.equals(this.serviceid, other.serviceid) && Objects.equals(this.actionid, other.actionid);
}
public long getSeqid() { public long getSeqid() {
return seqid; return seqid;
} }