废弃ClientWriteIOThread

This commit is contained in:
redkale
2023-03-27 18:55:27 +08:00
parent ffbad698b4
commit 067b88ab72
13 changed files with 171 additions and 87 deletions

View File

@@ -578,7 +578,7 @@ public final class Application {
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);
this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor);
this.clientAsyncGroup = new AsyncIOGroup(true, "Redkale-DefaultClient-IOThread-%s", clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true); this.clientAsyncGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true);
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup);
this.excludelibs = excludelib0; this.excludelibs = excludelib0;
@@ -1209,7 +1209,7 @@ public final class Application {
if (!compileMode && source instanceof Service) { if (!compileMode && source instanceof Service) {
((Service) source).init(sourceConf); ((Service) source).init(sourceConf);
} }
logger.info("Load CacheSource resourceName = " + sourceName + ", source = " + source + " in " + (System.currentTimeMillis() - st) + " ms"); logger.info("Load CacheSource resourceName = '" + sourceName + "', source = " + source + " in " + (System.currentTimeMillis() - st) + " ms");
return source; return source;
} }
if (!sourceConf.getValue(AbstractCacheSource.CACHE_SOURCE_RESOURCE, "").isEmpty()) { if (!sourceConf.getValue(AbstractCacheSource.CACHE_SOURCE_RESOURCE, "").isEmpty()) {
@@ -1223,7 +1223,7 @@ public final class Application {
CacheSource source = AbstractCacheSource.createCacheSource(serverClassLoader, resourceFactory, sourceConf, sourceName, compileMode); CacheSource source = AbstractCacheSource.createCacheSource(serverClassLoader, resourceFactory, sourceConf, sourceName, compileMode);
cacheSources.add(source); cacheSources.add(source);
resourceFactory.register(sourceName, CacheSource.class, source); resourceFactory.register(sourceName, CacheSource.class, source);
logger.info("Load CacheSource resourceName = " + sourceName + ", source = " + source + " in " + (System.currentTimeMillis() - st) + " ms"); logger.info("Load CacheSource resourceName = '" + sourceName + "', source = " + source + " in " + (System.currentTimeMillis() - st) + " ms");
return source; return source;
} catch (RuntimeException ex) { } catch (RuntimeException ex) {
throw ex; throw ex;
@@ -1255,7 +1255,7 @@ public final class Application {
} }
dataSources.add(source); dataSources.add(source);
resourceFactory.register(sourceName, DataSource.class, source); resourceFactory.register(sourceName, DataSource.class, source);
logger.info("Load DataSource resourceName = " + sourceName + ", source = " + source); logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source);
return source; return source;
} }
if (!sourceConf.getValue(AbstractDataSource.DATA_SOURCE_RESOURCE, "").isEmpty()) { if (!sourceConf.getValue(AbstractDataSource.DATA_SOURCE_RESOURCE, "").isEmpty()) {
@@ -1277,7 +1277,7 @@ public final class Application {
} else { } else {
resourceFactory.register(sourceName, DataSource.class, source); resourceFactory.register(sourceName, DataSource.class, source);
} }
logger.info("Load DataSource resourceName = " + sourceName + ", source = " + source); logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source);
return source; return source;
} catch (RuntimeException ex) { } catch (RuntimeException ex) {
throw ex; throw ex;

View File

@@ -170,7 +170,7 @@ public abstract class NodeServer {
//必须要进行初始化, 构建Service时需要使用Context中的ExecutorService //必须要进行初始化, 构建Service时需要使用Context中的ExecutorService
server.init(this.serverConf); server.init(this.serverConf);
if (this.sncpAddress != null) { //初始化SncpClient if (this.sncpAddress != null) { //初始化SncpClient
this.sncpAsyncGroup = new AsyncIOGroup(true, "Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); this.sncpAsyncGroup = new AsyncIOGroup("Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true);
this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000); this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000);
} }

View File

