diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 38915f118..141dfba3a 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -2506,7 +2506,7 @@ public final class Application { } if (this.clientAsyncGroup != null) { long s = System.currentTimeMillis(); - ((AsyncIOGroup) this.clientAsyncGroup).dispose(); + this.clientAsyncGroup.dispose(); logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); } this.sncpTransportFactory.shutdownNow(); diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 064a4bf77..5d1fbb59d 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -24,6 +24,7 @@ import org.redkale.cluster.ClusterAgent; import org.redkale.mq.MessageAgent; import org.redkale.net.Filter; import org.redkale.net.*; +import org.redkale.net.client.ClientAddress; import org.redkale.net.http.*; import org.redkale.net.sncp.*; import org.redkale.service.*; @@ -64,6 +65,12 @@ public abstract class NodeServer { //当前Server的SNCP协议的组 protected String sncpGroup = null; + //SNCP服务的Client + private SncpClient sncpClient; + + //SncpClient的AsyncGroup + private AsyncIOGroup sncpAsyncGroup; + //SNCP服务的地址, 非SNCP为null private InetSocketAddress sncpAddress; @@ -161,6 +168,10 @@ public abstract class NodeServer { } //必须要进行初始化, 构建Service时需要使用Context中的ExecutorService server.init(this.serverConf); + if (this.sncpAddress != null) { //初始化SncpClient + this.sncpAsyncGroup = new AsyncIOGroup(true, "Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); + this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, this.sncpAddress, new ClientAddress(sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000); + } //init之后才有Executor //废弃 @since 2.3.0 // resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, Executor.class, server.getWorkExecutor()); @@ -860,6 +871,16 @@ public abstract class NodeServer { if (sb != null && sb.length() > 0) { logger.log(Level.INFO, sb.toString()); } + if (this.sncpAsyncGroup != null) { + long s = System.currentTimeMillis(); + this.sncpAsyncGroup.dispose(); + logger.info("SncpAsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); + } + if (this.sncpClient != null) { + long s = System.currentTimeMillis(); + this.sncpClient.close(); + logger.info("SncpClient close in " + (System.currentTimeMillis() - s) + " ms"); + } server.shutdown(); } diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 2fc08e7b3..8bde8bbe4 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -151,8 +151,8 @@ public abstract class Sncp { } public static SncpServiceInfo createSncpServiceInfo(String resourceName, - Class resourceServiceType, T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) { - return new SncpServiceInfo(resourceName, resourceServiceType, service, convert, messageAgent, messageClient); + Class resourceServiceType, T service, Convert convert, SncpClient sncpClient, MessageAgent messageAgent, SncpMessageClient messageClient) { + return new SncpServiceInfo(resourceName, resourceServiceType, service, convert, sncpClient, messageAgent, messageClient); } public static Uint128 actionid(final RpcAction action) { diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index a7c857562..9dfd74b04 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -51,7 +51,6 @@ public class SncpClient extends Client T remote(final SncpServiceInfo info, final int index, final Object... params) { - final String traceid = Traces.currTraceid(); final Convert convert = info.convert; final SncpServiceAction action = info.actions[index]; CompletionHandler callbackHandler = null; @@ -64,7 +63,7 @@ public class SncpClient extends Client future = remote(info, action, traceid, params); + final CompletableFuture future = remote(info, action, convert, Traces.currTraceid(), params); if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler final CompletionHandler handler = callbackHandler; final Object attach = callbackHandlerAttach; @@ -83,15 +82,15 @@ public class SncpClient extends Client v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)); } else { - final CompletableFuture stage = action.returnFutureCreator.create(); + final CompletableFuture returnFuture = action.returnFutureCreator.create(); future.whenComplete((v, t) -> { if (t == null) { - stage.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)); + returnFuture.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)); } else { - stage.completeExceptionally(t); + returnFuture.completeExceptionally(t); } }); - return (T) stage; + return (T) returnFuture; } } else if (action.returnObjectType != null) { //返回类型为JavaBean return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)).join(); @@ -101,10 +100,10 @@ public class SncpClient extends Client remote( + private CompletableFuture remote( final SncpServiceInfo info, final SncpServiceAction action, - final String traceid, + final Convert convert, final Object... params) { return null; diff --git a/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java b/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java index fe2c7da87..38685998b 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpServiceInfo.java @@ -44,6 +44,9 @@ public final class SncpServiceInfo { protected final Convert convert; + //非MQ模式下此字段才有值 + protected final SncpClient sncpClient; + //MQ模式下此字段才有值 protected final MessageAgent messageAgent; @@ -56,7 +59,9 @@ public final class SncpServiceInfo { //远程模式, 可能为null protected Set remoteAddresses; - SncpServiceInfo(String resourceName, Class resourceServiceType, final T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) { + SncpServiceInfo(String resourceName, Class resourceServiceType, final T service, Convert convert, + SncpClient sncpClient, MessageAgent messageAgent, SncpMessageClient messageClient) { + this.sncpClient = sncpClient; this.name = resourceName; this.serviceType = resourceServiceType; this.serviceid = Sncp.serviceid(resourceName, resourceServiceType); @@ -75,6 +80,11 @@ public final class SncpServiceInfo { this.actions = serviceActions.toArray(new SncpServiceAction[serviceActions.size()]); } + //只给远程模式调用的 + public T remote(final int index, final Object... params) { + return sncpClient.remote(this, index, params); + } + public void updateRemoteAddress(Set remoteGroups, Set remoteAddresses) { this.remoteGroups = remoteGroups; this.remoteAddresses = remoteAddresses;