diff --git a/src/org/redkale/net/NioTcpAsyncConnection.java b/src/org/redkale/net/NioTcpAsyncConnection.java index e7a9a9676..8030b18f6 100644 --- a/src/org/redkale/net/NioTcpAsyncConnection.java +++ b/src/org/redkale/net/NioTcpAsyncConnection.java @@ -75,7 +75,7 @@ public class NioTcpAsyncConnection extends AsyncConnection { public NioTcpAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, SocketChannel ch, SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) { - super(ioThread.getBufferPool(), sslContext, livingCounter, closedCounter); + super(ioThread.getBufferSupplier(), ioThread.getBufferConsumer(), sslContext, livingCounter, closedCounter); this.ioGroup = ioGroup; this.ioThread = ioThread; this.channel = ch; diff --git a/src/org/redkale/net/NioTcpPrepareRunner.java b/src/org/redkale/net/NioTcpPrepareRunner.java index 35e863414..ba98e4766 100644 --- a/src/org/redkale/net/NioTcpPrepareRunner.java +++ b/src/org/redkale/net/NioTcpPrepareRunner.java @@ -42,7 +42,7 @@ public class NioTcpPrepareRunner implements Runnable { channel.read(new CompletionHandler() { @Override public void completed(Integer count, ByteBuffer buffer) { - if (response == null) response = ((NioThread) Thread.currentThread()).getResponsePool().get(); + if (response == null) response = ((NioThread) Thread.currentThread()).getResponseSupplier().get(); if (count < 1) { buffer.clear(); channel.setReadBuffer(buffer); diff --git a/src/org/redkale/net/NioThread.java b/src/org/redkale/net/NioThread.java index b03acb7ee..dcf95343c 100644 --- a/src/org/redkale/net/NioThread.java +++ b/src/org/redkale/net/NioThread.java @@ -9,7 +9,7 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; -import java.util.function.Consumer; +import java.util.function.*; import org.redkale.util.*; /** @@ -26,21 +26,40 @@ public class NioThread extends WorkThread { final Selector selector; - private final ObjectPool bufferPool; + private final Supplier bufferSupplier; - private final ObjectPool responsePool; + private final Consumer bufferConsumer; + + private final Supplier responseSupplier; + + private final Consumer responseConsumer; private final ConcurrentLinkedQueue> registers = new ConcurrentLinkedQueue<>(); private boolean closed; public NioThread(String name, ExecutorService workExecutor, Selector selector, - ObjectPool bufferPool, ObjectPool responsePool) { + ObjectPool unsafeBufferPool, ObjectPool safeBufferPool, + ObjectPool unsafeResponsePool, ObjectPool safeResponsePool) { super(name, workExecutor, null); this.selector = selector; - this.bufferPool = bufferPool; - this.responsePool = responsePool; this.setDaemon(true); + this.bufferSupplier = () -> inCurrThread() ? unsafeBufferPool.get() : safeBufferPool.get(); + this.bufferConsumer = (v) -> { + if (inCurrThread()) { + unsafeBufferPool.accept(v); + } else { + safeBufferPool.accept(v); + } + }; + this.responseSupplier = () -> inCurrThread() ? unsafeResponsePool.get() : safeResponsePool.get(); + this.responseConsumer = (v) -> { + if (inCurrThread()) { + unsafeResponsePool.accept(v); + } else { + safeResponsePool.accept(v); + } + }; } public void register(Consumer consumer) { @@ -48,12 +67,20 @@ public class NioThread extends WorkThread { selector.wakeup(); } - public ObjectPool getBufferPool() { - return bufferPool; + public Supplier getBufferSupplier() { + return bufferSupplier; } - public ObjectPool getResponsePool() { - return responsePool; + public Consumer getBufferConsumer() { + return bufferConsumer; + } + + public Supplier getResponseSupplier() { + return responseSupplier; + } + + public Consumer getResponseConsumer() { + return responseConsumer; } @Override diff --git a/src/org/redkale/net/NioThreadGroup.java b/src/org/redkale/net/NioThreadGroup.java index 0a2af56ef..cfda032ea 100644 --- a/src/org/redkale/net/NioThreadGroup.java +++ b/src/org/redkale/net/NioThreadGroup.java @@ -32,18 +32,18 @@ public class NioThreadGroup { private ScheduledThreadPoolExecutor timeoutExecutor; public NioThreadGroup(final String serverName, ExecutorService workExecutor, int iothreads, - ObjectPool bufferPool, ObjectPool responsePool) throws IOException { + ObjectPool safeBufferPool, ObjectPool safeResponsePool) throws IOException { this.threads = new NioThread[Math.max(iothreads, 1)]; for (int i = 0; i < this.threads.length; i++) { - ObjectPool threadBufferPool = ObjectPool.createUnsafePool(bufferPool.getCreatCounter(), - bufferPool.getCycleCounter(), 8, - bufferPool.getCreator(), bufferPool.getPrepare(), bufferPool.getRecycler()); + ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool.getCreatCounter(), + safeBufferPool.getCycleCounter(), 8, + safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - ObjectPool threadResponsePool = ObjectPool.createUnsafePool(responsePool.getCreatCounter(), - responsePool.getCycleCounter(), 8, - responsePool.getCreator(), responsePool.getPrepare(), responsePool.getRecycler()); + ObjectPool unsafeResponsePool = ObjectPool.createUnsafePool(safeResponsePool.getCreatCounter(), + safeResponsePool.getCycleCounter(), 8, + safeResponsePool.getCreator(), safeResponsePool.getPrepare(), safeResponsePool.getRecycler()); String name = "Redkale-" + serverName + "-ServletThread" + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); - this.threads[i] = new NioThread(name, workExecutor, Selector.open(), threadBufferPool, threadResponsePool); + this.threads[i] = new NioThread(name, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool, unsafeResponsePool, safeResponsePool); } this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { Thread t = new Thread(r);