From b14e14659ce253a942a58749f463745e8baff743 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Fri, 30 Mar 2018 08:56:00 +0800 Subject: [PATCH] --- src/org/redkale/net/Transport.java | 122 ++++++++++++---------- src/org/redkale/net/TransportFactory.java | 18 ++-- 2 files changed, 73 insertions(+), 67 deletions(-) diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index a3dc55848..e7f482f91 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -62,7 +62,7 @@ public final class Transport { protected final InetSocketAddress clientAddress; //不可能为null - protected TransportAddress[] transportAddrs = new TransportAddress[0]; + protected TransportNode[] transportNodes = new TransportNode[0]; protected final ObjectPool bufferPool; @@ -96,14 +96,14 @@ public final class Transport { } public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { - final TransportAddress[] oldAddresses = this.transportAddrs; + final TransportNode[] oldNodes = this.transportNodes; synchronized (this) { - List list = new ArrayList<>(); + List list = new ArrayList<>(); if (addresses != null) { for (InetSocketAddress addr : addresses) { if (clientAddress != null && clientAddress.equals(addr)) continue; boolean hasold = false; - for (TransportAddress oldAddr : oldAddresses) { + for (TransportNode oldAddr : oldNodes) { if (oldAddr.getAddress().equals(addr)) { list.add(oldAddr); hasold = true; @@ -111,14 +111,14 @@ public final class Transport { } } if (hasold) continue; - list.add(new TransportAddress(addr)); + list.add(new TransportNode(factory.poolmaxconns, addr)); } } - this.transportAddrs = list.toArray(new TransportAddress[list.size()]); + this.transportNodes = list.toArray(new TransportNode[list.size()]); } - InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length]; + InetSocketAddress[] rs = new InetSocketAddress[oldNodes.length]; for (int i = 0; i < rs.length; i++) { - rs[i] = oldAddresses[i].getAddress(); + rs[i] = oldNodes[i].getAddress(); } return rs; } @@ -127,13 +127,13 @@ public final class Transport { if (addr == null) return false; if (clientAddress != null && clientAddress.equals(addr)) return false; synchronized (this) { - if (this.transportAddrs.length == 0) { - this.transportAddrs = new TransportAddress[]{new TransportAddress(addr)}; + if (this.transportNodes.length == 0) { + this.transportNodes = new TransportNode[]{new TransportNode(factory.poolmaxconns, addr)}; } else { - for (TransportAddress i : this.transportAddrs) { + for (TransportNode i : this.transportNodes) { if (addr.equals(i.address)) return false; } - this.transportAddrs = Utility.append(transportAddrs, new TransportAddress(addr)); + this.transportNodes = Utility.append(transportNodes, new TransportNode(factory.poolmaxconns, addr)); } return true; } @@ -142,7 +142,7 @@ public final class Transport { public final boolean removeRemoteAddresses(InetSocketAddress addr) { if (addr == null) return false; synchronized (this) { - this.transportAddrs = Utility.remove(transportAddrs, new TransportAddress(addr)); + this.transportNodes = Utility.remove(transportNodes, new TransportNode(factory.poolmaxconns, addr)); } return true; } @@ -156,10 +156,10 @@ public final class Transport { } public void close() { - TransportAddress[] taddrs = this.transportAddrs; - if (taddrs == null) return; - for (TransportAddress taddr : taddrs) { - if (taddr != null) taddr.dispose(); + TransportNode[] nodes = this.transportNodes; + if (nodes == null) return; + for (TransportNode node : nodes) { + if (node != null) node.dispose(); } } @@ -167,28 +167,28 @@ public final class Transport { return clientAddress; } - public TransportAddress[] getTransportAddresses() { - return transportAddrs; + public TransportNode[] getTransportNodes() { + return transportNodes; } - public TransportAddress findTransportAddress(SocketAddress addr) { - for (TransportAddress taddr : this.transportAddrs) { - if (taddr.address.equals(addr)) return taddr; + public TransportNode findTransportNode(SocketAddress addr) { + for (TransportNode node : this.transportNodes) { + if (node.address.equals(addr)) return node; } return null; } public InetSocketAddress[] getRemoteAddresses() { - InetSocketAddress[] rs = new InetSocketAddress[transportAddrs.length]; + InetSocketAddress[] rs = new InetSocketAddress[transportNodes.length]; for (int i = 0; i < rs.length; i++) { - rs[i] = transportAddrs[i].getAddress(); + rs[i] = transportNodes[i].getAddress(); } return rs; } @Override public String toString() { - return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddrs) + "}"; + return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteNodes = " + Arrays.toString(transportNodes) + "}"; } public ByteBuffer pollBuffer() { @@ -217,25 +217,25 @@ public final class Transport { public CompletableFuture pollConnection(SocketAddress addr0) { if (this.strategy != null) return strategy.pollConnection(addr0, this); - final TransportAddress[] taddrs = this.transportAddrs; - if (addr0 == null && taddrs.length == 1) addr0 = taddrs[0].address; + final TransportNode[] nodes = this.transportNodes; + if (addr0 == null && nodes.length == 1) addr0 = nodes[0].address; final SocketAddress addr = addr0; final boolean rand = addr == null; //是否随机取地址 - if (rand && taddrs.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list"); + if (rand && nodes.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list"); try { if (!tcp) { // UDP - SocketAddress udpaddr = rand ? taddrs[0].address : addr; + SocketAddress udpaddr = rand ? nodes[0].address : addr; DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(true); channel.connect(udpaddr); return CompletableFuture.completedFuture(AsyncConnection.create(channel, udpaddr, true, factory.readTimeoutSecond, factory.writeTimeoutSecond)); } if (!rand) { //指定地址 - TransportAddress taddr = findTransportAddress(addr); - if (taddr == null) { + TransportNode node = findTransportNode(addr); + if (node == null) { return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSecond, factory.writeTimeoutSecond); } - final BlockingQueue queue = taddr.conns; + final BlockingQueue queue = node.conns; if (!queue.isEmpty()) { AsyncConnection conn; while ((conn = queue.poll()) != null) { @@ -247,14 +247,14 @@ public final class Transport { //---------------------随机取地址------------------------ int enablecount = 0; - final TransportAddress[] newtaddrs = new TransportAddress[taddrs.length]; - for (final TransportAddress taddr : taddrs) { - if (taddr.disabletime > 0) continue; - newtaddrs[enablecount++] = taddr; + final TransportNode[] newnodes = new TransportNode[nodes.length]; + for (final TransportNode node : nodes) { + if (node.disabletime > 0) continue; + newnodes[enablecount++] = node; } final long now = System.currentTimeMillis(); if (enablecount > 0) { //存在可用的地址 - final TransportAddress one = newtaddrs[Math.abs(seq.incrementAndGet()) % enablecount]; + final TransportNode one = newnodes[Math.abs(seq.incrementAndGet()) % enablecount]; final BlockingQueue queue = one.conns; if (!queue.isEmpty()) { AsyncConnection conn; @@ -265,9 +265,9 @@ public final class Transport { CompletableFuture future = new CompletableFuture(); final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.connect(one.address, one, new CompletionHandler() { + channel.connect(one.address, one, new CompletionHandler() { @Override - public void completed(Void result, TransportAddress attachment) { + public void completed(Void result, TransportNode attachment) { attachment.disabletime = 0; AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond); if (future.isDone()) { @@ -278,14 +278,14 @@ public final class Transport { } @Override - public void failed(Throwable exc, TransportAddress attachment) { + public void failed(Throwable exc, TransportNode attachment) { attachment.disabletime = now; try { channel.close(); } catch (Exception e) { } try { - pollConnection0(taddrs, one, now).whenComplete((r, t) -> { + pollConnection0(nodes, one, now).whenComplete((r, t) -> { if (t != null) { future.completeExceptionally(t); } else { @@ -300,24 +300,24 @@ public final class Transport { }); return future; } - return pollConnection0(taddrs, null, now); + return pollConnection0(nodes, null, now); } catch (Exception ex) { throw new RuntimeException("transport address = " + addr, ex); } } - private CompletableFuture pollConnection0(TransportAddress[] taddrs, TransportAddress exclude, long now) throws IOException { + private CompletableFuture pollConnection0(TransportNode[] nodes, TransportNode exclude, long now) throws IOException { //从可用/不可用的地址列表中创建连接 - AtomicInteger count = new AtomicInteger(taddrs.length); + AtomicInteger count = new AtomicInteger(nodes.length); CompletableFuture future = new CompletableFuture(); - for (final TransportAddress taddr : taddrs) { - if (taddr == exclude) continue; + for (final TransportNode node : nodes) { + if (node == exclude) continue; if (future.isDone()) return future; final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.connect(taddr.address, taddr, new CompletionHandler() { + channel.connect(node.address, node, new CompletionHandler() { @Override - public void completed(Void result, TransportAddress attachment) { + public void completed(Void result, TransportNode attachment) { attachment.disabletime = 0; AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond); if (future.isDone()) { @@ -328,7 +328,7 @@ public final class Transport { } @Override - public void failed(Throwable exc, TransportAddress attachment) { + public void failed(Throwable exc, TransportNode attachment) { attachment.disabletime = now; if (count.decrementAndGet() < 1) { future.completeExceptionally(exc); @@ -347,8 +347,8 @@ public final class Transport { if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return; if (!forceClose && conn.isTCP()) { if (conn.isOpen()) { - TransportAddress taddr = findTransportAddress(conn.getRemoteAddress()); - if (taddr == null || !taddr.conns.offer(conn)) conn.dispose(); + TransportNode node = findTransportNode(conn.getRemoteAddress()); + if (node == null || !node.conns.offer(conn)) conn.dispose(); } } else { conn.dispose(); @@ -393,25 +393,31 @@ public final class Transport { }); } - public class TransportAddress { + public static class TransportNode { protected InetSocketAddress address; protected volatile long disabletime; //不可用时的时间, 为0表示可用 - protected final BlockingQueue conns = new ArrayBlockingQueue<>(factory.poolmaxconns); + protected final BlockingQueue conns; protected final ConcurrentHashMap attributes = new ConcurrentHashMap<>(); - public TransportAddress(InetSocketAddress address) { + public TransportNode(int poolmaxconns, InetSocketAddress address) { this.address = address; this.disabletime = 0; + this.conns = new ArrayBlockingQueue<>(poolmaxconns); } - @ConstructorParameters({"address", "disabletime"}) - public TransportAddress(InetSocketAddress address, long disabletime) { + @ConstructorParameters({"poolmaxconns", "address", "disabletime"}) + public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) { this.address = address; this.disabletime = disabletime; + this.conns = new ArrayBlockingQueue<>(poolmaxconns); + } + + public int getPoolmaxconns() { + return this.conns.remainingCapacity() + this.conns.size(); } public T setAttribute(String name, T value) { @@ -429,7 +435,7 @@ public final class Transport { return (T) attributes.remove(name); } - public TransportAddress clearAttributes() { + public TransportNode clearAttributes() { attributes.clear(); return this; } @@ -473,7 +479,7 @@ public final class Transport { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; - final TransportAddress other = (TransportAddress) obj; + final TransportNode other = (TransportNode) obj; return this.address.equals(other.address); } diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 6448fdcff..a15ee00ec 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -346,19 +346,19 @@ public class TransportFactory { nulllist.add(ref); continue; } - Transport.TransportAddress[] taddrs = transport.getTransportAddresses(); - for (final Transport.TransportAddress taddr : taddrs) { - if (taddr.disabletime < 1) continue; //可用 + Transport.TransportNode[] nodes = transport.getTransportNodes(); + for (final Transport.TransportNode node : nodes) { + if (node.disabletime < 1) continue; //可用 try { final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(transport.group); - channel.connect(taddr.address, taddr, new CompletionHandler() { + channel.connect(node.address, node, new CompletionHandler() { @Override - public void completed(Void result, Transport.TransportAddress attachment) { + public void completed(Void result, Transport.TransportNode attachment) { attachment.disabletime = 0; } @Override - public void failed(Throwable exc, Transport.TransportAddress attachment) { + public void failed(Throwable exc, Transport.TransportNode attachment) { attachment.disabletime = System.currentTimeMillis(); } }); @@ -376,9 +376,9 @@ public class TransportFactory { for (WeakReference ref : transportReferences) { Transport transport = ref.get(); if (transport == null) continue; - Transport.TransportAddress[] taddrs = transport.getTransportAddresses(); - for (final Transport.TransportAddress taddr : taddrs) { - final BlockingQueue queue = taddr.conns; + Transport.TransportNode[] nodes = transport.getTransportNodes(); + for (final Transport.TransportNode node : nodes) { + final BlockingQueue queue = node.conns; AsyncConnection conn; while ((conn = queue.poll()) != null) { if (conn.getLastWriteTime() > timex && false) { //最近几秒内已经进行过IO操作