优化Client

This commit is contained in:
Redkale
2022-12-30 00:07:50 +08:00
parent 2c861a5ed4
commit 76a85a6e3d
7 changed files with 85 additions and 69 deletions

View File

@@ -587,35 +587,38 @@ public final class Application {
executorConf = DefaultAnyValue.create(); executorConf = DefaultAnyValue.create();
} }
final AtomicReference<ExecutorService> workref = new AtomicReference<>(); final AtomicReference<ExecutorService> workref = new AtomicReference<>();
final int executorThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); final int workThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus()));
boolean executorHash = executorConf.getBoolValue("hash"); boolean workHash = executorConf.getBoolValue("hash", false);
if (executorThreads > 0) { if (workThreads > 0) {
final AtomicInteger workCounter = new AtomicInteger(); final AtomicInteger workCounter = new AtomicInteger();
if (executorHash) { if (workHash) {
workExecutor0 = new ThreadHashExecutor(executorThreads, (Runnable r) -> { workExecutor0 = new ThreadHashExecutor(workThreads, (Runnable r) -> {
int i = workCounter.get(); int i = workCounter.get();
int c = workCounter.incrementAndGet(); int c = workCounter.incrementAndGet();
String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c)); String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r);
return t; return t;
}); });
} else { } else {
workExecutor0 = Executors.newFixedThreadPool(executorThreads, (Runnable r) -> { workExecutor0 = Executors.newFixedThreadPool(workThreads, (Runnable r) -> {
int i = workCounter.get(); int i = workCounter.get();
int c = workCounter.incrementAndGet(); int c = workCounter.incrementAndGet();
String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c)); String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r);
return t; return t;
}); });
} }
workref.set(workExecutor0); workref.set(workExecutor0);
} }
//给所有client给一个默认的AsyncGroup
final AtomicInteger wclientCounter = new AtomicInteger(); final AtomicInteger wclientCounter = new AtomicInteger();
clientExecutor = Executors.newFixedThreadPool(Math.max(2, executorThreads / 2), (Runnable r) -> { final int clientThreads = Math.max(Math.max(2, Utility.cpus()), workThreads / 2);
clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> {
int i = wclientCounter.get(); int i = wclientCounter.get();
int c = wclientCounter.incrementAndGet(); int c = wclientCounter.incrementAndGet();
String threadname = "Redkale-ClientThread-" + (c > 9 ? c : ("0" + c)); String threadname = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); Thread t = new WorkThread(threadname, i, clientThreads, workref.get(), r);
return t; return t;
}); });
} }

View File

@@ -44,7 +44,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
protected final AsyncGroup ioGroup; protected final AsyncGroup ioGroup;
protected final AsyncThread ioThread; protected final AsyncIOThread ioThread;
protected final boolean client; protected final boolean client;
@@ -76,12 +76,12 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
//用于服务端的Socket, 等同于一直存在的readCompletionHandler //用于服务端的Socket, 等同于一直存在的readCompletionHandler
ProtocolCodec protocolCodec; ProtocolCodec protocolCodec;
protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncThread ioThread, final int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, ObjectPool<ByteBuffer> bufferPool,
SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) {
this(client, ioGroup, ioThread, bufferCapacity, bufferPool, bufferPool, sslBuilder, sslContext, livingCounter, closedCounter); this(client, ioGroup, ioThread, bufferCapacity, bufferPool, bufferPool, sslBuilder, sslContext, livingCounter, closedCounter);
} }
protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncThread ioThread, final int bufferCapacity, Supplier<ByteBuffer> bufferSupplier, protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, Supplier<ByteBuffer> bufferSupplier,
Consumer<ByteBuffer> bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { Consumer<ByteBuffer> bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) {
Objects.requireNonNull(bufferSupplier); Objects.requireNonNull(bufferSupplier);
Objects.requireNonNull(bufferConsumer); Objects.requireNonNull(bufferConsumer);
@@ -152,7 +152,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
return ioThread.inCurrThread(); return ioThread.inCurrThread();
} }
public final AsyncThread getAsyncThread() { public final AsyncIOThread getAsyncIOThread() {
return ioThread; return ioThread;
} }

View File

