This commit is contained in:
Redkale
2018-03-30 08:56:00 +08:00
parent 84a15afc9a
commit b14e14659c
2 changed files with 73 additions and 67 deletions

View File

@@ -62,7 +62,7 @@ public final class Transport {
protected final InetSocketAddress clientAddress;
//不可能为null
protected TransportAddress[] transportAddrs = new TransportAddress[0];
protected TransportNode[] transportNodes = new TransportNode[0];
protected final ObjectPool<ByteBuffer> bufferPool;
@@ -96,14 +96,14 @@ public final class Transport {
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
final TransportAddress[] oldAddresses = this.transportAddrs;
final TransportNode[] oldNodes = this.transportNodes;
synchronized (this) {
List<TransportAddress> list = new ArrayList<>();
List<TransportNode> list = new ArrayList<>();
if (addresses != null) {
for (InetSocketAddress addr : addresses) {
if (clientAddress != null && clientAddress.equals(addr)) continue;
boolean hasold = false;
for (TransportAddress oldAddr : oldAddresses) {
for (TransportNode oldAddr : oldNodes) {
if (oldAddr.getAddress().equals(addr)) {
list.add(oldAddr);
hasold = true;
@@ -111,14 +111,14 @@ public final class Transport {
}
}
if (hasold) continue;
list.add(new TransportAddress(addr));
list.add(new TransportNode(factory.poolmaxconns, addr));
}
}
this.transportAddrs = list.toArray(new TransportAddress[list.size()]);
this.transportNodes = list.toArray(new TransportNode[list.size()]);
}
InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length];
InetSocketAddress[] rs = new InetSocketAddress[oldNodes.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = oldAddresses[i].getAddress();
rs[i] = oldNodes[i].getAddress();
}
return rs;
}
@@ -127,13 +127,13 @@ public final class Transport {
if (addr == null) return false;
if (clientAddress != null && clientAddress.equals(addr)) return false;
synchronized (this) {
if (this.transportAddrs.length == 0) {
this.transportAddrs = new TransportAddress[]{new TransportAddress(addr)};
if (this.transportNodes.length == 0) {
this.transportNodes = new TransportNode[]{new TransportNode(factory.poolmaxconns, addr)};
} else {
for (TransportAddress i : this.transportAddrs) {
for (TransportNode i : this.transportNodes) {
if (addr.equals(i.address)) return false;
}
this.transportAddrs = Utility.append(transportAddrs, new TransportAddress(addr));
this.transportNodes = Utility.append(transportNodes, new TransportNode(factory.poolmaxconns, addr));
}
return true;
}
@@ -142,7 +142,7 @@ public final class Transport {
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
if (addr == null) return false;
synchronized (this) {
this.transportAddrs = Utility.remove(transportAddrs, new TransportAddress(addr));
this.transportNodes = Utility.remove(transportNodes, new TransportNode(factory.poolmaxconns, addr));
}
return true;
}
@@ -156,10 +156,10 @@ public final class Transport {
}
public void close() {
TransportAddress[] taddrs = this.transportAddrs;
if (taddrs == null) return;
for (TransportAddress taddr : taddrs) {
if (taddr != null) taddr.dispose();
TransportNode[] nodes = this.transportNodes;
if (nodes == null) return;
for (TransportNode node : nodes) {
if (node != null) node.dispose();
}
}
@@ -167,28 +167,28 @@ public final class Transport {
return clientAddress;
}
public TransportAddress[] getTransportAddresses() {
return transportAddrs;
public TransportNode[] getTransportNodes() {
return transportNodes;
}
public TransportAddress findTransportAddress(SocketAddress addr) {
for (TransportAddress taddr : this.transportAddrs) {
if (taddr.address.equals(addr)) return taddr;
public TransportNode findTransportNode(SocketAddress addr) {
for (TransportNode node : this.transportNodes) {
if (node.address.equals(addr)) return node;
}
return null;
}
public InetSocketAddress[] getRemoteAddresses() {
InetSocketAddress[] rs = new InetSocketAddress[transportAddrs.length];
InetSocketAddress[] rs = new InetSocketAddress[transportNodes.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = transportAddrs[i].getAddress();
rs[i] = transportNodes[i].getAddress();
}
return rs;
}
@Override
public String toString() {
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddrs) + "}";
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteNodes = " + Arrays.toString(transportNodes) + "}";
}
public ByteBuffer pollBuffer() {
@@ -217,25 +217,25 @@ public final class Transport {
public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr0) {
if (this.strategy != null) return strategy.pollConnection(addr0, this);
final TransportAddress[] taddrs = this.transportAddrs;
if (addr0 == null && taddrs.length == 1) addr0 = taddrs[0].address;
final TransportNode[] nodes = this.transportNodes;
if (addr0 == null && nodes.length == 1) addr0 = nodes[0].address;
final SocketAddress addr = addr0;
final boolean rand = addr == null; //是否随机取地址
if (rand && taddrs.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
if (rand && nodes.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
try {
if (!tcp) { // UDP
SocketAddress udpaddr = rand ? taddrs[0].address : addr;
SocketAddress udpaddr = rand ? nodes[0].address : addr;
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true);
channel.connect(udpaddr);
return CompletableFuture.completedFuture(AsyncConnection.create(channel, udpaddr, true, factory.readTimeoutSecond, factory.writeTimeoutSecond));
}
if (!rand) { //指定地址
TransportAddress taddr = findTransportAddress(addr);
if (taddr == null) {
TransportNode node = findTransportNode(addr);
if (node == null) {
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSecond, factory.writeTimeoutSecond);
}
final BlockingQueue<AsyncConnection> queue = taddr.conns;
final BlockingQueue<AsyncConnection> queue = node.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
@@ -247,14 +247,14 @@ public final class Transport {
//---------------------随机取地址------------------------
int enablecount = 0;
final TransportAddress[] newtaddrs = new TransportAddress[taddrs.length];
for (final TransportAddress taddr : taddrs) {
if (taddr.disabletime > 0) continue;
newtaddrs[enablecount++] = taddr;
final TransportNode[] newnodes = new TransportNode[nodes.length];
for (final TransportNode node : nodes) {
if (node.disabletime > 0) continue;
newnodes[enablecount++] = node;
}
final long now = System.currentTimeMillis();
if (enablecount > 0) { //存在可用的地址
final TransportAddress one = newtaddrs[Math.abs(seq.incrementAndGet()) % enablecount];
final TransportNode one = newnodes[Math.abs(seq.incrementAndGet()) % enablecount];
final BlockingQueue<AsyncConnection> queue = one.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
@@ -265,9 +265,9 @@ public final class Transport {
CompletableFuture future = new CompletableFuture();
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(one.address, one, new CompletionHandler<Void, TransportAddress>() {
channel.connect(one.address, one, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportAddress attachment) {
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
if (future.isDone()) {
@@ -278,14 +278,14 @@ public final class Transport {
}
@Override
public void failed(Throwable exc, TransportAddress attachment) {
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
try {
channel.close();
} catch (Exception e) {
}
try {
pollConnection0(taddrs, one, now).whenComplete((r, t) -> {
pollConnection0(nodes, one, now).whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
@@ -300,24 +300,24 @@ public final class Transport {
});
return future;
}
return pollConnection0(taddrs, null, now);
return pollConnection0(nodes, null, now);
} catch (Exception ex) {
throw new RuntimeException("transport address = " + addr, ex);
}
}
private CompletableFuture<AsyncConnection> pollConnection0(TransportAddress[] taddrs, TransportAddress exclude, long now) throws IOException {
private CompletableFuture<AsyncConnection> pollConnection0(TransportNode[] nodes, TransportNode exclude, long now) throws IOException {
//从可用/不可用的地址列表中创建连接
AtomicInteger count = new AtomicInteger(taddrs.length);
AtomicInteger count = new AtomicInteger(nodes.length);
CompletableFuture future = new CompletableFuture();
for (final TransportAddress taddr : taddrs) {
if (taddr == exclude) continue;
for (final TransportNode node : nodes) {
if (node == exclude) continue;
if (future.isDone()) return future;
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(taddr.address, taddr, new CompletionHandler<Void, TransportAddress>() {
channel.connect(node.address, node, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportAddress attachment) {
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
if (future.isDone()) {
@@ -328,7 +328,7 @@ public final class Transport {
}
@Override
public void failed(Throwable exc, TransportAddress attachment) {
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
if (count.decrementAndGet() < 1) {
future.completeExceptionally(exc);
@@ -347,8 +347,8 @@ public final class Transport {
if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return;
if (!forceClose && conn.isTCP()) {
if (conn.isOpen()) {
TransportAddress taddr = findTransportAddress(conn.getRemoteAddress());
if (taddr == null || !taddr.conns.offer(conn)) conn.dispose();
TransportNode node = findTransportNode(conn.getRemoteAddress());
if (node == null || !node.conns.offer(conn)) conn.dispose();
}
} else {
conn.dispose();
@@ -393,25 +393,31 @@ public final class Transport {
});
}
public class TransportAddress {
public static class TransportNode {
protected InetSocketAddress address;
protected volatile long disabletime; //不可用时的时间, 为0表示可用
protected final BlockingQueue<AsyncConnection> conns = new ArrayBlockingQueue<>(factory.poolmaxconns);
protected final BlockingQueue<AsyncConnection> conns;
protected final ConcurrentHashMap<String, Object> attributes = new ConcurrentHashMap<>();
public TransportAddress(InetSocketAddress address) {
public TransportNode(int poolmaxconns, InetSocketAddress address) {
this.address = address;
this.disabletime = 0;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
}
@ConstructorParameters({"address", "disabletime"})
public TransportAddress(InetSocketAddress address, long disabletime) {
@ConstructorParameters({"poolmaxconns", "address", "disabletime"})
public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) {
this.address = address;
this.disabletime = disabletime;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
}
public int getPoolmaxconns() {
return this.conns.remainingCapacity() + this.conns.size();
}
public <T> T setAttribute(String name, T value) {
@@ -429,7 +435,7 @@ public final class Transport {
return (T) attributes.remove(name);
}
public TransportAddress clearAttributes() {
public TransportNode clearAttributes() {
attributes.clear();
return this;
}
@@ -473,7 +479,7 @@ public final class Transport {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
final TransportAddress other = (TransportAddress) obj;
final TransportNode other = (TransportNode) obj;
return this.address.equals(other.address);
}

View File

@@ -346,19 +346,19 @@ public class TransportFactory {
nulllist.add(ref);
continue;
}
Transport.TransportAddress[] taddrs = transport.getTransportAddresses();
for (final Transport.TransportAddress taddr : taddrs) {
if (taddr.disabletime < 1) continue; //可用
Transport.TransportNode[] nodes = transport.getTransportNodes();
for (final Transport.TransportNode node : nodes) {
if (node.disabletime < 1) continue; //可用
try {
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(transport.group);
channel.connect(taddr.address, taddr, new CompletionHandler<Void, Transport.TransportAddress>() {
channel.connect(node.address, node, new CompletionHandler<Void, Transport.TransportNode>() {
@Override
public void completed(Void result, Transport.TransportAddress attachment) {
public void completed(Void result, Transport.TransportNode attachment) {
attachment.disabletime = 0;
}
@Override
public void failed(Throwable exc, Transport.TransportAddress attachment) {
public void failed(Throwable exc, Transport.TransportNode attachment) {
attachment.disabletime = System.currentTimeMillis();
}
});
@@ -376,9 +376,9 @@ public class TransportFactory {
for (WeakReference<Transport> ref : transportReferences) {
Transport transport = ref.get();
if (transport == null) continue;
Transport.TransportAddress[] taddrs = transport.getTransportAddresses();
for (final Transport.TransportAddress taddr : taddrs) {
final BlockingQueue<AsyncConnection> queue = taddr.conns;
Transport.TransportNode[] nodes = transport.getTransportNodes();
for (final Transport.TransportNode node : nodes) {
final BlockingQueue<AsyncConnection> queue = node.conns;
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.getLastWriteTime() > timex && false) { //最近几秒内已经进行过IO操作