sncp优化

This commit is contained in:
redkale
2023-02-08 22:44:00 +08:00
parent 6622e88232
commit 2e40c98b81
5 changed files with 42 additions and 12 deletions

View File

@@ -2506,7 +2506,7 @@ public final class Application {
}
if (this.clientAsyncGroup != null) {
long s = System.currentTimeMillis();
((AsyncIOGroup) this.clientAsyncGroup).dispose();
this.clientAsyncGroup.dispose();
logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms");
}
this.sncpTransportFactory.shutdownNow();

View File

@@ -24,6 +24,7 @@ import org.redkale.cluster.ClusterAgent;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Filter;
import org.redkale.net.*;
import org.redkale.net.client.ClientAddress;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
@@ -64,6 +65,12 @@ public abstract class NodeServer {
//当前Server的SNCP协议的组
protected String sncpGroup = null;
//SNCP服务的Client
private SncpClient sncpClient;
//SncpClient的AsyncGroup
private AsyncIOGroup sncpAsyncGroup;
//SNCP服务的地址 非SNCP为null
private InetSocketAddress sncpAddress;
@@ -161,6 +168,10 @@ public abstract class NodeServer {
}
//必须要进行初始化, 构建Service时需要使用Context中的ExecutorService
server.init(this.serverConf);
if (this.sncpAddress != null) { //初始化SncpClient
this.sncpAsyncGroup = new AsyncIOGroup(true, "Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true);
this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, this.sncpAddress, new ClientAddress(sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000);
}
//init之后才有Executor
//废弃 @since 2.3.0
// resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, Executor.class, server.getWorkExecutor());
@@ -860,6 +871,16 @@ public abstract class NodeServer {
if (sb != null && sb.length() > 0) {
logger.log(Level.INFO, sb.toString());
}
if (this.sncpAsyncGroup != null) {
long s = System.currentTimeMillis();
this.sncpAsyncGroup.dispose();
logger.info("SncpAsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms");
}
if (this.sncpClient != null) {
long s = System.currentTimeMillis();
this.sncpClient.close();
logger.info("SncpClient close in " + (System.currentTimeMillis() - s) + " ms");
}
server.shutdown();
}

View File

@@ -151,8 +151,8 @@ public abstract class Sncp {
}
public static <T extends Service> SncpServiceInfo createSncpServiceInfo(String resourceName,
Class<T> resourceServiceType, T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) {
return new SncpServiceInfo(resourceName, resourceServiceType, service, convert, messageAgent, messageClient);
Class<T> resourceServiceType, T service, Convert convert, SncpClient sncpClient, MessageAgent messageAgent, SncpMessageClient messageClient) {
return new SncpServiceInfo(resourceName, resourceServiceType, service, convert, sncpClient, messageAgent, messageClient);
}
public static Uint128 actionid(final RpcAction action) {

View File

@@ -51,7 +51,6 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
//只给远程模式调用的
public <T> T remote(final SncpServiceInfo info, final int index, final Object... params) {
final String traceid = Traces.currTraceid();
final Convert convert = info.convert;
final SncpServiceAction action = info.actions[index];
CompletionHandler callbackHandler = null;
@@ -64,7 +63,7 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
params[action.paramHandlerAttachIndex] = null;
}
}
final CompletableFuture<byte[]> future = remote(info, action, traceid, params);
final CompletableFuture<byte[]> future = remote(info, action, convert, Traces.currTraceid(), params);
if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler
final CompletionHandler handler = callbackHandler;
final Object attach = callbackHandlerAttach;
@@ -83,15 +82,15 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
if (action.returnFutureClass == CompletableFuture.class) {
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v));
} else {
final CompletableFuture stage = action.returnFutureCreator.create();
final CompletableFuture returnFuture = action.returnFutureCreator.create();
future.whenComplete((v, t) -> {
if (t == null) {
stage.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v));
returnFuture.complete(v == null ? null : convert.convertFrom(action.paramHandlerResultType, v));
} else {
stage.completeExceptionally(t);
returnFuture.completeExceptionally(t);
}
});
return (T) stage;
return (T) returnFuture;
}
} else if (action.returnObjectType != null) { //返回类型为JavaBean
return (T) future.thenApply(v -> v == null ? null : convert.convertFrom(action.paramHandlerResultType, v)).join();
@@ -101,10 +100,10 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
return null;
}
protected CompletableFuture<byte[]> remote(
private CompletableFuture<byte[]> remote(
final SncpServiceInfo info,
final SncpServiceAction action,
final String traceid,
final Convert convert,
final Object... params) {
return null;

View File

@@ -44,6 +44,9 @@ public final class SncpServiceInfo<T extends Service> {
protected final Convert convert;
//非MQ模式下此字段才有值
protected final SncpClient sncpClient;
//MQ模式下此字段才有值
protected final MessageAgent messageAgent;
@@ -56,7 +59,9 @@ public final class SncpServiceInfo<T extends Service> {
//远程模式, 可能为null
protected Set<InetSocketAddress> remoteAddresses;
SncpServiceInfo(String resourceName, Class<T> resourceServiceType, final T service, Convert convert, MessageAgent messageAgent, SncpMessageClient messageClient) {
SncpServiceInfo(String resourceName, Class<T> resourceServiceType, final T service, Convert convert,
SncpClient sncpClient, MessageAgent messageAgent, SncpMessageClient messageClient) {
this.sncpClient = sncpClient;
this.name = resourceName;
this.serviceType = resourceServiceType;
this.serviceid = Sncp.serviceid(resourceName, resourceServiceType);
@@ -75,6 +80,11 @@ public final class SncpServiceInfo<T extends Service> {
this.actions = serviceActions.toArray(new SncpServiceAction[serviceActions.size()]);
}
//只给远程模式调用的
public <T> T remote(final int index, final Object... params) {
return sncpClient.remote(this, index, params);
}
public void updateRemoteAddress(Set<String> remoteGroups, Set<InetSocketAddress> remoteAddresses) {
this.remoteGroups = remoteGroups;
this.remoteAddresses = remoteAddresses;