From fa9e62d2a72170d82cbd2871af0e2f3304d0ddb9 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Fri, 1 Jun 2018 10:00:59 +0800 Subject: [PATCH] --- src/org/redkale/net/Transport.java | 136 +++++++++++++++-------------- 1 file changed, 70 insertions(+), 66 deletions(-) diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 9ec73302f..f682e1f9a 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -29,41 +29,41 @@ import org.redkale.util.*; * @author zhangjx */ public final class Transport { - + public static final String DEFAULT_PROTOCOL = "TCP"; - + protected final AtomicInteger seq = new AtomicInteger(-1); - + protected final TransportFactory factory; - + protected final String name; //即的name属性 protected final String subprotocol; //即的subprotocol属性 protected final boolean tcp; - + protected final String protocol; - + protected final AsynchronousChannelGroup group; - + protected final InetSocketAddress clientAddress; //不可能为null protected TransportNode[] transportNodes = new TransportNode[0]; - + protected final ObjectPool bufferPool; - + protected final SSLContext sslContext; //负载均衡策略 protected final TransportStrategy strategy; - + protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress, final Collection addresses, final TransportStrategy strategy) { this(name, DEFAULT_PROTOCOL, subprotocol, factory, transportBufferPool, transportChannelGroup, sslContext, clientAddress, addresses, strategy); } - + protected Transport(String name, String protocol, String subprotocol, final TransportFactory factory, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress, @@ -81,7 +81,7 @@ public final class Transport { this.strategy = strategy; updateRemoteAddresses(addresses); } - + public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { final TransportNode[] oldNodes = this.transportNodes; synchronized (this) { @@ -109,7 +109,7 @@ public final class Transport { } return rs; } - + public final boolean addRemoteAddresses(final InetSocketAddress addr) { if (addr == null) return false; if (clientAddress != null && clientAddress.equals(addr)) return false; @@ -125,7 +125,7 @@ public final class Transport { return true; } } - + public final boolean removeRemoteAddresses(InetSocketAddress addr) { if (addr == null) return false; synchronized (this) { @@ -133,15 +133,15 @@ public final class Transport { } return true; } - + public String getName() { return name; } - + public String getSubprotocol() { return subprotocol; } - + public void close() { TransportNode[] nodes = this.transportNodes; if (nodes == null) return; @@ -149,22 +149,22 @@ public final class Transport { if (node != null) node.dispose(); } } - + public InetSocketAddress getClientAddress() { return clientAddress; } - + public TransportNode[] getTransportNodes() { return transportNodes; } - + public TransportNode findTransportNode(SocketAddress addr) { for (TransportNode node : this.transportNodes) { if (node.address.equals(addr)) return node; } return null; } - + public InetSocketAddress[] getRemoteAddresses() { InetSocketAddress[] rs = new InetSocketAddress[transportNodes.length]; for (int i = 0; i < rs.length; i++) { @@ -172,36 +172,36 @@ public final class Transport { } return rs; } - + @Override public String toString() { return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteNodes = " + Arrays.toString(transportNodes) + "}"; } - + public ByteBuffer pollBuffer() { return bufferPool.get(); } - + public Supplier getBufferSupplier() { return bufferPool; } - + public void offerBuffer(ByteBuffer buffer) { bufferPool.accept(buffer); } - + public void offerBuffer(ByteBuffer... buffers) { for (ByteBuffer buffer : buffers) offerBuffer(buffer); } - + public AsynchronousChannelGroup getTransportChannelGroup() { return group; } - + public boolean isTCP() { return tcp; } - + public CompletableFuture pollConnection(SocketAddress addr0) { if (this.strategy != null) return strategy.pollConnection(addr0, this); final TransportNode[] nodes = this.transportNodes; @@ -273,7 +273,7 @@ public final class Transport { future.complete(asyncConn); } } - + @Override public void failed(Throwable exc, TransportNode attachment) { attachment.disabletime = now; @@ -289,7 +289,7 @@ public final class Transport { future.complete(r); } }); - + } catch (Exception e) { future.completeExceptionally(e); } @@ -302,7 +302,7 @@ public final class Transport { throw new RuntimeException("transport address = " + addr, ex); } } - + private CompletableFuture pollConnection0(TransportNode[] nodes, TransportNode exclude, long now) throws IOException { //从可用/不可用的地址列表中创建连接 AtomicInteger count = new AtomicInteger(nodes.length); @@ -317,15 +317,19 @@ public final class Transport { channel.connect(node.address, node, new CompletionHandler() { @Override public void completed(Void result, TransportNode attachment) { - attachment.disabletime = 0; - AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); - if (future.isDone()) { - if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); - } else { - future.complete(asyncConn); + try { + attachment.disabletime = 0; + AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + if (future.isDone()) { + if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); + } else { + future.complete(asyncConn); + } + } catch (Exception e) { + failed(e, attachment); } } - + @Override public void failed(Throwable exc, TransportNode attachment) { attachment.disabletime = now; @@ -341,7 +345,7 @@ public final class Transport { } return future; } - + public void offerConnection(final boolean forceClose, AsyncConnection conn) { if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return; if (!forceClose && conn.isTCP()) { @@ -355,7 +359,7 @@ public final class Transport { conn.dispose(); } } - + public void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler handler) { pollConnection(addr).whenComplete((conn, ex) -> { if (ex != null) { @@ -363,28 +367,28 @@ public final class Transport { return; } conn.write(buffer, buffer, new CompletionHandler() { - + @Override public void completed(Integer result, ByteBuffer attachment) { buffer.clear(); conn.read(buffer, buffer, new CompletionHandler() { - + @Override public void completed(Integer result, ByteBuffer attachment) { if (handler != null) handler.completed(result, att); offerBuffer(buffer); offerConnection(false, conn); } - + @Override public void failed(Throwable exc, ByteBuffer attachment) { offerBuffer(buffer); offerConnection(true, conn); } }); - + } - + @Override public void failed(Throwable exc, ByteBuffer attachment) { offerBuffer(buffer); @@ -393,88 +397,88 @@ public final class Transport { }); }); } - + public static class TransportNode { - + protected InetSocketAddress address; - + protected volatile long disabletime; //不可用时的时间, 为0表示可用 protected final BlockingQueue conns; - + protected final ConcurrentHashMap attributes = new ConcurrentHashMap<>(); - + public TransportNode(int poolmaxconns, InetSocketAddress address) { this.address = address; this.disabletime = 0; this.conns = new ArrayBlockingQueue<>(poolmaxconns); } - + @ConstructorParameters({"poolmaxconns", "address", "disabletime"}) public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) { this.address = address; this.disabletime = disabletime; this.conns = new ArrayBlockingQueue<>(poolmaxconns); } - + public int getPoolmaxconns() { return this.conns.remainingCapacity() + this.conns.size(); } - + public T setAttribute(String name, T value) { attributes.put(name, value); return value; } - + @SuppressWarnings("unchecked") public T getAttribute(String name) { return (T) attributes.get(name); } - + @SuppressWarnings("unchecked") public T removeAttribute(String name) { return (T) attributes.remove(name); } - + public TransportNode clearAttributes() { attributes.clear(); return this; } - + public ConcurrentHashMap getAttributes() { return attributes; } - + public void setAttributes(ConcurrentHashMap map) { attributes.clear(); if (map != null) attributes.putAll(map); } - + public InetSocketAddress getAddress() { return address; } - + public long getDisabletime() { return disabletime; } - + @ConvertDisabled public BlockingQueue getConns() { return conns; } - + public void dispose() { AsyncConnection conn; while ((conn = conns.poll()) != null) { conn.dispose(); } } - + @Override public int hashCode() { return this.address.hashCode(); } - + @Override public boolean equals(Object obj) { if (this == obj) return true; @@ -483,7 +487,7 @@ public final class Transport { final TransportNode other = (TransportNode) obj; return this.address.equals(other.address); } - + @Override public String toString() { return JsonConvert.root().convertTo(this);