From 83d04d02dab4376f67c06df14a7e816d5eb0379d Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 23 Mar 2023 13:03:13 +0800 Subject: [PATCH] =?UTF-8?q?SncpClientRequest.seqno=E7=94=9F=E6=88=90?= =?UTF-8?q?=E8=A7=84=E5=88=99=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/sncp/SncpClient.java | 10 ++++++++++ .../org/redkale/net/sncp/SncpRemoteInfo.java | 20 +++++++++++++++++-- .../java/org/redkale/test/sncp/SncpTest.java | 1 - 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index 3c215bc81..b39e7b0e2 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -5,6 +5,7 @@ package org.redkale.net.sncp; import java.net.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; import org.redkale.net.*; import org.redkale.net.client.*; @@ -20,11 +21,15 @@ import org.redkale.net.client.*; */ public class SncpClient extends Client { + private final AtomicLong seqno = new AtomicLong(); + final InetSocketAddress clientSncpAddress; public SncpClient(String name, AsyncGroup group, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) { super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns this.clientSncpAddress = clientSncpAddress; + this.readTimeoutSeconds = 12; + this.writeTimeoutSeconds = 12; } @Override @@ -36,6 +41,11 @@ public class SncpClient extends Client connect(SocketAddress addr) { return super.connect(addr); diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index 0a8f802fa..b4c729254 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -200,9 +200,25 @@ public class SncpRemoteInfo { //Client模式RPC protected CompletableFuture remoteClient(final SncpRemoteAction action, final String traceid, final Object[] params) { final SncpClient client = this.sncpClient; - final SncpClientRequest request = createSncpClientRequest(action, client.clientSncpAddress, traceid, params); + final Type[] myParamTypes = action.paramTypes; + final Class[] myParamClass = action.paramClasses; + if (action.paramAddressSourceIndex >= 0) { + params[action.paramAddressSourceIndex] = client.clientSncpAddress; + } + byte[] body = null; + if (myParamTypes.length > 0) { + Writer writer = convert.pollWriter(); + for (int i = 0; i < params.length; i++) { //service方法的参数 + convert.convertTo(writer, CompletionHandler.class.isAssignableFrom(myParamClass[i]) ? CompletionHandler.class : myParamTypes[i], params[i]); + } + body = ((ByteTuple) writer).toArray(); + convert.offerWriter(writer); + } + final SncpClientRequest request = new SncpClientRequest(); + request.prepare(action.header, client.nextSeqno(), traceid, body); + final SocketAddress addr = action.paramAddressTargetIndex >= 0 ? (SocketAddress) params[action.paramAddressTargetIndex] : nextRemoteAddress(); - return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent())); + return client.connect(addr).thenCompose(conn -> client.writeChannel(conn, request).thenApply(rs -> rs.getBodyContent())); } protected SncpClientRequest createSncpClientRequest(final SncpRemoteAction action, final InetSocketAddress clientSncpAddress, final String traceid, final Object[] params) { diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index 24ecd0f98..f364d0a38 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -116,7 +116,6 @@ public class SncpTest { //service.updateBean(bean); } catch (Exception e) { e.printStackTrace(); - System.exit(1); } finally { long a = ai.incrementAndGet(); System.out.println("运行了 " + (a == 100 ? "--------------------------------------------------" : "") + a);