From a31e47ea42449acf2a8e0fc7786bf773831671b2 Mon Sep 17 00:00:00 2001 From: Redkale Date: Sun, 18 Dec 2022 15:51:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96SncpResponse.fillRespHeader?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/sncp/SncpClient.java | 53 ++++--------------- .../org/redkale/net/sncp/SncpRequest.java | 2 +- .../org/redkale/net/sncp/SncpResponse.java | 37 ++++++------- 3 files changed, 27 insertions(+), 65 deletions(-) diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index 745f55115..50709c50a 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -19,6 +19,7 @@ import org.redkale.convert.json.*; import org.redkale.mq.*; import org.redkale.net.*; import static org.redkale.net.sncp.SncpRequest.*; +import static org.redkale.net.sncp.SncpResponse.fillRespHeader; import org.redkale.service.*; import org.redkale.util.*; import org.redkale.service.RpcCall; @@ -154,11 +155,11 @@ public final class SncpClient { } public String toSimpleString() { //给Sncp产生的Service用 - if(DataSource.class.isAssignableFrom(serviceClass)||CacheSource.class.isAssignableFrom(serviceClass)){ - String service = serviceClass.getAnnotation(SncpDyn.class) ==null? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); - return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")"; + if (DataSource.class.isAssignableFrom(serviceClass) || CacheSource.class.isAssignableFrom(serviceClass)) { + String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); + return service + "(serviceid=" + serviceid + ", name='" + name + "', actions.size=" + actions.length + ")"; } - String service = serviceClass.getAnnotation(SncpDyn.class) ==null? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); + String service = serviceClass.getAnnotation(SncpDyn.class) == null ? serviceClass.getName() : serviceClass.getSuperclass().getSimpleName(); if (remote) service = service.replace("DynLocalService", "DynRemoteService"); return service + "(name = '" + name + "', serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", clientaddr = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort())) @@ -262,6 +263,7 @@ public final class SncpClient { } private CompletableFuture remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + final String traceid = Traces.currTraceid(); final Type[] myparamtypes = action.paramTypes; final Class[] myparamclass = action.paramClass; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientSncpAddress; @@ -281,7 +283,7 @@ public final class SncpClient { final DLong actionid = action.actionid; if (messageAgent != null) { //MQ模式 final ByteArray reqbytes = writer.toByteArray(); - fillHeader(reqbytes, seqid, actionid, reqBodyLength); + fillHeader(reqbytes, seqid, actionid, traceid, reqBodyLength); String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; if (targetTopic == null) targetTopic = this.topic; MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes.getBytes()); @@ -325,7 +327,7 @@ public final class SncpClient { } final AsyncConnection conn = conn0; final ByteArray array = writer.toByteArray(); - fillHeader(array, seqid, actionid, reqBodyLength); + fillHeader(array, seqid, actionid, traceid, reqBodyLength); conn.write(array, new CompletionHandler() { @@ -468,44 +470,11 @@ public final class SncpClient { buffer.getChar(); //端口 } - private void fillHeader(ByteArray buffer, long seqid, DLong actionid, int bodyLength) { - //---------------------head---------------------------------- - int offset = 0; - buffer.putLong(offset, seqid); //序列号 - offset += 8; - buffer.putChar(offset, (char) HEADER_SIZE); //header长度 - offset += 2; - DLong.write(buffer, offset, this.serviceid); - offset += 16; - buffer.putInt(offset, this.serviceVersion); - offset += 4; - DLong.write(buffer, offset, actionid); - offset += 16; - buffer.put(offset, addrBytes); - offset += addrBytes.length; //4 - buffer.putChar(offset, (char) this.addrPort); - offset += 2; - buffer.putInt(offset, bodyLength); //body长度 - offset += 4; - buffer.putInt(offset, 0); //结果码, 请求方固定传0 - //offset += 4; + private void fillHeader(ByteArray buffer, long seqid, DLong actionid, String traceid, int bodyLength) { + fillRespHeader(buffer, seqid, this.serviceid, this.serviceVersion, + actionid, traceid, this.addrBytes, this.addrPort, bodyLength, 0); //结果码, 请求方固定传0 } -// private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength) { -// //---------------------head---------------------------------- -// final int currentpos = buffer.position(); -// buffer.position(0); -// buffer.putLong(seqid); //序列号 -// buffer.putChar((char) HEADER_SIZE); //header长度 -// DLong.write(buffer, this.serviceid); -// buffer.putInt(this.serviceVersion); -// DLong.write(buffer, actionid); -// buffer.put(addrBytes); -// buffer.putChar((char) this.addrPort); -// buffer.putInt(bodyLength); //body长度 -// buffer.putInt(0); //结果码, 请求方固定传0 -// buffer.position(currentpos); -// } protected static final class SncpAction { protected final DLong actionid; diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index 7aa08d108..5a22ab11c 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -61,7 +61,7 @@ public class SncpRequest extends Request { this.convert = context.getBsonConvert(); } - @Override + @Override //request.header与response.header数据格式保持一致 protected int readHeader(ByteBuffer buffer, Request last) { if (buffer.remaining() == Sncp.PING_BUFFER.remaining()) { if (buffer.hasRemaining()) buffer.get(new byte[buffer.remaining()]); diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index f7d0fd84a..15636232d 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -44,7 +44,9 @@ public class SncpResponse extends Response { super(context, request); this.addrBytes = context.getServerAddress().getAddress().getAddress(); this.addrPort = context.getServerAddress().getPort(); - if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4"); + if (this.addrBytes.length != 4) { + throw new RuntimeException("SNCP serverAddress only support IPv4"); + } } @Override @@ -76,41 +78,32 @@ public class SncpResponse extends Response { } protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) { + fillRespHeader(buffer, request.getSeqid(), request.getServiceid(), request.getServiceVersion(), + request.getActionid(), request.getTraceid(), this.addrBytes, this.addrPort, bodyLength, retcode); + } + + protected static void fillRespHeader(ByteArray buffer, long seqid, DLong serviceid, int serviceVersion, + DLong actionid, String traceid, byte[] addrBytes, int addrPort, int bodyLength, int retcode) { //---------------------head---------------------------------- int offset = 0; - buffer.putLong(offset, request.getSeqid()); + buffer.putLong(offset, seqid); offset += 8; buffer.putChar(offset, (char) SncpRequest.HEADER_SIZE); offset += 2; - DLong.write(buffer, offset, request.getServiceid()); + DLong.write(buffer, offset, serviceid); offset += 16; - buffer.putInt(offset, request.getServiceVersion()); + buffer.putInt(offset, serviceVersion); offset += 4; - DLong.write(buffer, offset, request.getActionid()); + DLong.write(buffer, offset, actionid); offset += 16; buffer.put(offset, addrBytes); offset += addrBytes.length; //4 - buffer.putChar(offset, (char) this.addrPort); + buffer.putChar(offset, (char) addrPort); offset += 2; buffer.putInt(offset, bodyLength); offset += 4; buffer.putInt(offset, retcode); - //offset += 4; + offset += 4; } -// protected void fillHeader(ByteBuffer buffer, int bodyLength, int retcode) { -// //---------------------head---------------------------------- -// final int currentpos = buffer.position(); -// buffer.position(0); -// buffer.putLong(request.getSeqid()); -// buffer.putChar((char) SncpRequest.HEADER_SIZE); -// DLong.write(buffer, request.getServiceid()); -// buffer.putInt(request.getServiceVersion()); -// DLong.write(buffer, request.getActionid()); -// buffer.put(addrBytes); -// buffer.putChar((char) this.addrPort); -// buffer.putInt(bodyLength); -// buffer.putInt(retcode); -// buffer.position(currentpos); -// } }