From ad059eb3d6766180134993beff553b373b773c84 Mon Sep 17 00:00:00 2001 From: Redkale Date: Fri, 13 Jan 2023 11:05:04 +0800 Subject: [PATCH] =?UTF-8?q?client=E6=B3=9B=E5=9E=8B=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/client/Client.java | 39 ++++++++++--------- .../redkale/net/client/ClientConnection.java | 4 +- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 817c250c9..e250d6cab 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -21,10 +21,11 @@ import org.redkale.util.*; * @author zhangjx * @since 2.3.0 * + * @param 连接对象 * @param 请求对象 * @param

响应对象 */ -public abstract class Client implements Resourcable { +public abstract class Client, R extends ClientRequest, P> implements Resourcable { public static final int DEFAULT_MAX_PIPELINES = 128; @@ -59,7 +60,7 @@ public abstract class Client implements Resourcable protected AtomicBoolean[] connOpenStates; //conns的标记组,当conn不存在或closed状态,标记为false - protected final Queue>[] connAcquireWaitings; //连接等待池 + protected final Queue>[] connAcquireWaitings; //连接等待池 protected int connLimit = Utility.cpus(); //最大连接数 @@ -79,7 +80,7 @@ public abstract class Client implements Resourcable protected Supplier closeRequestSupplier; //创建连接后进行的登录鉴权操作 - protected Function, CompletableFuture> authenticate; + protected Function, CompletableFuture> authenticate; protected Client(String name, AsyncGroup group, ClientAddress address) { this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); @@ -98,18 +99,18 @@ public abstract class Client implements Resourcable } protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, - Function, CompletableFuture> authenticate) { + Function, CompletableFuture> authenticate) { this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, null, authenticate); } protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, - Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { + Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); } @SuppressWarnings("OverridableMethodCallInConstructor") protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, - int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { + int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { if (maxPipelines < 1) { throw new IllegalArgumentException("maxPipelines must bigger 0"); } @@ -155,7 +156,7 @@ public abstract class Client implements Resourcable if (now - conn.getLastWriteTime() < 10_000) { continue; } - conn.writeChannel(req).thenAccept(p -> handlePingResult(conn, p)); + conn.writeChannel(req).thenAccept(p -> handlePingResult((C) conn, p)); } } catch (Throwable t) { } @@ -163,7 +164,7 @@ public abstract class Client implements Resourcable } } - protected abstract ClientConnection createClientConnection(final int index, AsyncConnection channel); + protected abstract C createClientConnection(final int index, AsyncConnection channel); //创建连接后先从服务器拉取数据构建的虚拟请求,返回null表示连上服务器后不读取数据 protected R createVirtualRequestAfterConnect() { @@ -174,7 +175,7 @@ public abstract class Client implements Resourcable return 30; } - protected void handlePingResult(ClientConnection conn, P result) { + protected void handlePingResult(C conn, P result) { } public synchronized void close() { @@ -219,14 +220,14 @@ public abstract class Client implements Resourcable return conn.writeChannel(request); } - protected CompletableFuture connect() { + protected CompletableFuture connect() { return connect(null); } - protected CompletableFuture connect(final ChannelContext context) { + protected CompletableFuture connect(final ChannelContext context) { final boolean cflag = context != null && connectionContextName != null; if (cflag) { - ClientConnection cc = context.getAttribute(connectionContextName); + C cc = context.getAttribute(connectionContextName); if (cc != null && cc.isOpen()) { return CompletableFuture.completedFuture(cc); } @@ -240,7 +241,7 @@ public abstract class Client implements Resourcable connIndex = (int) Math.abs(Thread.currentThread().getId() % size); } // if (connIndex >= 0) { - ClientConnection cc = this.connArray[connIndex]; + C cc = (C) this.connArray[connIndex]; if (cc != null && cc.isOpen()) { if (cflag) { context.setAttribute(connectionContextName, cc); @@ -248,20 +249,20 @@ public abstract class Client implements Resourcable return CompletableFuture.completedFuture(cc); } final int index = connIndex; - final Queue> waitQueue = this.connAcquireWaitings[index]; + final Queue> waitQueue = this.connAcquireWaitings[index]; if (this.connOpenStates[index].compareAndSet(false, true)) { - CompletableFuture future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> createClientConnection(index, c).setMaxPipelines(maxPipelines)); + CompletableFuture future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds) + .thenApply(c -> (C) createClientConnection(index, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); } else { - future = future.thenApply(conn -> conn.readChannel()); + future = future.thenApply(conn -> (C) conn.readChannel()); } return (authenticate == null ? future : authenticate.apply(future)).thenApply(c -> { c.setAuthenticated(true); this.connArray[index] = c; - CompletableFuture f; + CompletableFuture f; if (cflag) { context.setAttribute(connectionContextName, c); } @@ -308,7 +309,7 @@ public abstract class Client implements Resourcable // return waitClientConnection(); } - protected CompletableFuture waitClientConnection() { + protected CompletableFuture waitClientConnection() { CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), 6, TimeUnit.SECONDS); connAcquireWaitings[connSeqno.getAndIncrement() % this.connLimit].offer(rs); return rs; diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index a8855aaa6..29630687c 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -29,7 +29,7 @@ public abstract class ClientConnection implements Co protected final int index; //从0开始, connArray的下坐标 - protected final Client client; + protected final Client client; protected final LongAdder respWaitingCounter; @@ -56,7 +56,7 @@ public abstract class ClientConnection implements Co private boolean authenticated; @SuppressWarnings({"LeakingThisInConstructor", "OverridableMethodCallInConstructor"}) - public ClientConnection(Client client, int index, AsyncConnection channel) { + public ClientConnection(Client, R, P> client, int index, AsyncConnection channel) { this.client = client; this.codec = createCodec(); this.index = index;