diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 73ce03b56..a824ac271 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -82,23 +82,16 @@ public class AsyncIOGroup extends AsyncGroup { try { for (int i = 0; i < threads; i++) { String indexfix = WorkThread.formatIndex(threads, i + 1); - ObjectPool unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), - safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); if (client) { - this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); - ObjectPool unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), - safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); + this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool); + this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool); } else { - this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; } } if (client) { - ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), - safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - this.connectThread = client ? new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) - : new AsyncIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); + this.connectThread = new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool); } else { this.connectThread = null; } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index ed95410ff..5b6c7cd78 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -5,6 +5,7 @@ */ package org.redkale.net; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; @@ -43,13 +44,13 @@ public class AsyncIOThread extends WorkThread { private boolean closed; - public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, - ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { + public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, null); - this.selector = selector; + this.selector = Selector.open(); this.setDaemon(true); - this.bufferSupplier = () -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).get(); - this.bufferConsumer = (v) -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).accept(v); + ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(this, 512, safeBufferPool); + this.bufferSupplier = unsafeBufferPool; + this.bufferConsumer = unsafeBufferPool; } protected boolean isClosed() { diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 75cba385c..22737b3ae 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -92,8 +92,8 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { LongAdder createResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder(); - ObjectPool safeBufferPool = server.createBufferSafePool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); - ObjectPool safeResponsePool = server.createResponseSafePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); + ObjectPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + ObjectPool safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); final int respPoolMax = server.getResponsePoolSize(); ThreadLocal> localResponsePool = ThreadLocal.withInitial(() -> { if (!(Thread.currentThread() instanceof WorkThread)) { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 1a6a389f5..f24b292b8 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -89,8 +89,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { LongAdder createResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder(); - ObjectPool safeBufferPool = server.createBufferSafePool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); - ObjectPool safeResponsePool = server.createResponseSafePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); + ObjectPool safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + ObjectPool safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ThreadLocal> localResponsePool = ThreadLocal.withInitial(() -> { if (!(Thread.currentThread() instanceof WorkThread)) { return null; @@ -112,9 +112,6 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { this.serverChannel.register(this.selector, SelectionKey.OP_READ); this.acceptThread = new Thread() { - ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), - safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - { setName(String.format(threadNameFormat, "Accept")); } @@ -127,6 +124,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { final int writes = ioWriteThreads.length; int readIndex = -1; int writeIndex = -1; + ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(null, 512, safeBufferPool); while (!closed) { final ByteBuffer buffer = unsafeBufferPool.get(); try { diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index 5550b2d65..fbc387684 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -423,10 +423,10 @@ public abstract class Server createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize); + protected abstract ObjectPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize); //必须在 createContext()之后调用 - protected abstract ObjectPool

createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize); + protected abstract ObjectPool

createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize); public void shutdown() throws IOException { long s = System.currentTimeMillis(); diff --git a/src/main/java/org/redkale/net/client/ClientReadIOThread.java b/src/main/java/org/redkale/net/client/ClientReadIOThread.java index 05c1b6588..20624c9bf 100644 --- a/src/main/java/org/redkale/net/client/ClientReadIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientReadIOThread.java @@ -3,8 +3,8 @@ */ package org.redkale.net.client; +import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.Selector; import java.util.concurrent.ExecutorService; import org.redkale.net.AsyncIOThread; import org.redkale.util.ObjectPool; @@ -21,9 +21,9 @@ import org.redkale.util.ObjectPool; */ public class ClientReadIOThread extends AsyncIOThread { - public ClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, - ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { - super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); + public ClientReadIOThread(ThreadGroup g, String name, int index, int threads, + ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + super(g, name, index, threads, workExecutor, safeBufferPool); } } diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index e840ab9ba..b3ff2ec88 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -3,9 +3,9 @@ */ package org.redkale.net.client; -import java.io.Serializable; +import java.io.*; import java.nio.ByteBuffer; -import java.nio.channels.*; +import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,9 +26,9 @@ public class ClientWriteIOThread extends AsyncIOThread { private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); - public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, - ObjectPool unsafeBufferPool, ObjectPool safeBufferPool) { - super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); + public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, + ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + super(g, name, index, threads, workExecutor, safeBufferPool); } public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) { diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index cf9abce5e..f7dfd40fc 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -41,6 +41,8 @@ public class HttpServer extends Server safeBufferPool; + public HttpServer() { this(null, System.currentTimeMillis(), ResourceFactory.create()); } @@ -541,7 +543,7 @@ public class HttpServer extends Server createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { + protected ObjectPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { final int rcapacity = this.bufferCapacity; ObjectPool bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { @@ -551,11 +553,12 @@ public class HttpServer extends Server createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) { + protected ObjectPool createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) { Creator creator = (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context), this.respConfig); ObjectPool pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, HttpResponse::prepare, HttpResponse::recycle); return pool; diff --git a/src/main/java/org/redkale/net/sncp/SncpServer.java b/src/main/java/org/redkale/net/sncp/SncpServer.java index 9e4aecaf0..a3530f8d0 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServer.java +++ b/src/main/java/org/redkale/net/sncp/SncpServer.java @@ -127,7 +127,7 @@ public class SncpServer extends Server createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { + protected ObjectPool createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { final int rcapacity = this.bufferCapacity; ObjectPool bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { @@ -141,7 +141,7 @@ public class SncpServer extends Server createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) { + protected ObjectPool createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) { Creator creator = (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context)); ObjectPool pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle); return pool; diff --git a/src/main/java/org/redkale/util/ObjectPool.java b/src/main/java/org/redkale/util/ObjectPool.java index 03981c89b..5df6014c7 100644 --- a/src/main/java/org/redkale/util/ObjectPool.java +++ b/src/main/java/org/redkale/util/ObjectPool.java @@ -45,10 +45,14 @@ public class ObjectPool implements Supplier, Consumer { protected Thread unsafeThread; - protected ObjectPool(ObjectPool parent, LongAdder creatCounter, LongAdder cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler, Queue queue) { + //true表示unsafeThread不为空且当前为非线程安全版且parent为线程安全版 + protected final boolean safeCombine; + + protected ObjectPool(ObjectPool parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, Creator creator, Consumer prepare, Predicate recycler, Queue queue) { this.parent = parent; this.creatCounter = creatCounter; this.cycleCounter = cycleCounter; + this.unsafeThread = unsafeThread; this.creator = creator; this.prepare = prepare; this.recycler = recycler; @@ -56,6 +60,7 @@ public class ObjectPool implements Supplier, Consumer { this.debug = logger.isLoggable(Level.FINEST); this.queue = queue; this.unsafeDequeable = queue instanceof ArrayDeque; + this.safeCombine = unsafeThread != null && unsafeDequeable && parent != null && !parent.unsafeDequeable; } //非线程安全版 @@ -125,10 +130,21 @@ public class ObjectPool implements Supplier, Consumer { //非线程安全版 public static ObjectPool createUnsafePool(ObjectPool parent, LongAdder creatCounter, LongAdder cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { - return new ObjectPool(parent, creatCounter, cycleCounter, Math.max(Utility.cpus(), max), + return new ObjectPool(parent, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max), creator, prepare, recycler, new ArrayDeque<>(Math.max(Utility.cpus(), max))); } + //非线程安全版 + public static ObjectPool createUnsafePool(ObjectPool parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, Creator creator, Consumer prepare, Predicate recycler) { + return new ObjectPool(parent, creatCounter, cycleCounter, unsafeThread, Math.max(Utility.cpus(), max), + creator, prepare, recycler, new ArrayDeque<>(Math.max(Utility.cpus(), max))); + } + + //非线程安全版 + public static ObjectPool createUnsafePool(Thread unsafeThread, int max, ObjectPool safePool) { + return createUnsafePool(safePool, safePool.getCreatCounter(), safePool.getCycleCounter(), unsafeThread, max, safePool.getCreator(), safePool.getPrepare(), safePool.getRecycler()); + } + //线程安全版 public static ObjectPool createSafePool(Class clazz, Consumer prepare, Predicate recycler) { return createSafePool(2, clazz, prepare, recycler); @@ -161,7 +177,7 @@ public class ObjectPool implements Supplier, Consumer { //线程安全版 public static ObjectPool createSafePool(LongAdder creatCounter, LongAdder cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { - return new ObjectPool(null, creatCounter, cycleCounter, Math.max(Utility.cpus(), max), + return new ObjectPool(null, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max), creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Utility.cpus(), max))); } @@ -195,7 +211,11 @@ public class ObjectPool implements Supplier, Consumer { @Override public T get() { - if (unsafeDequeable) { + if (safeCombine) { + if (Thread.currentThread() != unsafeThread) { + return parent.get(); + } + } else if (unsafeDequeable) { if (unsafeThread == null) { unsafeThread = Thread.currentThread(); } else if (unsafeThread != Thread.currentThread()) { @@ -225,7 +245,12 @@ public class ObjectPool implements Supplier, Consumer { if (e == null) { return; } - if (unsafeDequeable) { + if (safeCombine) { + if (Thread.currentThread() != unsafeThread) { + parent.accept(e); + return; + } + } else if (unsafeDequeable) { if (unsafeThread == null) { unsafeThread = Thread.currentThread(); } else if (unsafeThread != Thread.currentThread()) {