diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index 0a0d10ee0..d71c8791b 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -94,14 +94,23 @@ public class NodeHttpServer extends NodeServer { try { if (field.getAnnotation(Resource.class) == null) return; if (!(src instanceof WebSocketServlet)) return; + ResourceFactory.ResourceLoader loader = null; + ResourceFactory sncpResFactory = null; + for (NodeServer ns : application.servers) { + if (!ns.isSNCP()) continue; + sncpResFactory = ns.resourceFactory; + loader = sncpResFactory.findLoader(WebSocketNode.class, field); + if (loader != null) break; + } + if (loader != null) loader.load(sncpResFactory, src, resourceName, field, attachment); synchronized (regFactory) { Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class); if (nodeService == null) { nodeService = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), WebSocketNodeService.class, (InetSocketAddress) null, (String) null, (Set) null, (AnyValue) null, (Transport) null, (Collection) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); - resourceFactory.inject(nodeService, self); - logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + nodeService); } + resourceFactory.inject(nodeService, self); + logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + nodeService); field.set(src, nodeService); } } catch (Exception e) { diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index dba8b1f71..a09483d8f 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -21,6 +21,7 @@ import static org.redkale.boot.Application.*; import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.net.Filter; import org.redkale.net.*; +import org.redkale.net.http.WebSocketServlet; import org.redkale.net.sncp.*; import org.redkale.service.*; import org.redkale.source.*; @@ -267,44 +268,51 @@ public abstract class NodeServer { }, DataSource.class); //------------------------------------- 注册CacheSource -------------------------------------------------------- - resourceFactory.register((ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> { - try { - if (field.getAnnotation(Resource.class) == null) return; - if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不需要注入 CacheSource - final Service srcService = (Service) src; - SncpClient client = Sncp.getSncpClient(srcService); - Transport sameGroupTransport = Sncp.getSameGroupTransport(srcService); - Transport[] dts = Sncp.getDiffGroupTransports((Service) src); - List diffGroupTransports = dts == null ? new ArrayList<>() : Arrays.asList(dts); - final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); - final AnyValue sourceConf = cacheResource.get(resourceName); - final Class sourceType = sourceConf == null ? CacheMemorySource.class : Class.forName(sourceConf.getValue("type")); - @SuppressWarnings("unchecked") - final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, getExecutor(), appResFactory, (Class) sourceType, sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports); - Type genericType = field.getGenericType(); - ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; - Type valType = pt == null ? null : pt.getActualTypeArguments()[1]; - if (sourceType == CacheMemorySource.class) { - CacheMemorySource memorySource = (CacheMemorySource) source; - memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class); - if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后 - } - application.cacheSources.add(source); - appResFactory.register(resourceName, genericType, source); - appResFactory.register(resourceName, CacheSource.class, source); - field.set(src, source); - rf.inject(source, self); // - if (source instanceof Service) ((Service) source).init(sourceConf); + resourceFactory.register(new ResourceFactory.ResourceLoader() { + public void load(ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) { + try { + if (field.getAnnotation(Resource.class) == null) return; + if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不需要注入 CacheSource + final Service srcService = (Service) src; + SncpClient client = Sncp.getSncpClient(srcService); + Transport sameGroupTransport = Sncp.getSameGroupTransport(srcService); + Transport[] dts = Sncp.getDiffGroupTransports((Service) src); + List diffGroupTransports = dts == null ? new ArrayList<>() : Arrays.asList(dts); + final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); + final AnyValue sourceConf = cacheResource.get(resourceName); + final Class sourceType = sourceConf == null ? CacheMemorySource.class : Class.forName(sourceConf.getValue("type")); + @SuppressWarnings("unchecked") + final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, getExecutor(), appResFactory, (Class) sourceType, + sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports); + Type genericType = field.getGenericType(); + ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; + Type valType = pt == null ? null : pt.getActualTypeArguments()[1]; + if (sourceType == CacheMemorySource.class) { + CacheMemorySource memorySource = (CacheMemorySource) source; + memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class); + if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后 + } + application.cacheSources.add(source); + appResFactory.register(resourceName, genericType, source); + appResFactory.register(resourceName, CacheSource.class, source); + field.set(src, source); + rf.inject(source, self); // + if (source instanceof Service) ((Service) source).init(sourceConf); - if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource - NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); - Set gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports); - sncpServer.getSncpServer().addSncpServlet((Service) source); - logger.info("[" + Thread.currentThread().getName() + "] Load Service " + source); + if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource + NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); + Set gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports); + sncpServer.getSncpServer().addSncpServlet((Service) source); + //logger.info("[" + Thread.currentThread().getName() + "] Load Service " + source); + } + logger.info("[" + Thread.currentThread().getName() + "] Load Source " + source); + } catch (Exception e) { + logger.log(Level.SEVERE, "DataSource inject error", e); } - logger.info("[" + Thread.currentThread().getName() + "] Load Source " + source); - } catch (Exception e) { - logger.log(Level.SEVERE, "DataSource inject error", e); + } + + public boolean autoNone() { + return false; } }, CacheSource.class); } @@ -341,28 +349,29 @@ public abstract class NodeServer { || (this.sncpGroup == null && entry.isEmptyGroups()) //空的SNCP配置 || serviceImplClass.getAnnotation(Local.class) != null;//本地模式 if (localed && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) continue; //本地模式不能实例化接口和抽象类的Service类 - final BiConsumer runner = (ResourceFactory rf, Boolean needinject) -> { + final ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> { try { Service service; - if (localed) { //本地模式 - service = Sncp.createLocalService(entry.getName(), getExecutor(), application.getResourceFactory(), serviceImplClass, + boolean ws = src instanceof WebSocketServlet; + if (ws || localed) { //本地模式 + service = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), serviceImplClass, NodeServer.this.sncpAddress, NodeServer.this.sncpGroup, groups, entry.getProperty(), loadTransport(NodeServer.this.sncpGroup), loadTransports(groups)); } else { - service = Sncp.createRemoteService(entry.getName(), getExecutor(), serviceImplClass, NodeServer.this.sncpAddress, null, groups, entry.getProperty(), loadTransport(groups)); + service = Sncp.createRemoteService(resourceName, getExecutor(), serviceImplClass, NodeServer.this.sncpAddress, null, groups, entry.getProperty(), loadTransport(groups)); } if (SncpClient.parseMethod(serviceImplClass).isEmpty()) return; //class没有可用的方法, 通常为BaseService - //final ServiceWrapper wrapper = new ServiceWrapper(serviceImplClass, service, entry.getName(), localed ? NodeServer.this.sncpGroup : null, groups, entry.getProperty()); + //final ServiceWrapper wrapper = new ServiceWrapper(serviceImplClass, service, resourceName, localed ? NodeServer.this.sncpGroup : null, groups, entry.getProperty()); for (final Class restype : Sncp.getResourceTypes(service)) { - if (resourceFactory.find(entry.getName(), restype) == null) { - regFactory.register(entry.getName(), restype, service); - if (needinject) rf.inject(service); //动态加载的Service也存在按需加载的注入资源 + if (rf.find(resourceName, restype) == null) { + regFactory.register(resourceName, restype, service); } else if (isSNCP() && !entry.isAutoload()) { - throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat."); + throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + groups + ") is repeat."); } } if (Sncp.isRemote(service)) { remoteServices.add(service); } else { + if (field != null) rf.inject(service); //动态加载的Service也存在按需加载的注入资源 localServices.add(service); interceptorServices.add(service); if (consumer != null) consumer.accept(service); @@ -374,16 +383,13 @@ public abstract class NodeServer { } }; if (entry.isExpect()) { - ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> { - runner.accept(rf, true); - }; ResourceType rty = entry.getType().getAnnotation(ResourceType.class); Class[] resTypes = rty == null ? new Class[]{} : rty.value(); for (final Class restype : resTypes) { resourceFactory.register(resourceLoader, restype); } } else { - runner.accept(resourceFactory, false); + resourceLoader.load(resourceFactory, null, entry.getName(), null, false); } } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 0d54ca9b5..cf43268a8 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -233,7 +233,6 @@ public abstract class WebSocketNode { if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 return this.localEngine.broadcastMessage(recent, message, last); } - CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(recent, message, last); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync("redkale_sncpnodes"); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { @@ -241,6 +240,7 @@ public abstract class WebSocketNode { if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; future = future == null ? remoteNode.broadcastMessage(addr, recent, message, last) : future.thenCombine(remoteNode.broadcastMessage(addr, recent, message, last), (a, b) -> a | b); }