From 93d7277bfe64370d306674a6a99d9161ad2f3fdf Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 15 Jan 2024 22:47:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=9D=9E=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E7=B1=BB=E7=9A=84Service=E5=8A=A0=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/boot/NodeInterceptor.java | 2 +- .../java/org/redkale/boot/NodeServer.java | 71 +++++++++++-------- .../org/redkale/cluster/ClusterRpcClient.java | 2 + src/main/java/org/redkale/net/sncp/Sncp.java | 8 +++ .../org/redkale/net/sncp/SncpRpcGroups.java | 2 +- 5 files changed, 54 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/redkale/boot/NodeInterceptor.java b/src/main/java/org/redkale/boot/NodeInterceptor.java index f7fc34ab8..895f9039c 100644 --- a/src/main/java/org/redkale/boot/NodeInterceptor.java +++ b/src/main/java/org/redkale/boot/NodeInterceptor.java @@ -27,7 +27,7 @@ public class NodeInterceptor { /** * Server.shutdown之前调用
- * NodeServer.shutdown的部署是先执行NodeInterceptor.preShutdown,再执行 Server.sshutdown 方法 + * NodeServer.shutdown的部署是先执行NodeInterceptor.preShutdown,再执行 Server.shutdown 方法 * * @param server NodeServer */ diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 2f6a79860..e3161ac32 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -139,10 +139,15 @@ public abstract class NodeServer { this.serverConf = config == null ? AnyValue.create() : config; if (isSNCP()) { // SNCP协议 String host = this.serverConf.getValue("host", isWATCH() ? "127.0.0.1" : "0.0.0.0").replace("0.0.0.0", ""); - this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getAddress().getHostAddress() : host, this.serverConf.getIntValue("port")); + if (host.isEmpty()) { + host = application.localAddress.getAddress().getHostAddress(); + } + this.sncpAddress = new InetSocketAddress(host, this.serverConf.getIntValue("port")); this.sncpGroup = application.getSncpRpcGroups().getGroup(this.sncpAddress); //单向SNCP服务不需要对等group - //if (this.sncpGroup == null) throw new RedkaleException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found info"); + //if (this.sncpGroup == null) { + // throw new RedkaleException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found info"); + //} } //单点服务不会有 sncpAddress、sncpGroup if (this.sncpAddress != null) { @@ -173,7 +178,8 @@ public abstract class NodeServer { //必须要进行初始化, 构建Service时需要使用Context中的ExecutorService server.init(this.serverConf); if (this.sncpAddress != null) { //初始化SncpClient - this.sncpAsyncGroup = new AsyncIOGroup("Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); + this.sncpAsyncGroup = new AsyncIOGroup("Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), + server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, application.getNodeid(), this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000); } @@ -244,7 +250,7 @@ public abstract class NodeServer { private void registerResTypeLoader() { //--------------------- 注册 Local AutoLoad(false) Service --------------------- - resourceFactory.register(this::loadService, Service.class); + resourceFactory.register(this::loadResourceService, Service.class); //----------------------------- 注册 WebSocketNode ----------------------------- final NodeServer self = this; final ResourceFactory appResFactory = application.getResourceFactory(); @@ -292,51 +298,58 @@ public abstract class NodeServer { }, WebSocketNode.class); } - private Object loadService(ResourceFactory rf, String srcResourceName, Object srcObj, String resourceName, Field field, Object attachment) { + //Service.class的ResourceTypeLoader + private Object loadResourceService(ResourceFactory rf, String srcResourceName, Object srcObj, String resourceName, Field field, Object attachment) { final NodeServer self = this; final ResourceFactory appResFactory = application.getResourceFactory(); - Class resServiceType = Service.class; + Class serviceImplClass = Service.class; try { - resServiceType = (Class) field.getType(); - if (resServiceType.getAnnotation(Local.class) == null) { + serviceImplClass = (Class) field.getType(); + if (serviceImplClass.getAnnotation(Local.class) == null) { return null; } if ((srcObj instanceof Service) && Sncp.isRemote((Service) srcObj)) { return null; //远程模式不得注入 AutoLoad Service } boolean auto = true; - AutoLoad al = resServiceType.getAnnotation(AutoLoad.class); + AutoLoad al = serviceImplClass.getAnnotation(AutoLoad.class); if (al != null) { auto = al.value(); } - org.redkale.util.AutoLoad al2 = resServiceType.getAnnotation(org.redkale.util.AutoLoad.class); + org.redkale.util.AutoLoad al2 = serviceImplClass.getAnnotation(org.redkale.util.AutoLoad.class); if (al2 != null) { auto = al2.value(); } - if (auto) { + if (auto && !Utility.isAbstractOrInterface(serviceImplClass)) { return null; } //ResourceFactory resfactory = (isSNCP() ? appResFactory : resourceFactory); Service service; - if (Modifier.isFinal(resServiceType.getModifiers()) || Sncp.isComponent(resServiceType)) { - service = (Service) resServiceType.getConstructor().newInstance(); + if (Modifier.isFinal(serviceImplClass.getModifiers()) || Sncp.isComponent(serviceImplClass)) { + service = (Service) serviceImplClass.getConstructor().newInstance(); + } else if (Utility.isAbstractOrInterface(serviceImplClass)) { //没有具体实现类 + AsmMethodBoost methodBoost = application.createAsmMethodBoost(true, serviceImplClass); + MessageAgent mqAgent = appResFactory.find("", MessageAgent.class); + service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, + methodBoost, appResFactory, application.getSncpRpcGroups(), this.sncpClient, mqAgent, null, null); } else { - AsmMethodBoost methodBoost = application.createAsmMethodBoost(false, resServiceType); - service = Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, + AsmMethodBoost methodBoost = application.createAsmMethodBoost(false, serviceImplClass); + service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, methodBoost, appResFactory, application.getSncpRpcGroups(), sncpClient, null, null, null); } - appResFactory.register(resourceName, resServiceType, service); + appResFactory.register(resourceName, serviceImplClass, service); field.set(srcObj, service); rf.inject(resourceName, service, self); // 给其可能包含@Resource的字段赋值; - if (!application.isCompileMode()) { + if (!application.isCompileMode() && !Sncp.isRemote(service)) { service.init(null); } - logger.info("Load Service(@Local @AutoLoad service = " + resServiceType.getSimpleName() + ", resourceName = '" + resourceName + "')"); + logger.info("Load Service(" + (Sncp.isRemote(service) ? "Remote" : "@Local") + + " @AutoLoad service = " + serviceImplClass.getSimpleName() + ", resourceName = '" + resourceName + "')"); return service; } catch (Exception e) { - logger.log(Level.SEVERE, "Load @Local @AutoLoad(false) Service inject " + resServiceType + " to " + srcObj + " error", e); + logger.log(Level.SEVERE, "Load @AutoLoad(false) Service inject " + serviceImplClass + " to " + srcObj + " error", e); return null; } } @@ -406,34 +419,34 @@ public abstract class NodeServer { return null; } RedkaleClassLoader.putReflectionPublicMethods(serviceImplClass.getName()); - MessageAgent agent = getMessageAgent(entry.getProperty()); + MessageAgent mqAgent = getMessageAgent(entry.getProperty()); Service service; - final boolean ws = srcObj instanceof WebSocketServlet; if (Sncp.isComponent(serviceImplClass)) { //Component RedkaleClassLoader.putReflectionPublicConstructors(serviceImplClass, serviceImplClass.getName()); if (!acceptsComponent(serviceImplClass)) { return null; } service = serviceImplClass.getDeclaredConstructor().newInstance(); - } else if (ws || localMode) { //本地模式 + } else if (srcObj instanceof WebSocketServlet || localMode) { //本地模式 AsmMethodBoost methodBoost = application.createAsmMethodBoost(false, serviceImplClass); service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, - methodBoost, appResourceFactory, rpcGroups, this.sncpClient, agent, group, entry.getProperty()); + methodBoost, appResourceFactory, rpcGroups, this.sncpClient, mqAgent, group, entry.getProperty()); } else { AsmMethodBoost methodBoost = application.createAsmMethodBoost(true, serviceImplClass); service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, - methodBoost, appResourceFactory, rpcGroups, this.sncpClient, agent, group, entry.getProperty()); + methodBoost, appResourceFactory, rpcGroups, this.sncpClient, mqAgent, group, entry.getProperty()); } final Class restype = Sncp.getResourceType(service); if (rf.find(resourceName, restype) == null) { regFactory.register(resourceName, restype, service); } else if (isSNCP() && !entry.isAutoload()) { - throw new RedkaleException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + group + ") is repeat."); + throw new RedkaleException(restype.getSimpleName() + + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + group + ") is repeat."); } if (Sncp.isRemote(service)) { remoteServices.add(service); - if (agent != null) { - sncpRemoteAgents.put(agent.getName(), agent); + if (mqAgent != null) { + sncpRemoteAgents.put(mqAgent.getName(), mqAgent); } } else { if (field != null) { @@ -487,8 +500,8 @@ public abstract class NodeServer { } if (isSNCP() && !sncpRemoteAgents.isEmpty()) { sncpRemoteAgents.values().forEach(agent -> { - // agent.putSncpResp((NodeSncpServer) this); - // agent.startSncpRespConsumer(); + // mqAgent.putSncpResp((NodeSncpServer) this); + // mqAgent.startSncpRespConsumer(); }); } //----------------- init ----------------- diff --git a/src/main/java/org/redkale/cluster/ClusterRpcClient.java b/src/main/java/org/redkale/cluster/ClusterRpcClient.java index 6709c7fd7..a95a82063 100644 --- a/src/main/java/org/redkale/cluster/ClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/ClusterRpcClient.java @@ -12,6 +12,8 @@ import java.util.concurrent.CompletableFuture; * 详情见: https://redkale.org * * @author zhangjx + * @param message + * @param

result * * @since 2.8.0 */ diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 9aa12f50f..2118fd6fd 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -159,6 +159,14 @@ public abstract class Sncp { return new SncpRemoteInfo(resourceName, resourceServiceType, serviceImplClass, convert, sncpRpcGroups, sncpClient, messageAgent, remoteGroup); } + /** + * 格式:资源类型:资源名 + * + * @param resourceName 资源名 + * @param resourceType 资源类型 + * + * @return resourceid + */ public static String resourceid(String resourceName, Class resourceType) { return resourceType.getName() + ':' + (resourceName == null ? "" : resourceName); } diff --git a/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java b/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java index 94fdf00cc..2ff459fdd 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java +++ b/src/main/java/org/redkale/net/sncp/SncpRpcGroups.java @@ -21,7 +21,7 @@ public class SncpRpcGroups { protected final ConcurrentHashMap sncpRpcGroups = new ConcurrentHashMap<>(); - //key: resourceid + //key: resourceid(serviceType:resourceName) protected final ConcurrentHashMap> sncpClusters = new ConcurrentHashMap<>(); public SncpRpcGroup getSncpRpcGroup(String group) {