优化SncpResponse.fillRespHeader

This commit is contained in:
Redkale
2022-12-18 15:51:47 +08:00
parent 1334e2439d
commit a31e47ea42
3 changed files with 27 additions and 65 deletions

View File

@@ -19,6 +19,7 @@ import org.redkale.convert.json.*;
import org.redkale.mq.*;
import org.redkale.net.*;
import static org.redkale.net.sncp.SncpRequest.*;
import static org.redkale.net.sncp.SncpResponse.fillRespHeader;
import org.redkale.service.*;
import org.redkale.util.*;
import org.redkale.service.RpcCall;
@@ -154,11 +155,11 @@ public final class SncpClient {
}
public String toSimpleString() { //给Sncp产生的Service用
if(DataSource.class.isAssignableFrom(serviceClass)||CacheSource.class.isAssignableFrom(serviceClass)){
String service = serviceClass.getAnnotation(SncpDyn.class) ==null? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")";
if (DataSource.class.isAssignableFrom(serviceClass) || CacheSource.class.isAssignableFrom(serviceClass)) {
String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")";
}
String service = serviceClass.getAnnotation(SncpDyn.class) ==null? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName();
if (remote) service = service.replace("DynLocalService", "DynRemoteService");
return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion
+ ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort()))
@@ -262,6 +263,7 @@ public final class SncpClient {
}
private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
final String traceid = Traces.currTraceid();
final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientSncpAddress;
@@ -281,7 +283,7 @@ public final class SncpClient {
final DLong actionid = action.actionid;
if (messageAgent != null) { //MQ模式
final ByteArray reqbytes = writer.toByteArray();
fillHeader(reqbytes, seqid, actionid, reqBodyLength);
fillHeader(reqbytes, seqid, actionid, traceid, reqBodyLength);
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
if (targetTopic == null) targetTopic = this.topic;
MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes.getBytes());
@@ -325,7 +327,7 @@ public final class SncpClient {
}
final AsyncConnection conn = conn0;
final ByteArray array = writer.toByteArray();
fillHeader(array, seqid, actionid, reqBodyLength);
fillHeader(array, seqid, actionid, traceid, reqBodyLength);
conn.write(array, new CompletionHandler<Integer, Void>() {
@@ -468,44 +470,11 @@ public final class SncpClient {
buffer.getChar(); //端口
}
private void fillHeader(ByteArray buffer, long seqid, DLong actionid, int bodyLength) {
//---------------------head----------------------------------
int offset = 0;
buffer.putLong(offset, seqid); //序列号
offset += 8;
buffer.putChar(offset, (char) HEADER_SIZE); //header长度
offset += 2;
DLong.write(buffer, offset, this.serviceid);
offset += 16;
buffer.putInt(offset, this.serviceVersion);
offset += 4;
DLong.write(buffer, offset, actionid);
offset += 16;
buffer.put(offset, addrBytes);
offset += addrBytes.length; //4
buffer.putChar(offset, (char) this.addrPort);
offset += 2;
buffer.putInt(offset, bodyLength); //body长度
offset += 4;
buffer.putInt(offset, 0); //结果码, 请求方固定传0
//offset += 4;
private void fillHeader(ByteArray buffer, long seqid, DLong actionid, String traceid, int bodyLength) {
fillRespHeader(buffer, seqid, this.serviceid, this.serviceVersion,
actionid, traceid, this.addrBytes, this.addrPort, bodyLength, 0); //结果码, 请求方固定传0
}
// private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength) {
// //---------------------head----------------------------------
// final int currentpos = buffer.position();
// buffer.position(0);
// buffer.putLong(seqid); //序列号
// buffer.putChar((char) HEADER_SIZE); //header长度
// DLong.write(buffer, this.serviceid);
// buffer.putInt(this.serviceVersion);
// DLong.write(buffer, actionid);
// buffer.put(addrBytes);
// buffer.putChar((char) this.addrPort);
// buffer.putInt(bodyLength); //body长度
// buffer.putInt(0); //结果码, 请求方固定传0
// buffer.position(currentpos);
// }
protected static final class SncpAction {
protected final DLong actionid;

View File

@@ -61,7 +61,7 @@ public class SncpRequest extends Request<SncpContext> {
this.convert = context.getBsonConvert();
}
@Override
@Override //request.header与response.header数据格式保持一致
protected int readHeader(ByteBuffer buffer, Request last) {
if (buffer.remaining() == Sncp.PING_BUFFER.remaining()) {
if (buffer.hasRemaining()) buffer.get(new byte[buffer.remaining()]);

View File

@@ -44,7 +44,9 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
super(context, request);
this.addrBytes = context.getServerAddress().getAddress().getAddress();
this.addrPort = context.getServerAddress().getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4");
if (this.addrBytes.length != 4) {
throw new RuntimeException("SNCP serverAddress only support IPv4");
}
}
@Override
@@ -76,41 +78,32 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
}
protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) {
fillRespHeader(buffer, request.getSeqid(), request.getServiceid(), request.getServiceVersion(),
request.getActionid(), request.getTraceid(), this.addrBytes, this.addrPort, bodyLength, retcode);
}
protected static void fillRespHeader(ByteArray buffer, long seqid, DLong serviceid, int serviceVersion,
DLong actionid, String traceid, byte[] addrBytes, int addrPort, int bodyLength, int retcode) {
//---------------------head----------------------------------
int offset = 0;
buffer.putLong(offset, request.getSeqid());
buffer.putLong(offset, seqid);
offset += 8;
buffer.putChar(offset, (char) SncpRequest.HEADER_SIZE);
offset += 2;
DLong.write(buffer, offset, request.getServiceid());
DLong.write(buffer, offset, serviceid);
offset += 16;
buffer.putInt(offset, request.getServiceVersion());
buffer.putInt(offset, serviceVersion);
offset += 4;
DLong.write(buffer, offset, request.getActionid());
DLong.write(buffer, offset, actionid);
offset += 16;
buffer.put(offset, addrBytes);
offset += addrBytes.length; //4
buffer.putChar(offset, (char) this.addrPort);
buffer.putChar(offset, (char) addrPort);
offset += 2;
buffer.putInt(offset, bodyLength);
offset += 4;
buffer.putInt(offset, retcode);
//offset += 4;
offset += 4;
}
// protected void fillHeader(ByteBuffer buffer, int bodyLength, int retcode) {
// //---------------------head----------------------------------
// final int currentpos = buffer.position();
// buffer.position(0);
// buffer.putLong(request.getSeqid());
// buffer.putChar((char) SncpRequest.HEADER_SIZE);
// DLong.write(buffer, request.getServiceid());
// buffer.putInt(request.getServiceVersion());
// DLong.write(buffer, request.getActionid());
// buffer.put(addrBytes);
// buffer.putChar((char) this.addrPort);
// buffer.putInt(bodyLength);
// buffer.putInt(retcode);
// buffer.position(currentpos);
// }
}