This commit is contained in:
Redkale
2018-06-01 10:00:59 +08:00
parent bcca20dbbb
commit fa9e62d2a7

View File

@@ -29,41 +29,41 @@ import org.redkale.util.*;
* @author zhangjx
*/
public final class Transport {
public static final String DEFAULT_PROTOCOL = "TCP";
protected final AtomicInteger seq = new AtomicInteger(-1);
protected final TransportFactory factory;
protected final String name; //即<group>的name属性
protected final String subprotocol; //即<group>的subprotocol属性
protected final boolean tcp;
protected final String protocol;
protected final AsynchronousChannelGroup group;
protected final InetSocketAddress clientAddress;
//不可能为null
protected TransportNode[] transportNodes = new TransportNode[0];
protected final ObjectPool<ByteBuffer> bufferPool;
protected final SSLContext sslContext;
//负载均衡策略
protected final TransportStrategy strategy;
protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this(name, DEFAULT_PROTOCOL, subprotocol, factory, transportBufferPool, transportChannelGroup, sslContext, clientAddress, addresses, strategy);
}
protected Transport(String name, String protocol, String subprotocol,
final TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
@@ -81,7 +81,7 @@ public final class Transport {
this.strategy = strategy;
updateRemoteAddresses(addresses);
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
final TransportNode[] oldNodes = this.transportNodes;
synchronized (this) {
@@ -109,7 +109,7 @@ public final class Transport {
}
return rs;
}
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
if (addr == null) return false;
if (clientAddress != null && clientAddress.equals(addr)) return false;
@@ -125,7 +125,7 @@ public final class Transport {
return true;
}
}
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
if (addr == null) return false;
synchronized (this) {
@@ -133,15 +133,15 @@ public final class Transport {
}
return true;
}
public String getName() {
return name;
}
public String getSubprotocol() {
return subprotocol;
}
public void close() {
TransportNode[] nodes = this.transportNodes;
if (nodes == null) return;
@@ -149,22 +149,22 @@ public final class Transport {
if (node != null) node.dispose();
}
}
public InetSocketAddress getClientAddress() {
return clientAddress;
}
public TransportNode[] getTransportNodes() {
return transportNodes;
}
public TransportNode findTransportNode(SocketAddress addr) {
for (TransportNode node : this.transportNodes) {
if (node.address.equals(addr)) return node;
}
return null;
}
public InetSocketAddress[] getRemoteAddresses() {
InetSocketAddress[] rs = new InetSocketAddress[transportNodes.length];
for (int i = 0; i < rs.length; i++) {
@@ -172,36 +172,36 @@ public final class Transport {
}
return rs;
}
@Override
public String toString() {
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteNodes = " + Arrays.toString(transportNodes) + "}";
}
public ByteBuffer pollBuffer() {
return bufferPool.get();
}
public Supplier<ByteBuffer> getBufferSupplier() {
return bufferPool;
}
public void offerBuffer(ByteBuffer buffer) {
bufferPool.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
for (ByteBuffer buffer : buffers) offerBuffer(buffer);
}
public AsynchronousChannelGroup getTransportChannelGroup() {
return group;
}
public boolean isTCP() {
return tcp;
}
public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr0) {
if (this.strategy != null) return strategy.pollConnection(addr0, this);
final TransportNode[] nodes = this.transportNodes;
@@ -273,7 +273,7 @@ public final class Transport {
future.complete(asyncConn);
}
}
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
@@ -289,7 +289,7 @@ public final class Transport {
future.complete(r);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
@@ -302,7 +302,7 @@ public final class Transport {
throw new RuntimeException("transport address = " + addr, ex);
}
}
private CompletableFuture<AsyncConnection> pollConnection0(TransportNode[] nodes, TransportNode exclude, long now) throws IOException {
//从可用/不可用的地址列表中创建连接
AtomicInteger count = new AtomicInteger(nodes.length);
@@ -317,15 +317,19 @@ public final class Transport {
channel.connect(node.address, node, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
try {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
} catch (Exception e) {
failed(e, attachment);
}
}
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
@@ -341,7 +345,7 @@ public final class Transport {
}
return future;
}
public void offerConnection(final boolean forceClose, AsyncConnection conn) {
if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return;
if (!forceClose && conn.isTCP()) {
@@ -355,7 +359,7 @@ public final class Transport {
conn.dispose();
}
}
public <A> void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) {
pollConnection(addr).whenComplete((conn, ex) -> {
if (ex != null) {
@@ -363,28 +367,28 @@ public final class Transport {
return;
}
conn.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
buffer.clear();
conn.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (handler != null) handler.completed(result, att);
offerBuffer(buffer);
offerConnection(false, conn);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer);
offerConnection(true, conn);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer);
@@ -393,88 +397,88 @@ public final class Transport {
});
});
}
public static class TransportNode {
protected InetSocketAddress address;
protected volatile long disabletime; //不可用时的时间, 为0表示可用
protected final BlockingQueue<AsyncConnection> conns;
protected final ConcurrentHashMap<String, Object> attributes = new ConcurrentHashMap<>();
public TransportNode(int poolmaxconns, InetSocketAddress address) {
this.address = address;
this.disabletime = 0;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
}
@ConstructorParameters({"poolmaxconns", "address", "disabletime"})
public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) {
this.address = address;
this.disabletime = disabletime;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
}
public int getPoolmaxconns() {
return this.conns.remainingCapacity() + this.conns.size();
}
public <T> T setAttribute(String name, T value) {
attributes.put(name, value);
return value;
}
@SuppressWarnings("unchecked")
public <T> T getAttribute(String name) {
return (T) attributes.get(name);
}
@SuppressWarnings("unchecked")
public <T> T removeAttribute(String name) {
return (T) attributes.remove(name);
}
public TransportNode clearAttributes() {
attributes.clear();
return this;
}
public ConcurrentHashMap<String, Object> getAttributes() {
return attributes;
}
public void setAttributes(ConcurrentHashMap<String, Object> map) {
attributes.clear();
if (map != null) attributes.putAll(map);
}
public InetSocketAddress getAddress() {
return address;
}
public long getDisabletime() {
return disabletime;
}
@ConvertDisabled
public BlockingQueue<AsyncConnection> getConns() {
return conns;
}
public void dispose() {
AsyncConnection conn;
while ((conn = conns.poll()) != null) {
conn.dispose();
}
}
@Override
public int hashCode() {
return this.address.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
@@ -483,7 +487,7 @@ public final class Transport {
final TransportNode other = (TransportNode) obj;
return this.address.equals(other.address);
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);