diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 993049893..a71ad183b 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -292,7 +292,7 @@ public final class Application { throw new RuntimeException(e); } } - this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup, strategy); + this.transportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy); Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); } diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 0e12629b0..4c994e1bc 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -42,6 +42,8 @@ public final class Transport { supportTcpNoDelay = tcpNoDelay; } + protected final TransportFactory factory; + protected final String name; //即的name属性 protected final String subprotocol; //即的subprotocol属性 @@ -63,18 +65,20 @@ public final class Transport { protected final ConcurrentHashMap> connPool = new ConcurrentHashMap<>(); - public Transport(String name, String subprotocol, final ObjectPool transportBufferPool, + protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses, final TransportStrategy strategy) { - this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy); + this(name, DEFAULT_PROTOCOL, subprotocol, factory, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy); } - public Transport(String name, String protocol, String subprotocol, final ObjectPool transportBufferPool, + protected Transport(String name, String protocol, String subprotocol, + final TransportFactory factory, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses, final TransportStrategy strategy) { this.name = name; this.subprotocol = subprotocol == null ? "" : subprotocol.trim(); this.protocol = protocol; + this.factory = factory; this.tcp = "TCP".equalsIgnoreCase(protocol); this.group = transportChannelGroup; this.bufferPool = transportBufferPool; diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index ada2fb1fb..90c476f96 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -46,7 +46,7 @@ public class TransportFactory { //负载均衡策略 protected final TransportStrategy strategy; - public TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, + protected TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, final TransportStrategy strategy) { this.executor = executor; this.bufferPool = bufferPool; @@ -54,10 +54,32 @@ public class TransportFactory { this.strategy = strategy; } - public TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { + protected TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { this(executor, bufferPool, channelGroup, null); } + public static TransportFactory create(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { + return new TransportFactory(executor, bufferPool, channelGroup, null); + } + + public static TransportFactory create(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, + final TransportStrategy strategy) { + return new TransportFactory(executor, bufferPool, channelGroup, strategy); + } + + public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection addresses) { + return new Transport(name, "TCP", "", this, this.bufferPool, this.channelGroup, clientAddress, addresses, strategy); + } + + public Transport createTransport(String name, String protocol, final InetSocketAddress clientAddress, final Collection addresses) { + return new Transport(name, protocol, "", this, this.bufferPool, this.channelGroup, clientAddress, addresses, strategy); + } + + public Transport createTransport(String name, String protocol, String subprotocol, + final InetSocketAddress clientAddress, final Collection addresses) { + return new Transport(name, protocol, subprotocol, this, this.bufferPool, this.channelGroup, clientAddress, addresses, strategy); + } + public String findGroupName(InetSocketAddress addr) { if (addr == null) return null; return groupAddrs.get(addr); @@ -136,14 +158,14 @@ public class TransportFactory { } if (info == null) return null; if (sncpAddress != null) addresses.remove(sncpAddress); - return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy); + return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy); } private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) { if (groupName == null) return null; TransportGroupInfo info = groupInfos.get(groupName); if (info == null) return null; - return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy); + return new Transport(groupName, info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy); } public ExecutorService getExecutor() { diff --git a/test/org/redkale/test/service/ABMainService.java b/test/org/redkale/test/service/ABMainService.java index 1eca66bdd..c573c9e8d 100644 --- a/test/org/redkale/test/service/ABMainService.java +++ b/test/org/redkale/test/service/ABMainService.java @@ -37,7 +37,7 @@ public class ABMainService implements Service { final int abport = 8888; ResourceFactory resFactory = ResourceFactory.root(); ExecutorService executor = Executors.newSingleThreadExecutor(); - final TransportFactory transFactory = new TransportFactory(executor, newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = TransportFactory.create(executor, newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("g77", new InetSocketAddress("127.0.0.1", 5577)); transFactory.addGroupInfo("g88", new InetSocketAddress("127.0.0.1", 5588)); transFactory.addGroupInfo("g99", new InetSocketAddress("127.0.0.1", 5599)); diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java index 6a9cf40e5..0ec4b221d 100644 --- a/test/org/redkale/test/sncp/SncpTest.java +++ b/test/org/redkale/test/sncp/SncpTest.java @@ -79,7 +79,7 @@ public class SncpTest { Set set = new LinkedHashSet<>(); set.add(addr); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); - final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("client", set); final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, addr, "client"); ResourceFactory.root().inject(service); @@ -156,7 +156,7 @@ public class SncpTest { SncpServer server = new SncpServer(); Set set = new LinkedHashSet<>(); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); - final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("server", set); SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); ResourceFactory.root().inject(service); @@ -191,7 +191,7 @@ public class SncpTest { Set set = new LinkedHashSet<>(); set.add(new InetSocketAddress(myhost, port)); - final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("server", set); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); server.addSncpServlet(service); diff --git a/test/org/redkale/test/sncp/SncpTestServiceImpl.java b/test/org/redkale/test/sncp/SncpTestServiceImpl.java index 1b1a827f6..a68d34198 100644 --- a/test/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/test/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -100,7 +100,7 @@ public class SncpTestServiceImpl implements SncpTestIService { public static void main(String[] args) throws Exception { - final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); + final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070)); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");