client优化

This commit is contained in:
redkale
2023-03-28 15:04:43 +08:00
parent d7655aa2be
commit 49662a7d5f
6 changed files with 46 additions and 52 deletions

View File

@@ -31,14 +31,6 @@ public abstract class AsyncGroup {
return new AsyncIOGroup(threadNameFormat, workExecutor, safeBufferPool);
}
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize);
}
public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
return new AsyncIOGroup(threadNameFormat, threads, workExecutor, safeBufferPool);
}
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {
return createTCPClient(address, 0, 0);
}

View File

@@ -57,41 +57,33 @@ public class AsyncIOGroup extends AsyncGroup {
//关闭数
protected final LongAdder connClosedCounter = new LongAdder();
protected final ScheduledThreadPoolExecutor timeoutExecutor;
//超时器
protected final ScheduledExecutorService timeoutExecutor;
public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) {
this("Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize);
this("Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize);
}
public AsyncIOGroup(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize);
}
public AsyncIOGroup(String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity));
this(threadNameFormat, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity));
}
@SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
this(threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool);
}
@SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) {
final int threads = Utility.cpus(); //固定值,不可改
this.bufferCapacity = safeBufferPool.getBufferCapacity();
this.ioReadThreads = new AsyncIOThread[threads];
this.ioWriteThreads = new AsyncIOThread[threads];
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
this.timeoutExecutor = Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r, String.format(threadNameFormat, "Timeout"));
t.setDaemon(true);
return t;
});
try {
for (int i = 0; i < threads; i++) {
String indexfix = WorkThread.formatIndex(threads, i + 1);
this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool);
String indexFix = WorkThread.formatIndex(threads, i + 1);
this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexFix), i, threads, workExecutor, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i];
}
this.connectThread = createConnectIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool);

View File

@@ -30,6 +30,8 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
public static final int DEFAULT_MAX_PIPELINES = 128;
protected boolean debug;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final String name;

View File

@@ -99,39 +99,37 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) {
if (respFuture != null) {
R request = respFuture.request;
WorkThread workThread = null;
AsyncIOThread readThread = connection.channel.getReadIOThread();
final WorkThread workThread = request == null ? readThread : request.removeWorkThread(readThread);
try {
if (!halfCompleted && request != null && !request.isCompleted()) {
if (exc == null) {
connection.sendHalfWrite(request, exc);
//request没有发送完respFuture需要再次接收
return;
} else { //异常了需要清掉半包
} else {
connection.sendHalfWrite(request, exc);
//异常了需要清掉半包
}
}
if (request != null) {
workThread = request.workThread;
request.workThread = null;
}
connection.respWaitingCounter.decrement();
if (connection.isAuthenticated()) {
connection.client.incrRespDoneCounter();
}
respFuture.cancelTimeout();
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message);
// if (connection.client.debug) {
// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, exc);
// }
connection.preComplete(message, (R) request, exc);
boolean reqInIO = workThread != null && workThread.inIO();
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = connection.channel.getReadIOThread();
}
if (exc != null) {
if (reqInIO) { //request在IO线程中发送请求说明request是在异步模式中
if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.completeExceptionally(exc);
if (workThread.inIO()) {
workThread.execute(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.completeExceptionally(exc);
});
} else {
workThread.runWork(() -> {
if (request != null) {
@@ -142,11 +140,13 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
}
} else {
final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message);
if (reqInIO) { //request在IO线程中发送请求说明request是在异步模式中
if (request != null) {
Traces.currTraceid(request.traceid);
}
((ClientFuture) respFuture).complete(rs);
if (workThread.inIO()) {
workThread.execute(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
((ClientFuture) respFuture).complete(rs);
});
} else {
workThread.runWork(() -> {
if (request != null) {
@@ -157,11 +157,13 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
}
}
} catch (Throwable t) {
if (workThread == null || workThread.inIO()) {
if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.completeExceptionally(t);
if (workThread.inIO()) {
workThread.execute(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.completeExceptionally(t);
});
} else {
workThread.runWork(() -> {
if (request != null) {

View File

@@ -32,6 +32,12 @@ public abstract class ClientRequest {
public abstract void writeTo(ClientConnection conn, ByteArray array);
WorkThread removeWorkThread(WorkThread defaultValue) {
WorkThread t = this.workThread;
this.workThread = null;
return t == null ? defaultValue : t;
}
public Serializable getRequestid() {
return null;
}

View File

@@ -24,11 +24,11 @@ import org.redkale.util.*;
@Deprecated(since = "2.8.0")
public class WebSocketWriteIOThread extends AsyncIOThread {
private final ScheduledThreadPoolExecutor timeoutExecutor;
private final ScheduledExecutorService timeoutExecutor;
private final BlockingDeque<WebSocketFuture> requestQueue = new LinkedBlockingDeque<>();
public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads,
public WebSocketWriteIOThread(ScheduledExecutorService timeoutExecutor, ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, safeBufferPool);
Objects.requireNonNull(timeoutExecutor);