From abe73c4022a231bc88902b1fc908805e4994ca17 Mon Sep 17 00:00:00 2001 From: Redkale Date: Wed, 4 Jan 2023 10:21:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Client=E7=9A=84pingRequest?= =?UTF-8?q?=E5=92=8CcloseRequest?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/client/Client.java | 22 +++--- .../redkale/net/client/ClientConnection.java | 6 +- .../org/redkale/net/client/ClientRequest.java | 5 ++ .../net/client/ClientWriteIOThread.java | 70 +++++++++++++++++++ 4 files changed, 89 insertions(+), 14 deletions(-) create mode 100644 src/main/java/org/redkale/net/client/ClientWriteIOThread.java diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index dd57f099a..b6e7943a3 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -8,7 +8,7 @@ package org.redkale.net.client; import java.util.Queue; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import java.util.function.Function; +import java.util.function.*; import java.util.logging.Logger; import org.redkale.net.*; import org.redkale.util.*; @@ -68,10 +68,10 @@ public abstract class Client { //------------------ 可选项 ------------------ //PING心跳的请求数据,为null且pingInterval<1表示不需要定时ping - protected R pingRequest; + protected Supplier pingRequestSupplier; //关闭请求的数据, 为null表示直接关闭 - protected R closeRequest; + protected Supplier closeRequestSupplier; //创建连接后进行的登录鉴权操作 protected Function, CompletableFuture> authenticate; @@ -98,13 +98,13 @@ public abstract class Client { } protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, - R closeRequest, Function, CompletableFuture> authenticate) { - this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequest, authenticate); + Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { + this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); } @SuppressWarnings("OverridableMethodCallInConstructor") protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, - int maxPipelines, R pingRequest, R closeRequest, Function, CompletableFuture> authenticate) { + int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { if (maxPipelines < 1) { throw new IllegalArgumentException("maxPipelines must bigger 0"); } @@ -113,8 +113,8 @@ public abstract class Client { this.address = address; this.connLimit = maxconns; this.maxPipelines = maxPipelines; - this.pingRequest = pingRequest; - this.closeRequest = closeRequest; + this.pingRequestSupplier = pingRequestSupplier; + this.closeRequestSupplier = closeRequestSupplier; this.authenticate = authenticate; this.connArray = new ClientConnection[connLimit]; this.connOpenStates = new AtomicBoolean[connLimit]; @@ -131,10 +131,10 @@ public abstract class Client { t.setDaemon(true); return t; }); - if (pingRequest != null && this.timeoutFuture == null) { + if (pingRequestSupplier != null && this.timeoutFuture == null) { this.timeoutFuture = this.timeoutScheduler.scheduleAtFixedRate(() -> { try { - R req = pingRequest; + R req = pingRequestSupplier.get(); if (req == null) { //可能运行中进行重新赋值 timeoutFuture.cancel(true); timeoutFuture = null; @@ -170,7 +170,7 @@ public abstract class Client { return; } this.timeoutScheduler.shutdownNow(); - final R closereq = closeRequest; + final R closereq = closeRequestSupplier == null ? null : closeRequestSupplier.get(); for (ClientConnection conn : this.connArray) { if (conn == null) { continue; diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index a4457d31b..213d304b5 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -61,7 +61,7 @@ public abstract class ClientConnection implements Co @Override public void completed(Integer result, Void attachment) { - if (writeLastRequest != null && writeLastRequest == client.closeRequest) { + if (writeLastRequest != null && writeLastRequest.isCloseType()) { if (closeFuture != null) { channel.getWriteIOThread().runWork(() -> { closeFuture.complete(null); @@ -350,7 +350,7 @@ public abstract class ClientConnection implements Co protected final CompletableFuture

writeChannel(R request) { ClientFuture respFuture; - if (request == client.closeRequest) { + if (request.isCloseType()) { respFuture = createClientFuture(null); closeFuture = respFuture; } else { @@ -373,7 +373,7 @@ public abstract class ClientConnection implements Co private void writeChannelInThread(R request, ClientFuture respFuture) { Serializable reqid = request.getRequestid(); //保证顺序一致 - if (client.closeRequest != null && respFuture.request == client.closeRequest) { + if (respFuture.request.isCloseType()) { responseQueue.offer(ClientFuture.EMPTY); } else { request.respFuture = respFuture; diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 34be758c6..389d7287e 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -33,6 +33,11 @@ public abstract class ClientRequest implements BiConsumer requestQueue = new LinkedBlockingQueue<>(); + + public ClientWriteIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, + ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { + super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); + } + + public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) { + requestQueue.offer(new ClientEntity(conn, request, respFuture)); + } + + @Override + public void run() { + while (!this.closed) { + ClientEntity entity; + try { + while ((entity = requestQueue.take()) != null) { + ClientConnection conn = entity.conn; + ClientRequest request = entity.request; + ClientFuture respFuture = entity.respFuture; + Serializable reqid = request.getRequestid(); + if (reqid == null) { + conn.responseQueue.offer(respFuture); + } else { + conn.responseMap.put(reqid, respFuture); + } + ByteArray rw = conn.writeArray; + rw.clear(); + request.accept(conn, rw); + conn.channel.write(rw, conn.writeHandler); + } + } catch (InterruptedException e) { + } + } + } + + protected static class ClientEntity { + + ClientConnection conn; + + ClientRequest request; + + ClientFuture respFuture; + + public ClientEntity(ClientConnection conn, ClientRequest request, ClientFuture respFuture) { + this.conn = conn; + this.request = request; + this.respFuture = respFuture; + } + + } +}