diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index f3240a332..72a7db391 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -73,8 +73,6 @@ public final class Application { final Map globalGroupProtocols = new HashMap<>(); - final Map transports = new HashMap<>(); - final InetAddress localAddress; final List cacheSources = new CopyOnWriteArrayList<>(); diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index b62278b4a..2f2fcdbf1 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -75,7 +75,7 @@ public final class NodeHttpServer extends NodeServer { synchronized (regFactory) { Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class); if (nodeService == null) { - nodeService = Sncp.createLocalService(resourceName, getExecutor(), WebSocketNodeService.class, (InetSocketAddress) null, (Transport) null, (Collection) null); + nodeService = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), WebSocketNodeService.class, (InetSocketAddress) null, (Transport) null, (Collection) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); factory.inject(nodeService, self); logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + nodeService); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index a682778de..7777d8c84 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -161,14 +161,14 @@ public abstract class NodeServer { private void initResource() { final NodeServer self = this; //--------------------------------------------------------------------------------------------- - final ResourceFactory regFactory = application.getResourceFactory(); + final ResourceFactory appResFactory = application.getResourceFactory(); factory.add(DataSource.class, (ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> { try { if (field.getAnnotation(Resource.class) == null) return; if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 DataSource DataSource source = new DataDefaultSource(resourceName); application.dataSources.add(source); - regFactory.register(resourceName, DataSource.class, source); + appResFactory.register(resourceName, DataSource.class, source); SncpClient client = null; Transport sameGroupTransport = null; @@ -190,8 +190,8 @@ public abstract class NodeServer { } final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); if ((src instanceof DataSource) && sncpAddr != null && factory.find(resourceName, DataCacheListener.class) == null) { //只有DataSourceService 才能赋值 DataCacheListener - Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, sncpAddr, sameGroupTransport, diffGroupTransports); - regFactory.register(resourceName, DataCacheListener.class, cacheListenerService); + Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, DataCacheListenerService.class, sncpAddr, sameGroupTransport, diffGroupTransports); + appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService); final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); Set gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports); ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpServer.getSncpGroup(), gs, null); @@ -231,14 +231,14 @@ public abstract class NodeServer { throw new RuntimeException(src.getClass().getName() + " not found _sameGroupTransport or _diffGroupTransports at " + field, e); } final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); - final CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports); + final CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports); Type genericType = field.getGenericType(); ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; Type valType = pt == null ? null : pt.getActualTypeArguments()[1]; source.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class); if (field.getAnnotation(Transient.class) != null) source.setNeedStore(false); //必须在setStoreType之后 application.cacheSources.add(source); - regFactory.register(resourceName, CacheSource.class, source); + appResFactory.register(resourceName, CacheSource.class, source); field.set(src, source); rf.inject(source, self); // ((Service) source).init(null); @@ -280,7 +280,7 @@ public abstract class NodeServer { Service service; if (localed) { //本地模式 - service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(this.sncpGroup), loadTransports(groups)); + service = Sncp.createLocalService(entry.getName(), getExecutor(), application.getResourceFactory(), type, this.sncpAddress, loadTransport(this.sncpGroup), loadTransports(groups)); } else { service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups)); } @@ -355,7 +355,7 @@ public abstract class NodeServer { flag = true; } final String groupid = sb.toString(); - Transport transport = application.transports.get(groupid); + Transport transport = application.resourceFactory.find(groupid, Transport.class); if (transport != null) return transport; final List transports = new ArrayList<>(); for (String group : groups) { @@ -370,11 +370,11 @@ public abstract class NodeServer { Transport first = transports.get(0); Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); - synchronized (application.transports) { - transport = application.transports.get(groupid); + synchronized (application.resourceFactory) { + transport = application.resourceFactory.find(groupid, Transport.class); if (transport == null) { transport = newTransport; - application.transports.put(groupid, transport); + application.resourceFactory.register(groupid, transport); } } return transport; @@ -383,8 +383,8 @@ public abstract class NodeServer { protected Transport loadTransport(final String group) { if (group == null) return null; Transport transport; - synchronized (application.transports) { - transport = application.transports.get(group); + synchronized (application.resourceFactory) { + transport = application.resourceFactory.find(group, Transport.class); if (transport != null) { if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) { throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress()); @@ -395,7 +395,7 @@ public abstract class NodeServer { if (addrs == null) throw new RuntimeException("Not found = " + group + " on "); transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); - application.transports.put(group, transport); + application.resourceFactory.register(group, transport); } return transport; } diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 28da98f92..50b1a9135 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -83,7 +83,7 @@ public final class Transport { if (first == null) first = t; tmpgroup.add(t.name); } - Collections.sort(tmpgroup); //按字母排列顺序 + Collections.sort(tmpgroup); //必须按字母排列顺序确保,相同内容的transport列表组合的name相同,而不会因为list的顺序不同产生不同的name boolean flag = false; StringBuilder sb = new StringBuilder(); for (String g : tmpgroup) { diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index a8dcdc5f4..dae63ded6 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -673,12 +673,12 @@ public abstract class Sncp { * @param executor 线程池 * @param serviceClass Service类 * @param clientAddress 本地IP地址 - * @param sameGroupTransport 同组的通信组件 + * @param sameGroupTransport 同组的通信组件 * @param diffGroupTransports 异组的通信组件列表 * @return Service的本地模式实例 */ @SuppressWarnings("unchecked") - public static T createLocalService(final String name, final Consumer executor, + public static T createLocalService(final String name, final Consumer executor, final ResourceFactory resourceFactory, final Class serviceClass, final InetSocketAddress clientAddress, final Transport sameGroupTransport, final Collection diffGroupTransports) { try { final Class newClazz = createLocalServiceClass(name, serviceClass); @@ -699,7 +699,18 @@ public abstract class Sncp { List list = new ArrayList<>(); if (sameGroupTransport != null) list.add(sameGroupTransport); if (diffGroupTransports != null) list.addAll(diffGroupTransports); - if (!list.isEmpty()) remoteTransport = new Transport(list); + if (!list.isEmpty()) { + Transport tmp = new Transport(list); + synchronized (resourceFactory) { + Transport old = resourceFactory.find(tmp.getName(), Transport.class); + if (old != null) { + remoteTransport = old; + } else { + remoteTransport = tmp; + resourceFactory.register(tmp.getName(), tmp); + } + } + } } if (remoteService == null && remoteTransport != null) { remoteService = createRemoteService(name, executor, serviceClass, clientAddress, remoteTransport); @@ -825,7 +836,7 @@ public abstract class Sncp { * @param serviceClass Service类 * @param clientAddress 本地IP地址 * @param transport 通信组件 - * + * * @return Service的远程模式实例 */ @SuppressWarnings("unchecked")