diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 03286c2de..983d6fdd5 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -8,7 +8,6 @@ package org.redkale.net.client; import java.util.Objects; import java.util.concurrent.*; import org.redkale.annotation.Nonnull; -import org.redkale.annotation.Nullable; import org.redkale.net.*; import org.redkale.util.Traces; @@ -25,7 +24,7 @@ public class ClientFuture extends CompletableFuture< @Nonnull protected final R request; - @Nullable + @Nonnull protected final ClientConnection conn; private ScheduledFuture timeout; @@ -38,6 +37,7 @@ public class ClientFuture extends CompletableFuture< ClientFuture(ClientConnection conn, R request) { super(); + Objects.requireNonNull(conn); Objects.requireNonNull(request); this.conn = conn; this.request = request; @@ -79,7 +79,7 @@ public class ClientFuture extends CompletableFuture< private void runTimeout() { String traceid = request != null ? request.getTraceid() : null; - if (request != null && conn != null) { + if (request != null) { conn.removeRespFuture(request.getRequestid(), this); } TimeoutException ex = new TimeoutException("client-request: " + request); @@ -88,26 +88,17 @@ public class ClientFuture extends CompletableFuture< workThread = request.workThread; request.workThread = null; } - if (conn != null && (workThread == null || workThread.getWorkExecutor() == null)) { + if (workThread == null || workThread.getWorkExecutor() == null) { workThread = conn.getChannel().getReadIOThread(); } - if (workThread == null) { + workThread.runWork(() -> { Traces.currentTraceid(traceid); if (!isDone()) { completeExceptionally(ex); } - } else { - workThread.runWork(() -> { - Traces.currentTraceid(traceid); - if (!isDone()) { - completeExceptionally(ex); - } - Traces.removeTraceid(); - }); - } - if (conn != null) { - conn.dispose(ex); - } + Traces.removeTraceid(); + }); + conn.dispose(ex); } @Override diff --git a/src/main/java/org/redkale/net/client/ClientWriteThread.java b/src/main/java/org/redkale/net/client/ClientWriteThread.java new file mode 100644 index 000000000..8028b0941 --- /dev/null +++ b/src/main/java/org/redkale/net/client/ClientWriteThread.java @@ -0,0 +1,71 @@ +/* + +*/ + +package org.redkale.net.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * 输出队列线程 + * + *

详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + */ +public class ClientWriteThread extends Thread { + + protected final LinkedBlockingQueue writeQueue = new LinkedBlockingQueue(); + + protected ClientWriteThread() { + // do nothing + } + + public void offer(ClientFuture respFuture) { + this.writeQueue.add(Objects.requireNonNull(respFuture)); + } + + @Override + public void run() { + final List list = new ArrayList<>(); + while (true) { + try { + ClientFuture respFuture = this.writeQueue.take(); + if (respFuture == ClientFuture.NIL) { + return; + } + boolean over = false; + list.clear(); + list.add(respFuture); + ClientConnection conn = respFuture.conn; + int max = conn.getMaxPipelines(); + while (--max > 0 && (respFuture = this.writeQueue.poll()) != null) { + if (respFuture == ClientFuture.NIL) { + over = true; + break; + } else if (respFuture.conn == conn) { + list.add(respFuture); + } + } + conn.sendRequestToChannel(list.toArray(new ClientFuture[list.size()])); + list.clear(); + if (over) { + return; + } + } catch (InterruptedException ie) { + break; + } catch (Throwable e) { + // do nothing + } + } + } + + public void close() { + this.writeQueue.add(ClientFuture.NIL); + this.interrupt(); + } +}