From 76a85a6e3d224ac61cec2c6a6ef359c96f5e5291 Mon Sep 17 00:00:00 2001 From: Redkale Date: Fri, 30 Dec 2022 00:07:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 25 +++++++----- .../java/org/redkale/net/AsyncConnection.java | 8 ++-- .../java/org/redkale/net/AsyncIOGroup.java | 13 +++--- .../java/org/redkale/net/AsyncIOThread.java | 22 +++++++--- .../java/org/redkale/net/AsyncThread.java | 40 ------------------- .../java/org/redkale/net/ClientIOThread.java | 32 +++++++++++++++ src/main/java/org/redkale/net/WorkThread.java | 14 ++++++- 7 files changed, 85 insertions(+), 69 deletions(-) delete mode 100644 src/main/java/org/redkale/net/AsyncThread.java create mode 100644 src/main/java/org/redkale/net/ClientIOThread.java diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 8c789db16..b4ba744dc 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -587,35 +587,38 @@ public final class Application { executorConf = DefaultAnyValue.create(); } final AtomicReference workref = new AtomicReference<>(); - final int executorThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); - boolean executorHash = executorConf.getBoolValue("hash"); - if (executorThreads > 0) { + final int workThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); + boolean workHash = executorConf.getBoolValue("hash", false); + if (workThreads > 0) { final AtomicInteger workCounter = new AtomicInteger(); - if (executorHash) { - workExecutor0 = new ThreadHashExecutor(executorThreads, (Runnable r) -> { + if (workHash) { + workExecutor0 = new ThreadHashExecutor(workThreads, (Runnable r) -> { int i = workCounter.get(); int c = workCounter.incrementAndGet(); String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); + Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r); return t; }); } else { - workExecutor0 = Executors.newFixedThreadPool(executorThreads, (Runnable r) -> { + workExecutor0 = Executors.newFixedThreadPool(workThreads, (Runnable r) -> { int i = workCounter.get(); int c = workCounter.incrementAndGet(); String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); + Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r); return t; }); } workref.set(workExecutor0); } + + //给所有client给一个默认的AsyncGroup final AtomicInteger wclientCounter = new AtomicInteger(); - clientExecutor = Executors.newFixedThreadPool(Math.max(2, executorThreads / 2), (Runnable r) -> { + final int clientThreads = Math.max(Math.max(2, Utility.cpus()), workThreads / 2); + clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> { int i = wclientCounter.get(); int c = wclientCounter.incrementAndGet(); - String threadname = "Redkale-ClientThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); + String threadname = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c)); + Thread t = new WorkThread(threadname, i, clientThreads, workref.get(), r); return t; }); } diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 97becb208..3b9a169fa 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -44,7 +44,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl protected final AsyncGroup ioGroup; - protected final AsyncThread ioThread; + protected final AsyncIOThread ioThread; protected final boolean client; @@ -76,12 +76,12 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl //用于服务端的Socket, 等同于一直存在的readCompletionHandler ProtocolCodec protocolCodec; - protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncThread ioThread, final int bufferCapacity, ObjectPool bufferPool, + protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, ObjectPool bufferPool, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { this(client, ioGroup, ioThread, bufferCapacity, bufferPool, bufferPool, sslBuilder, sslContext, livingCounter, closedCounter); } - protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncThread ioThread, final int bufferCapacity, Supplier bufferSupplier, + protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, Supplier bufferSupplier, Consumer bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { Objects.requireNonNull(bufferSupplier); Objects.requireNonNull(bufferConsumer); @@ -152,7 +152,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl return ioThread.inCurrThread(); } - public final AsyncThread getAsyncThread() { + public final AsyncIOThread getAsyncIOThread() { return ioThread; } diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index e2ca8c1db..cd061bf7c 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -76,14 +76,15 @@ public class AsyncIOGroup extends AsyncGroup { ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); - - this.ioThreads[i] = new AsyncIOThread(true, name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); + this.ioThreads[i] = client ? new ClientIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) + : new AsyncIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); } if (client) { ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - String name = threadPrefixName.replace("ServletThread", "ConnectThread").replace("IOThread", "ConnectThread"); - this.connectThread = new AsyncIOThread(false, name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); + String name = threadPrefixName.replace("ServletThread", "ConnectThread").replace("IOThread", "IOConnectThread"); + this.connectThread = client ? new ClientIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) + : new AsyncIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); } } catch (IOException e) { throw new RuntimeException(e); @@ -196,9 +197,7 @@ public class AsyncIOGroup extends AsyncGroup { channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } catch (IOException e) { - CompletableFuture future = new CompletableFuture(); - future.completeExceptionally(e); - return future; + return CompletableFuture.failedFuture(e); } AsyncIOThread ioThread = null; Thread currThread = Thread.currentThread(); diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 0082d879d..ba87b1d9f 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -24,7 +24,7 @@ import org.redkale.util.*; * * @since 2.1.0 */ -public class AsyncIOThread extends AsyncThread { +public class AsyncIOThread extends WorkThread { protected static final Logger logger = Logger.getLogger(AsyncIOThread.class.getSimpleName()); @@ -42,9 +42,7 @@ public class AsyncIOThread extends AsyncThread { private boolean closed; - int invoker = 0; - - public AsyncIOThread(final boolean readable, String name, int index, int threads, ExecutorService workExecutor, Selector selector, + public AsyncIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { super(name, index, threads, workExecutor, null); this.selector = selector; @@ -53,6 +51,21 @@ public class AsyncIOThread extends AsyncThread { this.bufferConsumer = (v) -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).accept(v); } + public static AsyncIOThread currAsyncIOThread() { + Thread t = Thread.currentThread(); + return t instanceof AsyncIOThread ? (AsyncIOThread) t : null; + } + + /** + * 是否IO线程 + * + * @return boolean + */ + @Override + public final boolean inIO() { + return true; + } + @Override public void execute(Runnable command) { commandQueue.offer(command); @@ -131,7 +144,6 @@ public class AsyncIOThread extends AsyncThread { while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); - invoker = 0; if (!key.isValid()) { continue; } diff --git a/src/main/java/org/redkale/net/AsyncThread.java b/src/main/java/org/redkale/net/AsyncThread.java deleted file mode 100644 index d3876519a..000000000 --- a/src/main/java/org/redkale/net/AsyncThread.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net; - -import java.util.concurrent.ExecutorService; - -/** - * 协议处理的IO线程类 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.5.0 - */ -public abstract class AsyncThread extends WorkThread { - - public AsyncThread(String name, int index, int threads, ExecutorService workExecutor, Runnable target) { - super(name, index, threads, workExecutor, target); - } - - public static AsyncThread currAsyncThread() { - Thread t = Thread.currentThread(); - return t instanceof AsyncThread ? (AsyncThread) t : null; - } - - /** - * 是否IO线程 - * - * @return boolean - */ - @Override - public final boolean inIO() { - return true; - } -} diff --git a/src/main/java/org/redkale/net/ClientIOThread.java b/src/main/java/org/redkale/net/ClientIOThread.java new file mode 100644 index 000000000..bf8f0f9c1 --- /dev/null +++ b/src/main/java/org/redkale/net/ClientIOThread.java @@ -0,0 +1,32 @@ +/* + * + */ +package org.redkale.net; + +import java.nio.ByteBuffer; +import java.nio.channels.Selector; +import java.util.concurrent.ExecutorService; +import org.redkale.util.ObjectPool; + +/** + * 客户端版的IO线程类 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +public class ClientIOThread extends AsyncIOThread { + + public ClientIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, + ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { + super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); + } + + @Override + public final boolean inClient() { + return true; + } +} diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index cb835ac2e..853b0c19f 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -23,9 +23,9 @@ public class WorkThread extends Thread implements Executor { protected final ThreadHashExecutor hashExecutor; - private int index; //WorkThread下标 + private final int index; //WorkThread下标,从0开始 - private int threads; //WorkThread个数 + private final int threads; //WorkThread个数 public WorkThread(String name, int index, int threads, ExecutorService workExecutor, Runnable target) { super(target); @@ -114,6 +114,16 @@ public class WorkThread extends Thread implements Executor { return false; } + /** + * 是否客户端的IO线程 + * + * @since 2.8.0 + * @return boolean + */ + public boolean inClient() { + return false; + } + public boolean inCurrThread() { return this == Thread.currentThread(); }