From ac9995fb1ade37a3865d5725100a49ea5e235cc5 Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 23 Mar 2023 13:11:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96sncp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/Response.java | 6 +-- .../org/redkale/net/sncp/SncpRemoteInfo.java | 38 ++++++------------- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index 3b67357e9..4de1440ed 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -387,7 +387,7 @@ public abstract class Response> { boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, array); if (allCompleted) { request.pipelineCompleted = true; - this.channel.writePipeline(buffers, this.finishBuffersIOThreadHandler); + this.channel.writeInIOThread(buffers, buffers, this.finishBuffersIOThreadHandler); } else { AsyncConnection conn = removeChannel(); if (conn != null) { @@ -397,7 +397,7 @@ public abstract class Response> { } } else if (this.channel.hasPipelineData()) { //先将pipeline数据写入完再写入buffers - this.channel.writePipeline(null, new CompletionHandler() { + this.channel.writePipelineInIOThread(new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { @@ -410,7 +410,7 @@ public abstract class Response> { } }); } else { - this.channel.write(buffers, buffers, finishBuffersIOThreadHandler); + this.channel.writeInIOThread(buffers, buffers, finishBuffersIOThreadHandler); } } diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index b4c729254..e5a88ba09 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -200,10 +200,16 @@ public class SncpRemoteInfo { //Client模式RPC protected CompletableFuture remoteClient(final SncpRemoteAction action, final String traceid, final Object[] params) { final SncpClient client = this.sncpClient; + final SncpClientRequest request = createSncpClientRequest(action, client.clientSncpAddress, traceid, params); + final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress(); + return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent())); + } + + protected SncpClientRequest createSncpClientRequest(SncpRemoteAction action, InetSocketAddress clientSncpAddress, String traceid, Object[] params) { final Type[] myParamTypes = action.paramTypes; final Class[] myParamClass = action.paramClasses; if (action.paramAddressSourceIndex >= 0) { - params[action.paramAddressSourceIndex] = client.clientSncpAddress; + params[action.paramAddressSourceIndex] = clientSncpAddress; } byte[] body = null; if (myParamTypes.length > 0) { @@ -215,31 +221,8 @@ public class SncpRemoteInfo { convert.offerWriter(writer); } final SncpClientRequest request = new SncpClientRequest(); - request.prepare(action.header, client.nextSeqno(), traceid, body); - - final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress(); - return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent())); - } - - protected SncpClientRequest createSncpClientRequest(final SncpRemoteAction action, final InetSocketAddress clientSncpAddress, final String traceid, final Object[] params) { - final Type[] myParamTypes = action.paramTypes; - final Class[] myParamClass = action.paramClasses; - if (action.paramAddressSourceIndex >= 0) { - params[action.paramAddressSourceIndex] = clientSncpAddress; - } - final long seqid = System.nanoTime(); - byte[] body = null; - if (myParamTypes.length > 0) { - Writer writer = convert.pollWriter(); - for (int i = 0; i < params.length; i++) { //service方法的参数 - convert.convertTo(writer, CompletionHandler.class.isAssignableFrom(myParamClass[i]) ? CompletionHandler.class : myParamTypes[i], params[i]); - } - body = ((ByteTuple) writer).toArray(); - convert.offerWriter(writer); - } - final SncpClientRequest requet = new SncpClientRequest(); - requet.prepare(action.header, seqid, traceid, body); - return requet; + request.prepare(action.header, this.sncpClient.nextSeqno(), traceid, body); + return request; } protected InetSocketAddress nextRemoteAddress() { @@ -259,7 +242,8 @@ public class SncpRemoteInfo { @Override public String toString() { InetSocketAddress clientSncpAddress = sncpClient == null ? null : sncpClient.getClientSncpAddress(); - return this.getClass().getSimpleName() + "(service = " + serviceType.getSimpleName() + ", serviceid = " + serviceid + ", serviceVersion = " + serviceVersion + ", name = '" + name + return this.getClass().getSimpleName() + "(service = " + serviceType.getSimpleName() + ", serviceid = " + serviceid + + ", serviceVersion = " + serviceVersion + ", name = '" + name + "', address = " + (clientSncpAddress == null ? "" : (clientSncpAddress.getHostString() + ":" + clientSncpAddress.getPort())) + ", actions.size = " + actions.length + ")"; }