diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 3aa34526d..9b7096adb 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -35,20 +35,18 @@ public class AsyncIOGroup extends AsyncGroup { private boolean skipClose; - //必须与ioWriteThreads数量相同 final AsyncIOThread[] ioReadThreads; - //必须与ioReadThreads数量相同 final AsyncIOThread[] ioWriteThreads; final AsyncIOThread connectThread; final int bufferCapacity; - final AtomicInteger shareCount = new AtomicInteger(1); - private final AtomicInteger readIndex = new AtomicInteger(); + private final AtomicInteger writeIndex = new AtomicInteger(); + //创建数 final LongAdder connCreateCounter = new LongAdder(); @@ -80,26 +78,27 @@ public class AsyncIOGroup extends AsyncGroup { final int threads = Utility.cpus(); this.ioReadThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads]; + final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); try { for (int i = 0; i < threads; i++) { String indexfix = WorkThread.formatIndex(threads, i + 1); ObjectPool unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); if (client) { - this.ioReadThreads[i] = new ClientReadIOThread(String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); ObjectPool unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - this.ioWriteThreads[i] = new ClientWriteIOThread(String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); + this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); } else { - this.ioReadThreads[i] = new AsyncIOThread(String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; } } if (client) { ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - this.connectThread = client ? new ClientReadIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) - : new AsyncIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); + this.connectThread = client ? new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) + : new AsyncIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); } else { this.connectThread = null; } @@ -148,18 +147,15 @@ public class AsyncIOGroup extends AsyncGroup { return this; } - public AsyncIOGroup dispose() { - if (shareCount.decrementAndGet() > 0) { - return this; - } + public synchronized AsyncIOGroup dispose() { if (closed) { return this; } - for (int i = 0; i < this.ioReadThreads.length; i++) { - this.ioReadThreads[i].close(); - if (this.ioWriteThreads[i] != this.ioReadThreads[i]) { - this.ioWriteThreads[i].close(); - } + for (AsyncIOThread t : this.ioReadThreads) { + t.close(); + } + for (AsyncIOThread t : this.ioWriteThreads) { + t.close(); } if (connectThread != null) { connectThread.close(); @@ -181,9 +177,14 @@ public class AsyncIOGroup extends AsyncGroup { return connClosedCounter; } - public AsyncIOThread[] nextIOThreads() { + public AsyncIOThread nextReadIOThread() { int i = Math.abs(readIndex.getAndIncrement()) % ioReadThreads.length; - return new AsyncIOThread[]{ioReadThreads[i], ioWriteThreads[i]}; + return ioReadThreads[i]; + } + + public AsyncIOThread nextWriteIOThread() { + int i = Math.abs(writeIndex.getAndIncrement()) % ioWriteThreads.length; + return ioWriteThreads[i]; } public AsyncIOThread connectThread() { @@ -235,20 +236,34 @@ public class AsyncIOGroup extends AsyncGroup { channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - AsyncIOThread[] ioThreads = null; - Thread currThread = Thread.currentThread(); - if (currThread instanceof AsyncIOThread) { - for (int i = 0; i < this.ioReadThreads.length; i++) { - if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) { - ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]}; - break; + AsyncIOThread readThread = null; + AsyncIOThread writeThread = null; + AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread(); + if (currThread != null) { + if (this.ioReadThreads[0].getThreadGroup() == currThread.getThreadGroup()) { + for (int i = 0; i < this.ioReadThreads.length; i++) { + if (this.ioReadThreads[i].index() == currThread.index()) { + readThread = this.ioReadThreads[i]; + break; + } + } + } + if (this.ioWriteThreads[0].getThreadGroup() == currThread.getThreadGroup()) { + for (int i = 0; i < this.ioWriteThreads.length; i++) { + if (this.ioWriteThreads[i].index() == currThread.index()) { + writeThread = this.ioWriteThreads[i]; + break; + } } } } - if (ioThreads == null) { - ioThreads = nextIOThreads(); + if (readThread == null) { + readThread = nextReadIOThread(); } - return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address); + if (writeThread == null) { + writeThread = nextWriteIOThread(); + } + return new AsyncNioTcpConnection(true, this, readThread, writeThread, channel, null, null, address); } @Override @@ -304,20 +319,30 @@ public class AsyncIOGroup extends AsyncGroup { private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException { DatagramChannel channel = DatagramChannel.open(); - AsyncIOThread[] ioThreads = null; - Thread currThread = Thread.currentThread(); - if (currThread instanceof AsyncIOThread) { + AsyncIOThread readThread = null; + AsyncIOThread writeThread = null; + AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread(); + if (currThread != null) { for (int i = 0; i < this.ioReadThreads.length; i++) { - if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) { - ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]}; + if (this.ioReadThreads[i].index() == currThread.index()) { + readThread = this.ioReadThreads[i]; + break; + } + } + for (int i = 0; i < this.ioWriteThreads.length; i++) { + if (this.ioWriteThreads[i].index() == currThread.index()) { + writeThread = this.ioWriteThreads[i]; break; } } } - if (ioThreads == null) { - ioThreads = nextIOThreads(); + if (readThread == null) { + readThread = nextReadIOThread(); } - return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address); + if (writeThread == null) { + writeThread = nextWriteIOThread(); + } + return new AsyncNioUdpConnection(true, this, readThread, writeThread, channel, null, null, address); } @Override diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index b8b7ad0c0..544708bfd 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -43,9 +43,9 @@ public class AsyncIOThread extends WorkThread { private boolean closed; - public AsyncIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, + public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { - super(name, index, threads, workExecutor, null); + super(g, name, index, threads, workExecutor, null); this.selector = selector; this.setDaemon(true); this.bufferSupplier = () -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).get(); @@ -201,13 +201,15 @@ public class AsyncIOThread extends WorkThread { } } - public void close() { - this.closed = true; - this.interrupt(); - try { - this.selector.close(); - } catch (Exception e) { - logger.log(Level.FINE, getName() + " selector close failed", e); + public synchronized void close() { + if (!this.closed) { + this.interrupt(); + try { + this.selector.close(); + } catch (Exception e) { + logger.log(Level.FINE, getName() + " selector close failed", e); + } + this.closed = true; } } } diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 5bb9122de..2fea85e8b 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -131,8 +131,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { public void run() { final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; - int threads = ioReadThreads.length; - int threadIndex = -1; + final int reads = ioReadThreads.length; + final int writes = ioWriteThreads.length; + int readIndex = -1; + int writeIndex = -1; Set keys = null; while (!closed) { try { @@ -145,10 +147,13 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { } for (SelectionKey key : keys) { if (key.isAcceptable()) { - if (++threadIndex >= threads) { - threadIndex = 0; + if (++readIndex >= reads) { + readIndex = 0; } - accept(key, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]); + if (++writeIndex >= writes) { + writeIndex = 0; + } + accept(key, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); } } keys.clear(); diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 64bab2aa8..c8469ba33 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -123,17 +123,22 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { public void run() { final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; - int threads = ioReadThreads.length; - int threadIndex = -1; + final int reads = ioReadThreads.length; + final int writes = ioWriteThreads.length; + int readIndex = -1; + int writeIndex = -1; while (!closed) { final ByteBuffer buffer = unsafeBufferPool.get(); try { SocketAddress address = serverChannel.receive(buffer); buffer.flip(); - if (++threadIndex >= threads) { - threadIndex = 0; + if (++readIndex >= reads) { + readIndex = 0; } - accept(address, buffer, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]); + if (++writeIndex >= writes) { + writeIndex = 0; + } + accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]); } catch (Throwable t) { unsafeBufferPool.accept(buffer); } diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index ba2f33435..b08203022 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -28,8 +28,8 @@ public class WorkThread extends Thread implements Executor { private final int threads; //WorkThread个数 - public WorkThread(String name, int index, int threads, ExecutorService workExecutor, Runnable target) { - super(target); + public WorkThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Runnable target) { + super(g, target); if (name != null) { setName(name); } @@ -48,11 +48,12 @@ public class WorkThread extends Thread implements Executor { public static ExecutorService createHashExecutor(final int threads, final String threadNameFormat) { final AtomicReference ref = new AtomicReference<>(); final AtomicInteger counter = new AtomicInteger(); + final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); return new ThreadHashExecutor(threads, (Runnable r) -> { int i = counter.get(); int c = counter.incrementAndGet(); String threadName = String.format(threadNameFormat, formatIndex(threads, c)); - Thread t = new WorkThread(threadName, i, threads, ref.get(), r); + Thread t = new WorkThread(g, threadName, i, threads, ref.get(), r); return t; }); } @@ -60,11 +61,12 @@ public class WorkThread extends Thread implements Executor { public static ExecutorService createExecutor(final int threads, final String threadNameFormat) { final AtomicReference ref = new AtomicReference<>(); final AtomicInteger counter = new AtomicInteger(); + final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); return Executors.newFixedThreadPool(threads, (Runnable r) -> { int i = counter.get(); int c = counter.incrementAndGet(); String threadName = String.format(threadNameFormat, formatIndex(threads, c)); - Thread t = new WorkThread(threadName, i, threads, ref.get(), r); + Thread t = new WorkThread(g, threadName, i, threads, ref.get(), r); return t; }); } diff --git a/src/main/java/org/redkale/net/client/ClientReadIOThread.java b/src/main/java/org/redkale/net/client/ClientReadIOThread.java index e26977e5b..05c1b6588 100644 --- a/src/main/java/org/redkale/net/client/ClientReadIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientReadIOThread.java @@ -21,9 +21,9 @@ import org.redkale.util.ObjectPool; */ public class ClientReadIOThread extends AsyncIOThread { - public ClientReadIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, + public ClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { - super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); + super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); } } diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index bf5d33fe9..e840ab9ba 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -26,9 +26,9 @@ public class ClientWriteIOThread extends AsyncIOThread { private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); - public ClientWriteIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, + public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { - super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); + super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); } public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) {