@@ -76,14 +76,15 @@ public class AsyncIOGroup extends AsyncGroup {
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1)));
this.ioThreads[i] = client ? new ClientIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool)
this.ioThreads[i] = new AsyncIOThread(true, name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); : new AsyncIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
} }
if (client) { if (client) {
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
String name = threadPrefixName.replace("ServletThread", "ConnectThread").replace("IOThread", "ConnectThread"); String name = threadPrefixName.replace("ServletThread", "ConnectThread").replace("IOThread", "IOConnectThread");
this.connectThread = new AsyncIOThread(false, name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); this.connectThread = client ? new ClientIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool)
: new AsyncIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
} }
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -196,9 +197,7 @@ public class AsyncIOGroup extends AsyncGroup {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
} catch (IOException e) { } catch (IOException e) {
CompletableFuture future = new CompletableFuture(); return CompletableFuture.failedFuture(e);
future.completeExceptionally(e);
return future;
} }
AsyncIOThread ioThread = null; AsyncIOThread ioThread = null;
Thread currThread = Thread.currentThread(); Thread currThread = Thread.currentThread();

View File

@@ -24,7 +24,7 @@ import org.redkale.util.*;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public class AsyncIOThread extends AsyncThread { public class AsyncIOThread extends WorkThread {
protected static final Logger logger = Logger.getLogger(AsyncIOThread.class.getSimpleName()); protected static final Logger logger = Logger.getLogger(AsyncIOThread.class.getSimpleName());
@@ -42,9 +42,7 @@ public class AsyncIOThread extends AsyncThread {
private boolean closed; private boolean closed;
int invoker = 0; public AsyncIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector,
public AsyncIOThread(final boolean readable, String name, int index, int threads, ExecutorService workExecutor, Selector selector,
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) { ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) {
super(name, index, threads, workExecutor, null); super(name, index, threads, workExecutor, null);
this.selector = selector; this.selector = selector;
@@ -53,6 +51,21 @@ public class AsyncIOThread extends AsyncThread {
this.bufferConsumer = (v) -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).accept(v); this.bufferConsumer = (v) -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).accept(v);
} }
public static AsyncIOThread currAsyncIOThread() {
Thread t = Thread.currentThread();
return t instanceof AsyncIOThread ? (AsyncIOThread) t : null;
}
/**
* 是否IO线程
*
* @return boolean
*/
@Override
public final boolean inIO() {
return true;
}
@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
commandQueue.offer(command); commandQueue.offer(command);
@@ -131,7 +144,6 @@ public class AsyncIOThread extends AsyncThread {
while (it.hasNext()) { while (it.hasNext()) {
SelectionKey key = it.next(); SelectionKey key = it.next();
it.remove(); it.remove();
invoker = 0;
if (!key.isValid()) { if (!key.isValid()) {
continue; continue;
} }

View File

@@ -1,40 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.util.concurrent.ExecutorService;
/**
* 协议处理的IO线程类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.5.0
*/
public abstract class AsyncThread extends WorkThread {
public AsyncThread(String name, int index, int threads, ExecutorService workExecutor, Runnable target) {
super(name, index, threads, workExecutor, target);
}
public static AsyncThread currAsyncThread() {
Thread t = Thread.currentThread();
return t instanceof AsyncThread ? (AsyncThread) t : null;
}
/**
* 是否IO线程
*
* @return boolean
*/
@Override
public final boolean inIO() {
return true;
}
}

View File

@@ -0,0 +1,32 @@
/*
*
*/
package org.redkale.net;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutorService;
import org.redkale.util.ObjectPool;
/**
* 客户端版的IO线程类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public class ClientIOThread extends AsyncIOThread {
public ClientIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector,
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) {
super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool);
}
@Override
public final boolean inClient() {
return true;
}
}

View File

@@ -23,9 +23,9 @@ public class WorkThread extends Thread implements Executor {
protected final ThreadHashExecutor hashExecutor; protected final ThreadHashExecutor hashExecutor;
private int index; //WorkThread下标 private final int index; //WorkThread下标从0开始
private int threads; //WorkThread个数 private final int threads; //WorkThread个数
public WorkThread(String name, int index, int threads, ExecutorService workExecutor, Runnable target) { public WorkThread(String name, int index, int threads, ExecutorService workExecutor, Runnable target) {
super(target); super(target);
@@ -114,6 +114,16 @@ public class WorkThread extends Thread implements Executor {
return false; return false;
} }
/**
* 是否客户端的IO线程
*
* @since 2.8.0
* @return boolean
*/
public boolean inClient() {
return false;
}
public boolean inCurrThread() { public boolean inCurrThread() {
return this == Thread.currentThread(); return this == Thread.currentThread();
} }