改造Transport的构造函数,便于TransportFactory统一管理

This commit is contained in:
Redkale
2017-11-02 19:04:55 +08:00
parent e559379294
commit ad2a3f0d54
6 changed files with 39 additions and 13 deletions

View File

@@ -292,7 +292,7 @@ public final class Application {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup, strategy); this.transportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy);
Thread.currentThread().setContextClassLoader(this.classLoader); Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
} }

View File

@@ -42,6 +42,8 @@ public final class Transport {
supportTcpNoDelay = tcpNoDelay; supportTcpNoDelay = tcpNoDelay;
} }
protected final TransportFactory factory;
protected final String name; //即<group>的name属性 protected final String name; //即<group>的name属性
protected final String subprotocol; //即<group>的subprotocol属性 protected final String subprotocol; //即<group>的subprotocol属性
@@ -63,18 +65,20 @@ public final class Transport {
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool, protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) { final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy); this(name, DEFAULT_PROTOCOL, subprotocol, factory, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy);
} }
public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool, protected Transport(String name, String protocol, String subprotocol,
final TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) { final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this.name = name; this.name = name;
this.subprotocol = subprotocol == null ? "" : subprotocol.trim(); this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
this.protocol = protocol; this.protocol = protocol;
this.factory = factory;
this.tcp = "TCP".equalsIgnoreCase(protocol); this.tcp = "TCP".equalsIgnoreCase(protocol);
this.group = transportChannelGroup; this.group = transportChannelGroup;
this.bufferPool = transportBufferPool; this.bufferPool = transportBufferPool;

View File

@@ -46,7 +46,7 @@ public class TransportFactory {
//负载均衡策略 //负载均衡策略
protected final TransportStrategy strategy; protected final TransportStrategy strategy;
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup, protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
final TransportStrategy strategy) { final TransportStrategy strategy) {
this.executor = executor; this.executor = executor;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
@@ -54,10 +54,32 @@ public class TransportFactory {
this.strategy = strategy; this.strategy = strategy;
} }
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) { protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
this(executor, bufferPool, channelGroup, null); this(executor, bufferPool, channelGroup, null);
} }
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
return new TransportFactory(executor, bufferPool, channelGroup, null);
}
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
final TransportStrategy strategy) {
return new TransportFactory(executor, bufferPool, channelGroup, strategy);
}
public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
return new Transport(name, "TCP", "", this, this.bufferPool, this.channelGroup, clientAddress, addresses, strategy);
}
public Transport createTransport(String name, String protocol, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
return new Transport(name, protocol, "", this, this.bufferPool, this.channelGroup, clientAddress, addresses, strategy);
}
public Transport createTransport(String name, String protocol, String subprotocol,
final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
return new Transport(name, protocol, subprotocol, this, this.bufferPool, this.channelGroup, clientAddress, addresses, strategy);
}
public String findGroupName(InetSocketAddress addr) { public String findGroupName(InetSocketAddress addr) {
if (addr == null) return null; if (addr == null) return null;
return groupAddrs.get(addr); return groupAddrs.get(addr);
@@ -136,14 +158,14 @@ public class TransportFactory {
} }
if (info == null) return null; if (info == null) return null;
if (sncpAddress != null) addresses.remove(sncpAddress); if (sncpAddress != null) addresses.remove(sncpAddress);
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy); return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy);
} }
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) { private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
if (groupName == null) return null; if (groupName == null) return null;
TransportGroupInfo info = groupInfos.get(groupName); TransportGroupInfo info = groupInfos.get(groupName);
if (info == null) return null; if (info == null) return null;
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy); return new Transport(groupName, info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy);
} }
public ExecutorService getExecutor() { public ExecutorService getExecutor() {

View File

@@ -37,7 +37,7 @@ public class ABMainService implements Service {
final int abport = 8888; final int abport = 8888;
ResourceFactory resFactory = ResourceFactory.root(); ResourceFactory resFactory = ResourceFactory.root();
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
final TransportFactory transFactory = new TransportFactory(executor, newBufferPool(), newChannelGroup()); final TransportFactory transFactory = TransportFactory.create(executor, newBufferPool(), newChannelGroup());
transFactory.addGroupInfo("g77", new InetSocketAddress("127.0.0.1", 5577)); transFactory.addGroupInfo("g77", new InetSocketAddress("127.0.0.1", 5577));
transFactory.addGroupInfo("g88", new InetSocketAddress("127.0.0.1", 5588)); transFactory.addGroupInfo("g88", new InetSocketAddress("127.0.0.1", 5588));
transFactory.addGroupInfo("g99", new InetSocketAddress("127.0.0.1", 5599)); transFactory.addGroupInfo("g99", new InetSocketAddress("127.0.0.1", 5599));

View File

@@ -79,7 +79,7 @@ public class SncpTest {
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
set.add(addr); set.add(addr);
if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
transFactory.addGroupInfo("client", set); transFactory.addGroupInfo("client", set);
final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, addr, "client"); final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, addr, "client");
ResourceFactory.root().inject(service); ResourceFactory.root().inject(service);
@@ -156,7 +156,7 @@ public class SncpTest {
SncpServer server = new SncpServer(); SncpServer server = new SncpServer();
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
transFactory.addGroupInfo("server", set); transFactory.addGroupInfo("server", set);
SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server");
ResourceFactory.root().inject(service); ResourceFactory.root().inject(service);
@@ -191,7 +191,7 @@ public class SncpTest {
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
set.add(new InetSocketAddress(myhost, port)); set.add(new InetSocketAddress(myhost, port));
final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
transFactory.addGroupInfo("server", set); transFactory.addGroupInfo("server", set);
Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server");
server.addSncpServlet(service); server.addSncpServlet(service);

View File

@@ -100,7 +100,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final TransportFactory transFactory = new TransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070)); transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070));
Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");