From 6c039dc8f434cdc563c946976d0d6a7855a95452 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 29 May 2017 21:01:56 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 10 ++-- src/org/redkale/boot/NodeServer.java | 4 +- src/org/redkale/net/TransportFactory.java | 24 +++++++-- src/org/redkale/net/sncp/Sncp.java | 16 +++--- src/org/redkale/net/sncp/SncpClient.java | 3 +- .../net/sncp/SncpTransportFactory.java | 51 ------------------- .../redkale/test/service/ABMainService.java | 3 +- test/org/redkale/test/sncp/SncpTest.java | 7 +-- .../test/sncp/SncpTestServiceImpl.java | 3 +- 9 files changed, 47 insertions(+), 74 deletions(-) delete mode 100644 src/org/redkale/net/sncp/SncpTransportFactory.java diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 7c1287221..b5b9d1e26 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -103,7 +103,7 @@ public final class Application { final List servers = new CopyOnWriteArrayList<>(); //传输端的TransportFactory - final SncpTransportFactory transportFactory; + final TransportFactory transportFactory; //全局根ResourceFactory final ResourceFactory resourceFactory = ResourceFactory.root(); @@ -260,14 +260,14 @@ public final class Application { logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); } } - this.transportFactory = new SncpTransportFactory(transportExec, transportPool, transportGroup); + this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup); } public ResourceFactory getResourceFactory() { return resourceFactory; } - public SncpTransportFactory getTransportFactory() { + public TransportFactory getTransportFactory() { return transportFactory; } @@ -358,7 +358,7 @@ public final class Application { Class type = field.getType(); if (type == Application.class) { field.set(src, application); - } else if (type == SncpTransportFactory.class) { + } else if (type == TransportFactory.class) { field.set(src, application.transportFactory); } else if (type == NodeSncpServer.class) { NodeServer server = null; @@ -404,7 +404,7 @@ public final class Application { return false; } - }, Application.class, SncpTransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); + }, Application.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); //-------------------------------------------------------------------------- initResources(); } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 01c0127a5..9c6dfa9e4 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -188,7 +188,7 @@ public abstract class NodeServer { final NodeServer self = this; //--------------------------------------------------------------------------------------------- final ResourceFactory appResFactory = application.getResourceFactory(); - final SncpTransportFactory appTranFactory = application.getTransportFactory(); + final TransportFactory appTranFactory = application.getTransportFactory(); final AnyValue resources = application.config.getAnyValue("resources"); final Map cacheResource = new HashMap<>(); //final Map dataResources = new HashMap<>(); @@ -321,7 +321,7 @@ public abstract class NodeServer { final Set> entrys = (Set) serviceFilter.getAllFilterEntrys(); ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory; final ResourceFactory appResourceFactory = application.getResourceFactory(); - final SncpTransportFactory appTransportFactory = application.getTransportFactory(); + final TransportFactory appTransportFactory = application.getTransportFactory(); for (FilterEntry entry : entrys) { //service实现类 final Class serviceImplClass = entry.getType(); if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过 diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 3e1e3bacd..e4ec1676a 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -5,12 +5,14 @@ */ package org.redkale.net; +import java.lang.ref.WeakReference; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.util.*; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.*; import java.util.logging.*; +import org.redkale.service.Service; import org.redkale.util.ObjectPool; /** @@ -36,6 +38,8 @@ public class TransportFactory { //协议地址的Group集合 protected final Map groupInfos = new HashMap<>(); + protected final List> services = new CopyOnWriteArrayList<>(); + public TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { this.executor = executor; this.bufferPool = bufferPool; @@ -57,11 +61,11 @@ public class TransportFactory { return this; } - public TransportFactory addGroupInfo(String name, Set addrs) { + public TransportFactory addGroupInfo(String name, Set addrs) { addGroupInfo(new TransportGroupInfo(name, addrs)); return this; } - + public boolean addGroupInfo(TransportGroupInfo info) { if (info == null) throw new RuntimeException("TransportGroupInfo can not null"); if (info.addresses == null) throw new RuntimeException("TransportGroupInfo.addresses can not null"); @@ -124,6 +128,20 @@ public class TransportFactory { return executor; } + public void addSncpService(Service service) { + if (service == null) return; + services.add(new WeakReference<>(service)); + } + + public List getServices() { + List rs = new ArrayList<>(); + for (WeakReference ref : services) { + Service service = ref.get(); + if (service != null) rs.add(service); + } + return rs; + } + public void shutdownNow() { try { this.channelGroup.shutdownNow(); diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 488006619..6e93b09d0 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -15,6 +15,7 @@ import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.Opcodes.*; import jdk.internal.org.objectweb.asm.Type; +import org.redkale.net.TransportFactory; import org.redkale.net.sncp.SncpClient.SncpAction; import org.redkale.service.*; import org.redkale.util.*; @@ -732,7 +733,7 @@ public abstract class Sncp { } public static T createSimpleLocalService(final Class serviceImplClass, - final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { + final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { return createLocalService("", serviceImplClass, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null); } @@ -744,7 +745,7 @@ public abstract class Sncp { * @param name 资源名 * @param serviceImplClass Service类 * @param resourceFactory ResourceFactory - * @param transportFactory SncpTransportFactory + * @param transportFactory TransportFactory * @param clientSncpAddress 本地IP地址 * @param groups 所有的组节点,包含自身 * @param conf 启动配置项 @@ -756,7 +757,7 @@ public abstract class Sncp { final String name, final Class serviceImplClass, final ResourceFactory resourceFactory, - final SncpTransportFactory transportFactory, + final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final Set groups, final AnyValue conf) { @@ -795,6 +796,7 @@ public abstract class Sncp { client.setSameGroupTransport(transportFactory.loadSameGroupTransport(clientSncpAddress)); client.setDiffGroupTransports(transportFactory.loadDiffGroupTransports(clientSncpAddress, diffGroups)); e.set(rs, client); + transportFactory.addSncpService(rs); } catch (NoSuchFieldException ne) { ne.printStackTrace(); } @@ -814,7 +816,7 @@ public abstract class Sncp { } public static T createSimpleRemoteService(final Class serviceImplClass, - final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { + final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { return createRemoteService("", serviceImplClass, transportFactory, clientSncpAddress, Utility.ofSet(groups), null); } @@ -861,7 +863,7 @@ public abstract class Sncp { * @param Service泛型 * @param name 资源名 * @param serviceTypeOrImplClass Service类 - * @param transportFactory SncpTransportFactory + * @param transportFactory TransportFactory * @param clientAddress 本地IP地址 * @param groups 所有的组节点,包含自身 * @param conf 启动配置项 @@ -873,7 +875,7 @@ public abstract class Sncp { public static T createRemoteService( final String name, final Class serviceTypeOrImplClass, - final SncpTransportFactory transportFactory, + final TransportFactory transportFactory, final InetSocketAddress clientAddress, final Set groups, final AnyValue conf) { @@ -898,6 +900,7 @@ public abstract class Sncp { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); c.set(rs, client); + transportFactory.addSncpService(rs); return rs; } catch (Throwable ex) { } @@ -1097,6 +1100,7 @@ public abstract class Sncp { c.setAccessible(true); c.set(rs, conf); } + transportFactory.addSncpService(rs); return rs; } catch (Exception ex) { throw new RuntimeException(ex); diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 54dd4ed95..46376c662 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -81,7 +81,7 @@ public final class SncpClient { //本地模式 protected Transport[] diffGroupTransports; - public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final SncpTransportFactory factory, + public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final TransportFactory factory, final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) { this.remote = remote; this.executor = factory.getExecutor(); @@ -101,7 +101,6 @@ public final class SncpClient { this.actions = methodens.toArray(new SncpAction[methodens.size()]); this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress(); this.addrPort = clientAddress == null ? 0 : clientAddress.getPort(); - factory.addSncpClient(this); } static List getSncpActions(final Class serviceClass) { diff --git a/src/org/redkale/net/sncp/SncpTransportFactory.java b/src/org/redkale/net/sncp/SncpTransportFactory.java deleted file mode 100644 index 1efa55053..000000000 --- a/src/org/redkale/net/sncp/SncpTransportFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.sncp; - -import java.lang.ref.WeakReference; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousChannelGroup; -import java.util.*; -import java.util.concurrent.*; -import org.redkale.net.*; -import org.redkale.util.ObjectPool; - -/** - * - * @author zhangjx - */ -public class SncpTransportFactory extends TransportFactory { - - protected final List> clients = new CopyOnWriteArrayList<>(); - - public SncpTransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { - super(executor, bufferPool, channelGroup); - } - - public SncpTransportFactory addGroupInfo(String name, InetSocketAddress... addrs) { - addGroupInfo(new TransportGroupInfo(name, addrs)); - return this; - } - - public SncpTransportFactory addGroupInfo(String name, Set addrs) { - addGroupInfo(new TransportGroupInfo(name, addrs)); - return this; - } - - void addSncpClient(SncpClient client) { - clients.add(new WeakReference<>(client)); - } - - public List getSncpClients() { - List rs = new ArrayList<>(); - for (WeakReference ref : clients) { - SncpClient client = ref.get(); - if (client != null) rs.add(client); - } - return rs; - } -} diff --git a/test/org/redkale/test/service/ABMainService.java b/test/org/redkale/test/service/ABMainService.java index 7a832fc84..8e21995a7 100644 --- a/test/org/redkale/test/service/ABMainService.java +++ b/test/org/redkale/test/service/ABMainService.java @@ -15,6 +15,7 @@ import java.util.logging.*; import javax.annotation.Resource; import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.json.JsonConvert; +import org.redkale.net.TransportFactory; import org.redkale.net.http.*; import org.redkale.net.sncp.*; import org.redkale.service.Service; @@ -36,7 +37,7 @@ public class ABMainService implements Service { final int abport = 8888; ResourceFactory resFactory = ResourceFactory.root(); ExecutorService executor = Executors.newSingleThreadExecutor(); - final SncpTransportFactory transFactory = new SncpTransportFactory(executor, newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = new TransportFactory(executor, newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("g77", new InetSocketAddress("127.0.0.1", 5577)); transFactory.addGroupInfo("g88", new InetSocketAddress("127.0.0.1", 5588)); transFactory.addGroupInfo("g99", new InetSocketAddress("127.0.0.1", 5599)); diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java index fc8441d12..6a9cf40e5 100644 --- a/test/org/redkale/test/sncp/SncpTest.java +++ b/test/org/redkale/test/sncp/SncpTest.java @@ -14,6 +14,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.logging.LogManager; import org.redkale.convert.bson.*; +import org.redkale.net.TransportFactory; import org.redkale.net.sncp.*; import org.redkale.service.Service; import org.redkale.util.*; @@ -78,7 +79,7 @@ public class SncpTest { Set set = new LinkedHashSet<>(); set.add(addr); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); - final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("client", set); final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, addr, "client"); ResourceFactory.root().inject(service); @@ -155,7 +156,7 @@ public class SncpTest { SncpServer server = new SncpServer(); Set set = new LinkedHashSet<>(); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); - final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("server", set); SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); ResourceFactory.root().inject(service); @@ -190,7 +191,7 @@ public class SncpTest { Set set = new LinkedHashSet<>(); set.add(new InetSocketAddress(myhost, port)); - final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("server", set); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); server.addSncpServlet(service); diff --git a/test/org/redkale/test/sncp/SncpTestServiceImpl.java b/test/org/redkale/test/sncp/SncpTestServiceImpl.java index 64f14a06d..1b1a827f6 100644 --- a/test/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/test/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -8,6 +8,7 @@ package org.redkale.test.sncp; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.concurrent.*; +import org.redkale.net.TransportFactory; import org.redkale.net.sncp.*; import org.redkale.service.*; import org.redkale.source.DataCallArrayAttribute; @@ -99,7 +100,7 @@ public class SncpTestServiceImpl implements SncpTestIService { public static void main(String[] args) throws Exception { - final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070)); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");