diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 239f67ebc..0e12629b0 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -11,6 +11,8 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.function.Supplier; +import org.redkale.convert.*; +import org.redkale.convert.json.JsonConvert; import org.redkale.util.*; /** @@ -52,7 +54,7 @@ public final class Transport { protected final InetSocketAddress clientAddress; - protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0]; + protected TransportAddress[] transportAddres = new TransportAddress[0]; protected final ObjectPool bufferPool; @@ -82,28 +84,33 @@ public final class Transport { } public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { - InetSocketAddress[] oldAddresses = this.remoteAddres; - List list = new ArrayList<>(); + TransportAddress[] oldAddresses = this.transportAddres; + List list = new ArrayList<>(); if (addresses != null) { for (InetSocketAddress addr : addresses) { if (clientAddress != null && clientAddress.equals(addr)) continue; - list.add(addr); + list.add(new TransportAddress(addr)); } } - this.remoteAddres = list.toArray(new InetSocketAddress[list.size()]); - return oldAddresses; + this.transportAddres = list.toArray(new TransportAddress[list.size()]); + + InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length]; + for (int i = 0; i < rs.length; i++) { + rs[i] = oldAddresses[i].getAddress(); + } + return rs; } public final boolean addRemoteAddresses(final InetSocketAddress addr) { if (addr == null) return false; synchronized (this) { - if (this.remoteAddres == null) { - this.remoteAddres = new InetSocketAddress[]{addr}; + if (this.transportAddres == null) { + this.transportAddres = new TransportAddress[]{new TransportAddress(addr)}; } else { - for (InetSocketAddress i : this.remoteAddres) { - if (addr.equals(i)) return false; + for (TransportAddress i : this.transportAddres) { + if (addr.equals(i.address)) return false; } - this.remoteAddres = Utility.append(remoteAddres, addr); + this.transportAddres = Utility.append(transportAddres, new TransportAddress(addr)); } return true; } @@ -111,9 +118,9 @@ public final class Transport { public final boolean removeRemoteAddresses(InetSocketAddress addr) { if (addr == null) return false; - if (this.remoteAddres == null) return false; + if (this.transportAddres == null) return false; synchronized (this) { - this.remoteAddres = Utility.remove(remoteAddres, addr); + this.transportAddres = Utility.remove(transportAddres, new TransportAddress(addr)); } return true; } @@ -134,8 +141,16 @@ public final class Transport { return clientAddress; } + public TransportAddress[] getTransportAddresses() { + return transportAddres; + } + public InetSocketAddress[] getRemoteAddresses() { - return remoteAddres; + InetSocketAddress[] rs = new InetSocketAddress[transportAddres.length]; + for (int i = 0; i < rs.length; i++) { + rs[i] = transportAddres[i].getAddress(); + } + return rs; } public ConcurrentHashMap> getAsyncConnectionPool() { @@ -144,7 +159,7 @@ public final class Transport { @Override public String toString() { - return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}"; + return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddres) + "}"; } public ByteBuffer pollBuffer() { @@ -169,32 +184,56 @@ public final class Transport { public AsyncConnection pollConnection(SocketAddress addr) { if (this.strategy != null) return strategy.pollConnection(addr, this); - if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0]; + if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address; final boolean rand = addr == null; - if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list"); + if (rand && this.transportAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list"); try { if (tcp) { AsynchronousSocketChannel channel = null; if (rand) { //取地址 - for (int i = 0; i < remoteAddres.length; i++) { - addr = remoteAddres[i]; - BlockingQueue queue = connPool.get(addr); - if (queue != null && !queue.isEmpty()) { + TransportAddress transportAddr; + boolean tryed = false; + for (int i = 0; i < transportAddres.length; i++) { + transportAddr = transportAddres[i]; + addr = transportAddr.address; + if (!transportAddr.enable) continue; + final BlockingQueue queue = transportAddr.conns; + if (!queue.isEmpty()) { AsyncConnection conn; while ((conn = queue.poll()) != null) { if (conn.isOpen()) return conn; } } + tryed = true; if (channel == null) { channel = AsynchronousSocketChannel.open(group); if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); } try { channel.connect(addr).get(2, TimeUnit.SECONDS); + transportAddr.enable = true; break; } catch (Exception iex) { - iex.printStackTrace(); - if (i == remoteAddres.length - 1) channel = null; + transportAddr.enable = false; + channel = null; + } + } + if (channel == null && !tryed) { + for (int i = 0; i < transportAddres.length; i++) { + transportAddr = transportAddres[i]; + addr = transportAddr.address; + if (channel == null) { + channel = AsynchronousSocketChannel.open(group); + if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + } + try { + channel.connect(addr).get(2, TimeUnit.SECONDS); + transportAddr.enable = true; + break; + } catch (Exception iex) { + transportAddr.enable = false; + channel = null; + } } } } else { @@ -205,7 +244,7 @@ public final class Transport { if (channel == null) return null; return AsyncConnection.create(channel, addr, 3000, 3000); } else { // UDP - if (rand) addr = remoteAddres[0]; + if (rand) addr = this.transportAddres[0].address; DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(true); channel.connect(addr); @@ -267,4 +306,54 @@ public final class Transport { }); } + public static class TransportAddress { + + protected InetSocketAddress address; + + protected volatile boolean enable; + + protected final BlockingQueue conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT); + + public TransportAddress(InetSocketAddress address) { + this.address = address; + this.enable = true; + } + + @java.beans.ConstructorProperties({"address", "enable"}) + public TransportAddress(InetSocketAddress address, boolean enable) { + this.address = address; + this.enable = enable; + } + + public InetSocketAddress getAddress() { + return address; + } + + public boolean isEnable() { + return enable; + } + + @ConvertColumn(ignore = true) + public BlockingQueue getConns() { + return conns; + } + + @Override + public int hashCode() { + return this.address.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + final TransportAddress other = (TransportAddress) obj; + return this.address.equals(other.address); + } + + public String toString() { + return JsonConvert.root().convertTo(this); + } + } }