From 067b88ab726ddf16e439e495f1b59d886aa09513 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 27 Mar 2023 18:55:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BA=9F=E5=BC=83ClientWriteIOThread?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 10 +- .../java/org/redkale/boot/NodeServer.java | 2 +- .../java/org/redkale/net/AsyncConnection.java | 22 ++++ src/main/java/org/redkale/net/AsyncGroup.java | 30 ++---- .../java/org/redkale/net/AsyncIOGroup.java | 57 +++++----- .../org/redkale/net/AsyncNioConnection.java | 3 - .../redkale/net/AsyncNioTcpConnection.java | 2 +- .../net/AsyncNioTcpProtocolServer.java | 2 +- .../net/AsyncNioUdpProtocolServer.java | 2 +- .../java/org/redkale/net/client/Client.java | 16 +-- .../org/redkale/net/client/ClientCodec.java | 8 +- .../redkale/net/client/ClientConnection.java | 100 +++++++++++++++--- .../redkale/net/http/WebSocketAsyncGroup.java | 4 +- 13 files changed, 171 insertions(+), 87 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 89e14b836..b5ddc1566 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -578,7 +578,7 @@ public final class Application { this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); - this.clientAsyncGroup = new AsyncIOGroup(true, "Redkale-DefaultClient-IOThread-%s", clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true); + this.clientAsyncGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true); this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); this.excludelibs = excludelib0; @@ -1209,7 +1209,7 @@ public final class Application { if (!compileMode && source instanceof Service) { ((Service) source).init(sourceConf); } - logger.info("Load CacheSource resourceName = " + sourceName + ", source = " + source + " in " + (System.currentTimeMillis() - st) + " ms"); + logger.info("Load CacheSource resourceName = '" + sourceName + "', source = " + source + " in " + (System.currentTimeMillis() - st) + " ms"); return source; } if (!sourceConf.getValue(AbstractCacheSource.CACHE_SOURCE_RESOURCE, "").isEmpty()) { @@ -1223,7 +1223,7 @@ public final class Application { CacheSource source = AbstractCacheSource.createCacheSource(serverClassLoader, resourceFactory, sourceConf, sourceName, compileMode); cacheSources.add(source); resourceFactory.register(sourceName, CacheSource.class, source); - logger.info("Load CacheSource resourceName = " + sourceName + ", source = " + source + " in " + (System.currentTimeMillis() - st) + " ms"); + logger.info("Load CacheSource resourceName = '" + sourceName + "', source = " + source + " in " + (System.currentTimeMillis() - st) + " ms"); return source; } catch (RuntimeException ex) { throw ex; @@ -1255,7 +1255,7 @@ public final class Application { } dataSources.add(source); resourceFactory.register(sourceName, DataSource.class, source); - logger.info("Load DataSource resourceName = " + sourceName + ", source = " + source); + logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source); return source; } if (!sourceConf.getValue(AbstractDataSource.DATA_SOURCE_RESOURCE, "").isEmpty()) { @@ -1277,7 +1277,7 @@ public final class Application { } else { resourceFactory.register(sourceName, DataSource.class, source); } - logger.info("Load DataSource resourceName = " + sourceName + ", source = " + source); + logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source); return source; } catch (RuntimeException ex) { throw ex; diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index a81751d83..29bb6843b 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -170,7 +170,7 @@ public abstract class NodeServer { //必须要进行初始化, 构建Service时需要使用Context中的ExecutorService server.init(this.serverConf); if (this.sncpAddress != null) { //初始化SncpClient - this.sncpAsyncGroup = new AsyncIOGroup(true, "Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); + this.sncpAsyncGroup = new AsyncIOGroup("Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000); } diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index a8b3163da..03ebcb897 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -273,6 +273,20 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + public final void readRegisterInIOThreadSafe(CompletionHandler handler) { + if (inCurrReadThread()) { + if (!readPending) { + readRegister(handler); + } + } else { + executeRead(() -> { + if (!readPending) { + readRegister(handler); + } + }); + } + } + public final void read(CompletionHandler handler) { if (sslEngine == null) { readImpl(handler); @@ -787,6 +801,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return writeBufferSupplier.get(); } + public boolean isReadPending() { + return this.readPending; + } + + public boolean isWritePending() { + return this.writePending; + } + public void dispose() {//同close, 只是去掉throws IOException try { this.close(); diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index 9f3d1daa0..17b858f68 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -7,7 +7,7 @@ package org.redkale.net; import java.net.SocketAddress; import java.util.concurrent.*; -import org.redkale.util.*; +import org.redkale.util.ByteBufferPool; /** * Client模式的AsyncConnection连接构造器 @@ -24,35 +24,19 @@ public abstract class AsyncGroup { public static final int UDP_BUFFER_CAPACITY = Integer.getInteger("redkale.udp.buffer.apacity", 1350); public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); + return new AsyncIOGroup(threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { - return new AsyncIOGroup(true, threadNameFormat, workExecutor, safeBufferPool); - } - - public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); - } - - public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { - return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, safeBufferPool); + public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { + return new AsyncIOGroup(threadNameFormat, workExecutor, safeBufferPool); } public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); + return new AsyncIOGroup(threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { - return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, safeBufferPool); - } - - public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); - } - - public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { - return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, safeBufferPool); + public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { + return new AsyncIOGroup(threadNameFormat, threads, workExecutor, safeBufferPool); } public CompletableFuture createTCPClient(final SocketAddress address) { diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index acf11eff6..c5b52d2d0 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -11,6 +11,7 @@ import java.nio.channels.*; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.function.Supplier; import org.redkale.annotation.ResourceType; import org.redkale.net.client.*; import org.redkale.util.*; @@ -38,7 +39,11 @@ public class AsyncIOGroup extends AsyncGroup { final AsyncIOThread[] ioWriteThreads; - final AsyncIOThread connectThread; + private final AtomicBoolean connectThreadInited = new AtomicBoolean(); + + private final Supplier connectThreadSupplier; + + private volatile AsyncIOThread connectThread; final int bufferCapacity; @@ -58,24 +63,24 @@ public class AsyncIOGroup extends AsyncGroup { protected final ScheduledThreadPoolExecutor timeoutExecutor; public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) { - this(true, "Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize); + this("Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize); } - public AsyncIOGroup(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize); + public AsyncIOGroup(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + this(threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize); } - public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(clientMode, threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity)); + public AsyncIOGroup(String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + this(threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity)); } @SuppressWarnings("OverridableMethodCallInConstructor") - public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { - this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool); + public AsyncIOGroup(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { + this(threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool); } @SuppressWarnings("OverridableMethodCallInConstructor") - public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { + public AsyncIOGroup(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { this.bufferCapacity = safeBufferPool.getBufferCapacity(); this.ioReadThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads]; @@ -89,24 +94,23 @@ public class AsyncIOGroup extends AsyncGroup { try { for (int i = 0; i < threads; i++) { String indexfix = WorkThread.formatIndex(threads, i + 1); - if (clientMode) { - this.ioReadThreads[i] = createClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool); - this.ioWriteThreads[i] = createClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool); - } else { - this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool); - this.ioWriteThreads[i] = this.ioReadThreads[i]; - } - } - if (clientMode) { - this.connectThread = createClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool); - } else { - this.connectThread = null; + this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool); + this.ioWriteThreads[i] = this.ioReadThreads[i]; } + this.connectThreadSupplier = () -> createConnectIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool); } catch (IOException e) { throw new RedkaleException(e); } } + protected AsyncIOThread createConnectIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { + try { + return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool); + } catch (IOException e) { + return null; + } + } + protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool); } @@ -119,6 +123,14 @@ public class AsyncIOGroup extends AsyncGroup { return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool); } + AsyncIOThread connectThread() { + if (connectThreadInited.compareAndSet(false, true)) { + this.connectThread = connectThreadSupplier.get(); + this.connectThread.start(); + } + return this.connectThread; + } + @Override public AsyncGroup start() { if (started) { @@ -133,9 +145,6 @@ public class AsyncIOGroup extends AsyncGroup { this.ioWriteThreads[i].start(); } } - if (connectThread != null) { - connectThread.start(); - } started = true; return this; } diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 10903f1d1..c33ca6207 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -26,8 +26,6 @@ import org.redkale.util.ByteBufferWriter; */ abstract class AsyncNioConnection extends AsyncConnection { - final AsyncIOThread connectThread; - protected SocketAddress remoteAddress; //-------------------------------- 连操作 -------------------------------------- @@ -89,7 +87,6 @@ abstract class AsyncNioConnection extends AsyncConnection { public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); - this.connectThread = ioGroup.connectThread; } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java index 3f669c881..88c41bfad 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java @@ -247,7 +247,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { if (connected) { handleConnect(null); } else if (connectKey == null) { - connectThread.register(selector -> { + ioGroup.connectThread().register(selector -> { try { connectKey = channel.register(selector, SelectionKey.OP_CONNECT); connectKey.attach(this); diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index e1b8f5519..f57ccfec3 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -118,7 +118,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { (pool == null ? safeResponsePool : pool).accept(v); }; final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); - this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool); + this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool); this.ioGroup.start(); this.acceptThread = new Thread() { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index b13aaa6f7..bc876caf6 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -111,7 +111,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { (pool == null ? safeResponsePool : pool).accept(v); }; final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); - this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool); + this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool); this.ioGroup.start(); udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ); this.acceptThread = new Thread() { diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 086800b2e..087765032 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -263,6 +263,14 @@ public abstract class Client, R extends ClientR return conn.writeChannel(request, respTransfer); } + private C createConnection(int index, AsyncConnection channel) { + C conn = createClientConnection(index, channel); + if (!channel.isReadPending()) { + channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 + } + return conn; + } + protected CompletableFuture connect() { final int size = this.connArray.length; final int connIndex = (int) Math.abs(connIndexSeq.getAndIncrement()) % size; @@ -273,12 +281,10 @@ public abstract class Client, R extends ClientR final Queue> waitQueue = this.connAcquireWaitings[connIndex]; if (this.connOpenStates[connIndex].compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); + .thenApply(c -> (C) createConnection(connIndex, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); - } else { - future = future.thenApply(conn -> (C) conn.readRegisterChannel()); } if (authenticate != null) { future = future.thenCompose(authenticate); @@ -318,12 +324,10 @@ public abstract class Client, R extends ClientR final Queue> waitQueue = entry.connAcquireWaitings; if (entry.connOpenState.compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); + .thenApply(c -> (C) createConnection(-1, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); - } else { - future = future.thenApply(conn -> (C) conn.readRegisterChannel()); } if (authenticate != null) { future = future.thenCompose(authenticate); diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 1ad6db9e6..ae3cc6491 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -15,7 +15,7 @@ import org.redkale.net.*; import org.redkale.util.*; /** - * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程里运行 + * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行 * *

* 详情见: https://redkale.org @@ -74,7 +74,7 @@ public abstract class ClientCodec implements Complet } else { ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); if (respFuture != null) { - responseComplete(respFuture, cr.message, cr.exc); + responseComplete(false, respFuture, cr.message, cr.exc); } respPool.accept(cr); } @@ -96,12 +96,12 @@ public abstract class ClientCodec implements Complet } } - private void responseComplete(ClientFuture respFuture, P message, Throwable exc) { + void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) { if (respFuture != null) { R request = respFuture.request; WorkThread workThread = null; try { - if (request != null && !request.isCompleted()) { + if (!halfCompleted && request != null && !request.isCompleted()) { if (exc == null) { connection.sendHalfWrite(request, exc); //request没有发送完,respFuture需要再次接收 diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 723f8adab..801bb6972 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -7,12 +7,13 @@ package org.redkale.net.client; import java.io.Serializable; import java.net.SocketAddress; -import java.nio.channels.ClosedChannelException; +import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; import org.redkale.net.*; +import org.redkale.util.ByteArray; /** * 注意: 要确保AsyncConnection的读写过程都必须在channel.ioThread中运行 @@ -38,20 +39,22 @@ public abstract class ClientConnection implements Co protected final LongAdder doneResponseCounter = new LongAdder(); + protected final ByteArray writeArray = new ByteArray(); + final AtomicBoolean pauseWriting = new AtomicBoolean(); final ConcurrentLinkedQueue pauseRequests = new ConcurrentLinkedQueue<>(); + ClientFuture currHalfWriteFuture; //pauseWriting=true,此字段才会有值; pauseWriting=false,此字段值为null + private final Client.AddressConnEntry connEntry; protected final AsyncConnection channel; private final ClientCodec codec; - private final ClientWriteIOThread writeThread; - //respFutureQueue、respFutureMap二选一, SPSC队列模式 - private final Queue> respFutureQueue = new ConcurrentLinkedQueue<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); + private final Deque> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); //respFutureQueue、respFutureMap二选一, key: requestid, SPSC模式 private final Map> respFutureMap = new ConcurrentHashMap<>(); @@ -70,7 +73,6 @@ public abstract class ClientConnection implements Co this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; this.channel = channel.beforeCloseListener(this); - this.writeThread = (ClientWriteIOThread) channel.getWriteIOThread(); } protected abstract ClientCodec createCodec(); @@ -87,18 +89,71 @@ public abstract class ClientConnection implements Co if (rts > 0 && !request.isCloseType()) { respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); } - respWaitingCounter.increment(); //放在writeChannelUnsafe计数会延迟,导致不准确 - writeThread.offerRequest(this, request, respFuture); + respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟,导致不准确 + if (channel.inCurrWriteThread()) { + writeChannelInThread(request, respFuture); + } else { + channel.executeWrite(() -> writeChannelInThread(request, respFuture)); + } return respFuture; } + private void writeChannelInThread(R request, ClientFuture respFuture) { + offerRespFuture(respFuture); + if (pauseWriting.get()) { + pauseRequests.add(respFuture); + } else { + sendRequestInThread(request, respFuture); + } + } + + private void sendRequestInThread(R request, ClientFuture respFuture) { + //发送请求数据包 + writeArray.clear(); + request.writeTo(this, writeArray); + if (request.isCompleted()) { + doneRequestCounter.increment(); + } else { //还剩半包没发送完 + pauseWriting.set(true); + currHalfWriteFuture = respFuture; + } + if (writeArray.length() > 0) { + channel.write(writeArray, this, writeHandler); + } + } + + //发送半包和积压的请求数据包 + private void sendHalfWriteInThread(R request, Throwable halfRequestExc) { + pauseWriting.set(false); + ClientFuture respFuture = this.currHalfWriteFuture; + if (respFuture != null) { + this.currHalfWriteFuture = null; + if (halfRequestExc == null) { + offerFirstRespFuture(respFuture); + sendRequestInThread(request, respFuture); + } else { + codec.responseComplete(true, respFuture, null, halfRequestExc); + } + } + while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) { + sendRequestInThread((R) respFuture.getRequest(), respFuture); + } + } + + void sendHalfWrite(R request, Throwable halfRequestExc) { + if (channel.inCurrWriteThread()) { + sendHalfWriteInThread(request, halfRequestExc); + } else { + channel.executeWrite(() -> sendHalfWriteInThread(request, halfRequestExc)); + } + } + CompletableFuture

writeVirtualRequest(R request) { if (!request.isVirtualType()) { return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true")); } ClientFuture respFuture = createClientFuture(request); - respFutureQueue.offer(respFuture); - readRegisterChannel(); + offerRespFuture(respFuture); return respFuture; } @@ -109,11 +164,6 @@ public abstract class ClientConnection implements Co return new ClientFuture(this, request); } - protected ClientConnection readRegisterChannel() { - channel.readRegisterInIOThread(codec); - return this; - } - @Override //AsyncConnection.beforeCloseListener public void accept(AsyncConnection t) { respWaitingCounter.reset(); @@ -145,8 +195,14 @@ public abstract class ClientConnection implements Co } } - void sendHalfWrite(R request, Throwable halfRequestExc) { - writeThread.sendHalfWrite(this, request, halfRequestExc); + //只会在WriteIOThread中调用 + void offerFirstRespFuture(ClientFuture respFuture) { + Serializable requestid = respFuture.request.getRequestid(); + if (requestid == null) { + respFutureQueue.offerFirst(respFuture); + } else { + respFutureMap.put(requestid, respFuture); + } } //只会在WriteIOThread中调用 @@ -264,4 +320,16 @@ public abstract class ClientConnection implements Co for (int i = 0; i < cha; i++) s += ' '; return s; } + + protected final CompletionHandler writeHandler = new CompletionHandler() { + + @Override + public void completed(Integer result, ClientConnection attachment) { + } + + @Override + public void failed(Throwable exc, ClientConnection attachment) { + attachment.dispose(exc); + } + }; } diff --git a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java index a42a14e4c..d61952b71 100644 --- a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java +++ b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java @@ -6,7 +6,7 @@ package org.redkale.net.http; import java.io.IOException; import java.util.concurrent.ExecutorService; import org.redkale.net.*; -import org.redkale.util.*; +import org.redkale.util.ByteBufferPool; /** * WebSocket只写版的AsyncIOGroup
@@ -23,7 +23,7 @@ import org.redkale.util.*; class WebSocketAsyncGroup extends AsyncIOGroup { public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { - super(false, threadNameFormat, workExecutor, safeBufferPool); + super(threadNameFormat, workExecutor, safeBufferPool); } @Override