@@ -273,6 +273,20 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
public final void readRegisterInIOThreadSafe(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrReadThread()) {
if (!readPending) {
readRegister(handler);
}
} else {
executeRead(() -> {
if (!readPending) {
readRegister(handler);
}
});
}
}
public final void read(CompletionHandler<Integer, ByteBuffer> handler) { public final void read(CompletionHandler<Integer, ByteBuffer> handler) {
if (sslEngine == null) { if (sslEngine == null) {
readImpl(handler); readImpl(handler);
@@ -787,6 +801,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return writeBufferSupplier.get(); return writeBufferSupplier.get();
} }
public boolean isReadPending() {
return this.readPending;
}
public boolean isWritePending() {
return this.writePending;
}
public void dispose() {//同close 只是去掉throws IOException public void dispose() {//同close 只是去掉throws IOException
try { try {
this.close(); this.close();

View File

@@ -7,7 +7,7 @@ package org.redkale.net;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.redkale.util.*; import org.redkale.util.ByteBufferPool;
/** /**
* Client模式的AsyncConnection连接构造器 * Client模式的AsyncConnection连接构造器
@@ -24,35 +24,19 @@ public abstract class AsyncGroup {
public static final int UDP_BUFFER_CAPACITY = Integer.getInteger("redkale.udp.buffer.apacity", 1350); public static final int UDP_BUFFER_CAPACITY = Integer.getInteger("redkale.udp.buffer.apacity", 1350);
public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(true, threadNameFormat, workExecutor, safeBufferPool); return new AsyncIOGroup(threadNameFormat, workExecutor, safeBufferPool);
}
public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
}
public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, safeBufferPool);
} }
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(true, threadNameFormat, threads, workExecutor, safeBufferPool); return new AsyncIOGroup(threadNameFormat, threads, workExecutor, safeBufferPool);
}
public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize);
}
public static AsyncGroup create(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(clientMode, threadNameFormat, threads, workExecutor, safeBufferPool);
} }
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) { public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {

View File

@@ -11,6 +11,7 @@ import java.nio.channels.*;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.Supplier;
import org.redkale.annotation.ResourceType; import org.redkale.annotation.ResourceType;
import org.redkale.net.client.*; import org.redkale.net.client.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -38,7 +39,11 @@ public class AsyncIOGroup extends AsyncGroup {
final AsyncIOThread[] ioWriteThreads; final AsyncIOThread[] ioWriteThreads;
final AsyncIOThread connectThread; private final AtomicBoolean connectThreadInited = new AtomicBoolean();
private final Supplier<AsyncIOThread> connectThreadSupplier;
private volatile AsyncIOThread connectThread;
final int bufferCapacity; final int bufferCapacity;
@@ -58,24 +63,24 @@ public class AsyncIOGroup extends AsyncGroup {
protected final ScheduledThreadPoolExecutor timeoutExecutor; protected final ScheduledThreadPoolExecutor timeoutExecutor;
public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) { public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) {
this(true, "Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize); this("Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize);
} }
public AsyncIOGroup(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public AsyncIOGroup(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize); this(threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize);
} }
public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public AsyncIOGroup(String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(clientMode, threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity)); this(threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity));
} }
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { public AsyncIOGroup(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
this(clientMode, threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool); this(threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool);
} }
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean clientMode, String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { public AsyncIOGroup(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
this.bufferCapacity = safeBufferPool.getBufferCapacity(); this.bufferCapacity = safeBufferPool.getBufferCapacity();
this.ioReadThreads = new AsyncIOThread[threads]; this.ioReadThreads = new AsyncIOThread[threads];
this.ioWriteThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads];
@@ -89,24 +94,23 @@ public class AsyncIOGroup extends AsyncGroup {
try { try {
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
String indexfix = WorkThread.formatIndex(threads, i + 1); String indexfix = WorkThread.formatIndex(threads, i + 1);
if (clientMode) { this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool);
this.ioReadThreads[i] = createClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i];
this.ioWriteThreads[i] = createClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool);
} else {
this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i];
}
}
if (clientMode) {
this.connectThread = createClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool);
} else {
this.connectThread = null;
} }
this.connectThreadSupplier = () -> createConnectIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool);
} catch (IOException e) { } catch (IOException e) {
throw new RedkaleException(e); throw new RedkaleException(e);
} }
} }
protected AsyncIOThread createConnectIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
try {
return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool);
} catch (IOException e) {
return null;
}
}
protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool); return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool);
} }
@@ -119,6 +123,14 @@ public class AsyncIOGroup extends AsyncGroup {
return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool); return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool);
} }
AsyncIOThread connectThread() {
if (connectThreadInited.compareAndSet(false, true)) {
this.connectThread = connectThreadSupplier.get();
this.connectThread.start();
}
return this.connectThread;
}
@Override @Override
public AsyncGroup start() { public AsyncGroup start() {
if (started) { if (started) {
@@ -133,9 +145,6 @@ public class AsyncIOGroup extends AsyncGroup {
this.ioWriteThreads[i].start(); this.ioWriteThreads[i].start();
} }
} }
if (connectThread != null) {
connectThread.start();
}
started = true; started = true;
return this; return this;
} }

View File

@@ -26,8 +26,6 @@ import org.redkale.util.ByteBufferWriter;
*/ */
abstract class AsyncNioConnection extends AsyncConnection { abstract class AsyncNioConnection extends AsyncConnection {
final AsyncIOThread connectThread;
protected SocketAddress remoteAddress; protected SocketAddress remoteAddress;
//-------------------------------- 连操作 -------------------------------------- //-------------------------------- 连操作 --------------------------------------
@@ -89,7 +87,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
this.connectThread = ioGroup.connectThread;
} }
@Override @Override

View File

@@ -247,7 +247,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
if (connected) { if (connected) {
handleConnect(null); handleConnect(null);
} else if (connectKey == null) { } else if (connectKey == null) {
connectThread.register(selector -> { ioGroup.connectThread().register(selector -> {
try { try {
connectKey = channel.register(selector, SelectionKey.OP_CONNECT); connectKey = channel.register(selector, SelectionKey.OP_CONNECT);
connectKey.attach(this); connectKey.attach(this);

View File

@@ -118,7 +118,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
(pool == null ? safeResponsePool : pool).accept(v); (pool == null ? safeResponsePool : pool).accept(v);
}; };
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool); this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool);
this.ioGroup.start(); this.ioGroup.start();
this.acceptThread = new Thread() { this.acceptThread = new Thread() {

View File

@@ -111,7 +111,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
(pool == null ? safeResponsePool : pool).accept(v); (pool == null ? safeResponsePool : pool).accept(v);
}; };
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, safeBufferPool); this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool);
this.ioGroup.start(); this.ioGroup.start();
udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ); udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ);
this.acceptThread = new Thread() { this.acceptThread = new Thread() {

View File

@@ -263,6 +263,14 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return conn.writeChannel(request, respTransfer); return conn.writeChannel(request, respTransfer);
} }
private C createConnection(int index, AsyncConnection channel) {
C conn = createClientConnection(index, channel);
if (!channel.isReadPending()) {
channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread因executeRead可能会异步
}
return conn;
}
protected CompletableFuture<C> connect() { protected CompletableFuture<C> connect() {
final int size = this.connArray.length; final int size = this.connArray.length;
final int connIndex = (int) Math.abs(connIndexSeq.getAndIncrement()) % size; final int connIndex = (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
@@ -273,12 +281,10 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex]; final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex];
if (this.connOpenStates[connIndex].compareAndSet(false, true)) { if (this.connOpenStates[connIndex].compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createConnection(connIndex, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
} else {
future = future.thenApply(conn -> (C) conn.readRegisterChannel());
} }
if (authenticate != null) { if (authenticate != null) {
future = future.thenCompose(authenticate); future = future.thenCompose(authenticate);
@@ -318,12 +324,10 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings; final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
if (entry.connOpenState.compareAndSet(false, true)) { if (entry.connOpenState.compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createConnection(-1, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
} else {
future = future.thenApply(conn -> (C) conn.readRegisterChannel());
} }
if (authenticate != null) { if (authenticate != null) {
future = future.thenCompose(authenticate); future = future.thenCompose(authenticate);

View File

@@ -15,7 +15,7 @@ import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
* 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程里运行 * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程ReadIOThread里运行
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
@@ -74,7 +74,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} else { } else {
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid()); ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());
if (respFuture != null) { if (respFuture != null) {
responseComplete(respFuture, cr.message, cr.exc); responseComplete(false, respFuture, cr.message, cr.exc);
} }
respPool.accept(cr); respPool.accept(cr);
} }
@@ -96,12 +96,12 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} }
} }
private void responseComplete(ClientFuture<R, P> respFuture, P message, Throwable exc) { void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) {
if (respFuture != null) { if (respFuture != null) {
R request = respFuture.request; R request = respFuture.request;
WorkThread workThread = null; WorkThread workThread = null;
try { try {
if (request != null && !request.isCompleted()) { if (!halfCompleted && request != null && !request.isCompleted()) {
if (exc == null) { if (exc == null) {
connection.sendHalfWrite(request, exc); connection.sendHalfWrite(request, exc);
//request没有发送完respFuture需要再次接收 //request没有发送完respFuture需要再次接收

View File

@@ -7,12 +7,13 @@ package org.redkale.net.client;
import java.io.Serializable; import java.io.Serializable;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.*; import java.util.function.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.ByteArray;
/** /**
* 注意: 要确保AsyncConnection的读写过程都必须在channel.ioThread中运行 * 注意: 要确保AsyncConnection的读写过程都必须在channel.ioThread中运行
@@ -38,20 +39,22 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final LongAdder doneResponseCounter = new LongAdder(); protected final LongAdder doneResponseCounter = new LongAdder();
protected final ByteArray writeArray = new ByteArray();
final AtomicBoolean pauseWriting = new AtomicBoolean(); final AtomicBoolean pauseWriting = new AtomicBoolean();
final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>(); final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>();
ClientFuture currHalfWriteFuture; //pauseWriting=true此字段才会有值; pauseWriting=false此字段值为null
private final Client.AddressConnEntry connEntry; private final Client.AddressConnEntry connEntry;
protected final AsyncConnection channel; protected final AsyncConnection channel;
private final ClientCodec<R, P> codec; private final ClientCodec<R, P> codec;
private final ClientWriteIOThread writeThread;
//respFutureQueue、respFutureMap二选一 SPSC队列模式 //respFutureQueue、respFutureMap二选一 SPSC队列模式
private final Queue<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedQueue<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); private final Deque<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>();
//respFutureQueue、respFutureMap二选一, key: requestid SPSC模式 //respFutureQueue、respFutureMap二选一, key: requestid SPSC模式
private final Map<Serializable, ClientFuture<R, P>> respFutureMap = new ConcurrentHashMap<>(); private final Map<Serializable, ClientFuture<R, P>> respFutureMap = new ConcurrentHashMap<>();
@@ -70,7 +73,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress());
this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting;
this.channel = channel.beforeCloseListener(this); this.channel = channel.beforeCloseListener(this);
this.writeThread = (ClientWriteIOThread) channel.getWriteIOThread();
} }
protected abstract ClientCodec createCodec(); protected abstract ClientCodec createCodec();
@@ -87,18 +89,71 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (rts > 0 && !request.isCloseType()) { if (rts > 0 && !request.isCloseType()) {
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
} }
respWaitingCounter.increment(); //放在writeChannelUnsafe计数会延迟,导致不准确 respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟,导致不准确
writeThread.offerRequest(this, request, respFuture); if (channel.inCurrWriteThread()) {
writeChannelInThread(request, respFuture);
} else {
channel.executeWrite(() -> writeChannelInThread(request, respFuture));
}
return respFuture; return respFuture;
} }
private void writeChannelInThread(R request, ClientFuture respFuture) {
offerRespFuture(respFuture);
if (pauseWriting.get()) {
pauseRequests.add(respFuture);
} else {
sendRequestInThread(request, respFuture);
}
}
private void sendRequestInThread(R request, ClientFuture respFuture) {
//发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
if (writeArray.length() > 0) {
channel.write(writeArray, this, writeHandler);
}
}
//发送半包和积压的请求数据包
private void sendHalfWriteInThread(R request, Throwable halfRequestExc) {
pauseWriting.set(false);
ClientFuture respFuture = this.currHalfWriteFuture;
if (respFuture != null) {
this.currHalfWriteFuture = null;
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
sendRequestInThread(request, respFuture);
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
}
}
while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
sendRequestInThread((R) respFuture.getRequest(), respFuture);
}
}
void sendHalfWrite(R request, Throwable halfRequestExc) {
if (channel.inCurrWriteThread()) {
sendHalfWriteInThread(request, halfRequestExc);
} else {
channel.executeWrite(() -> sendHalfWriteInThread(request, halfRequestExc));
}
}
CompletableFuture<P> writeVirtualRequest(R request) { CompletableFuture<P> writeVirtualRequest(R request) {
if (!request.isVirtualType()) { if (!request.isVirtualType()) {
return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true")); return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true"));
} }
ClientFuture<R, P> respFuture = createClientFuture(request); ClientFuture<R, P> respFuture = createClientFuture(request);
respFutureQueue.offer(respFuture); offerRespFuture(respFuture);
readRegisterChannel();
return respFuture; return respFuture;
} }
@@ -109,11 +164,6 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
return new ClientFuture(this, request); return new ClientFuture(this, request);
} }
protected ClientConnection readRegisterChannel() {
channel.readRegisterInIOThread(codec);
return this;
}
@Override //AsyncConnection.beforeCloseListener @Override //AsyncConnection.beforeCloseListener
public void accept(AsyncConnection t) { public void accept(AsyncConnection t) {
respWaitingCounter.reset(); respWaitingCounter.reset();
@@ -145,8 +195,14 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
} }
void sendHalfWrite(R request, Throwable halfRequestExc) { //只会在WriteIOThread中调用
writeThread.sendHalfWrite(this, request, halfRequestExc); void offerFirstRespFuture(ClientFuture<R, P> respFuture) {
Serializable requestid = respFuture.request.getRequestid();
if (requestid == null) {
respFutureQueue.offerFirst(respFuture);
} else {
respFutureMap.put(requestid, respFuture);
}
} }
//只会在WriteIOThread中调用 //只会在WriteIOThread中调用
@@ -264,4 +320,16 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
for (int i = 0; i < cha; i++) s += ' '; for (int i = 0; i < cha; i++) s += ' ';
return s; return s;
} }
protected final CompletionHandler<Integer, ClientConnection> writeHandler = new CompletionHandler<Integer, ClientConnection>() {
@Override
public void completed(Integer result, ClientConnection attachment) {
}
@Override
public void failed(Throwable exc, ClientConnection attachment) {
attachment.dispose(exc);
}
};
} }

View File

@@ -6,7 +6,7 @@ package org.redkale.net.http;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.ByteBufferPool;
/** /**
* WebSocket只写版的AsyncIOGroup <br> * WebSocket只写版的AsyncIOGroup <br>
@@ -23,7 +23,7 @@ import org.redkale.util.*;
class WebSocketAsyncGroup extends AsyncIOGroup { class WebSocketAsyncGroup extends AsyncIOGroup {
public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
super(false, threadNameFormat, workExecutor, safeBufferPool); super(threadNameFormat, workExecutor, safeBufferPool);
} }
@Override @Override