diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index ccf3e3e18..b4858c8dd 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -294,6 +294,7 @@ public final class Transport { } public void offerConnection(final boolean forceClose, AsyncConnection conn) { + if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return; if (!forceClose && conn.isTCP()) { if (conn.isOpen()) { TransportAddress taddr = findTransportAddress(conn.getRemoteAddress()); @@ -350,6 +351,8 @@ public final class Transport { protected final BlockingQueue conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT); + protected final ConcurrentHashMap attributes = new ConcurrentHashMap<>(); + public TransportAddress(InetSocketAddress address) { this.address = address; this.enable = true; @@ -361,6 +364,35 @@ public final class Transport { this.enable = enable; } + public T setAttribute(String name, T value) { + attributes.put(name, value); + return value; + } + + @SuppressWarnings("unchecked") + public T getAttribute(String name) { + return (T) attributes.get(name); + } + + @SuppressWarnings("unchecked") + public T removeAttribute(String name) { + return (T) attributes.remove(name); + } + + public TransportAddress clearAttributes() { + attributes.clear(); + return this; + } + + public ConcurrentHashMap getAttributes() { + return attributes; + } + + public void setAttributes(ConcurrentHashMap map) { + attributes.clear(); + if (map != null) attributes.putAll(map); + } + public InetSocketAddress getAddress() { return address; } diff --git a/src/org/redkale/net/TransportStrategy.java b/src/org/redkale/net/TransportStrategy.java index 78efe774b..1464c410f 100644 --- a/src/org/redkale/net/TransportStrategy.java +++ b/src/org/redkale/net/TransportStrategy.java @@ -18,5 +18,25 @@ import java.util.concurrent.CompletableFuture; */ public interface TransportStrategy { + /** + * 创建AsyncConnection + * + * @param addr 服务器地址 + * @param transport Transport + * + * @return AsyncConnection + */ public CompletableFuture pollConnection(SocketAddress addr, Transport transport); + + /** + * 回收AsyncConnection,返回false表示使用Transport默认的回收实现, 返回true表示自定义回收实现 + * + * @param forceClose 是否强制关闭 + * @param conn AsyncConnection + * + * @return boolean + */ + default boolean offerConnection(final boolean forceClose, AsyncConnection conn) { + return false; + } }