From fa7db42f50cd24dee1c8dfc2aa76f4d4829d8219 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 19 Jan 2021 16:45:00 +0800 Subject: [PATCH] =?UTF-8?q?ObjectPool=E5=85=B3=E9=97=ADpublic=E6=9E=84?= =?UTF-8?q?=E9=80=A0=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/boot/Application.java | 4 +- src/org/redkale/convert/bson/BsonReader.java | 2 +- src/org/redkale/convert/bson/BsonWriter.java | 2 +- .../redkale/net/TcpNioAsyncConnection.java | 1 - src/org/redkale/net/TransportFactory.java | 2 +- src/org/redkale/net/http/HttpResponse.java | 2 +- src/org/redkale/net/http/HttpServer.java | 2 +- src/org/redkale/net/sncp/SncpResponse.java | 2 +- src/org/redkale/net/sncp/SncpServer.java | 2 +- src/org/redkale/source/DataSqlSource.java | 2 +- src/org/redkale/util/ObjectPool.java | 106 +++++++++++++----- .../redkale/test/service/ABMainService.java | 2 +- test/org/redkale/test/sncp/SncpTest.java | 2 +- 13 files changed, 87 insertions(+), 44 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index b62373281..ed87a89e1 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -317,7 +317,7 @@ public final class Application { final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 2); bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), threads * 4); final int capacity = bufferCapacity; - transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, + transportPool = ObjectPool.createSafePool(createBufferCounter, cycleBufferCounter, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != capacity) return false; e.clear(); @@ -440,7 +440,7 @@ public final class Application { } if (transportPool == null) { final int capacity = bufferCapacity; - transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, + transportPool = ObjectPool.createSafePool(createBufferCounter, cycleBufferCounter, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != capacity) return false; e.clear(); diff --git a/src/org/redkale/convert/bson/BsonReader.java b/src/org/redkale/convert/bson/BsonReader.java index 651e0cfdc..ceeb9b640 100644 --- a/src/org/redkale/convert/bson/BsonReader.java +++ b/src/org/redkale/convert/bson/BsonReader.java @@ -42,7 +42,7 @@ public class BsonReader extends Reader { } public static ObjectPool createPool(int max) { - return new ObjectPool<>(max, (Object... params) -> new BsonReader(), null, (t) -> t.recycle()); + return ObjectPool.createSafePool(max, (Object... params) -> new BsonReader(), null, (t) -> t.recycle()); } public BsonReader(byte[] bytes) { diff --git a/src/org/redkale/convert/bson/BsonWriter.java b/src/org/redkale/convert/bson/BsonWriter.java index c55e85f3c..63ba622f1 100644 --- a/src/org/redkale/convert/bson/BsonWriter.java +++ b/src/org/redkale/convert/bson/BsonWriter.java @@ -29,7 +29,7 @@ public class BsonWriter extends Writer { protected boolean tiny; public static ObjectPool createPool(int max) { - return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), null, (t) -> t.recycle()); + return ObjectPool.createSafePool(max, (Object... params) -> new BsonWriter(), null, (t) -> t.recycle()); } public byte[] toArray() { diff --git a/src/org/redkale/net/TcpNioAsyncConnection.java b/src/org/redkale/net/TcpNioAsyncConnection.java index bf80be003..f58e449fb 100644 --- a/src/org/redkale/net/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/TcpNioAsyncConnection.java @@ -14,7 +14,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.*; import javax.net.ssl.SSLContext; -import org.redkale.net.AsyncConnection; import org.redkale.net.nio.NioCompletionHandler; import org.redkale.net.nio.NioThread; import org.redkale.net.nio.NioThreadGroup; diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 10888ca33..b8073801e 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -152,7 +152,7 @@ public class TransportFactory { } public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity, int readTimeoutSeconds, int writeTimeoutSeconds) { - final ObjectPool transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize, + final ObjectPool transportPool = ObjectPool.createSafePool(new AtomicLong(), new AtomicLong(), bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; e.clear(); diff --git a/src/org/redkale/net/http/HttpResponse.java b/src/org/redkale/net/http/HttpResponse.java index c12c6f704..910b6aef2 100644 --- a/src/org/redkale/net/http/HttpResponse.java +++ b/src/org/redkale/net/http/HttpResponse.java @@ -155,7 +155,7 @@ public class HttpResponse extends Response { private final HttpRender onlyoneHttpRender; public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { - return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle()); + return ObjectPool.createSafePool(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle()); } public HttpResponse(HttpContext context, HttpRequest request, ObjectPool responsePool, HttpResponseConfig config) { diff --git a/src/org/redkale/net/http/HttpServer.java b/src/org/redkale/net/http/HttpServer.java index 043a64998..8ebd0ac2e 100644 --- a/src/org/redkale/net/http/HttpServer.java +++ b/src/org/redkale/net/http/HttpServer.java @@ -458,7 +458,7 @@ public class HttpServer extends Server bufferPool = new ObjectPool<>(createCounter, cycleCounter, bufferPoolSize, + ObjectPool bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; e.clear(); diff --git a/src/org/redkale/net/sncp/SncpResponse.java b/src/org/redkale/net/sncp/SncpResponse.java index f02b5409a..326207ad9 100644 --- a/src/org/redkale/net/sncp/SncpResponse.java +++ b/src/org/redkale/net/sncp/SncpResponse.java @@ -30,7 +30,7 @@ public class SncpResponse extends Response { public static final int RETCODE_THROWEXCEPTION = (1 << 4); //内部异常 public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { - return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).prepare(), (x) -> ((SncpResponse) x).recycle()); + return ObjectPool.createSafePool(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).prepare(), (x) -> ((SncpResponse) x).recycle()); } private final byte[] addrBytes; diff --git a/src/org/redkale/net/sncp/SncpServer.java b/src/org/redkale/net/sncp/SncpServer.java index 4afe29e25..03029b4e9 100644 --- a/src/org/redkale/net/sncp/SncpServer.java +++ b/src/org/redkale/net/sncp/SncpServer.java @@ -134,7 +134,7 @@ public class SncpServer extends Server bufferPool = new ObjectPool<>(createCounter, cycleCounter, bufferPoolSize, + ObjectPool bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; e.clear(); diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 427e7bb66..786644b5c 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -92,7 +92,7 @@ public abstract class DataSqlSource extends AbstractService implement return t; }); final int bufferCapacity = Math.max(8 * 1024, Integer.decode(readprop.getProperty(JDBC_CONNECTIONSCAPACITY, "" + 8 * 1024))); - this.bufferPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), Math.max(maxconns, this.threads * 2), + this.bufferPool = ObjectPool.createSafePool(new AtomicLong(), new AtomicLong(), Math.max(maxconns, this.threads * 2), (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; e.clear(); diff --git a/src/org/redkale/util/ObjectPool.java b/src/org/redkale/util/ObjectPool.java index 7219f0bcb..f32c03ffb 100644 --- a/src/org/redkale/util/ObjectPool.java +++ b/src/org/redkale/util/ObjectPool.java @@ -5,7 +5,7 @@ package org.redkale.util; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.*; import java.util.function.*; import java.util.logging.*; @@ -39,44 +39,87 @@ public class ObjectPool implements Supplier, Consumer { protected final Queue queue; - public ObjectPool(Class clazz, Consumer prepare, Predicate recycler) { - this(2, clazz, prepare, recycler); - } - - public ObjectPool(int max, Class clazz, Consumer prepare, Predicate recycler) { - this(max, Creator.create(clazz), prepare, recycler); - } - - public ObjectPool(Creator creator, Consumer prepare, Predicate recycler) { - this(2, creator, prepare, recycler); - } - - public ObjectPool(int max, Creator creator, Consumer prepare, Predicate recycler) { - this(null, null, max, creator, prepare, recycler); - } - - public ObjectPool(int max, Supplier creator, Consumer prepare, Predicate recycler) { - this(null, null, max, creator, prepare, recycler); - } - - public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier creator, Consumer prepare, Predicate recycler) { - this(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler); - } - - public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { - this(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors() * 2, max), - creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max))); - } - protected ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler, Queue queue) { this.creatCounter = creatCounter; this.cycleCounter = cycleCounter; this.creator = creator; this.prepare = prepare; this.recycler = recycler; - this.queue = queue; this.max = max; this.debug = logger.isLoggable(Level.FINEST); + this.queue = queue; + } + + //非线程安全版 + public static ObjectPool createUnsafePool(Class clazz, Consumer prepare, Predicate recycler) { + return createUnsafePool(2, clazz, prepare, recycler); + } + + //非线程安全版 + public static ObjectPool createUnsafePool(int max, Class clazz, Consumer prepare, Predicate recycler) { + return createUnsafePool(max, Creator.create(clazz), prepare, recycler); + } + + //非线程安全版 + public static ObjectPool createUnsafePool(Creator creator, Consumer prepare, Predicate recycler) { + return createUnsafePool(2, creator, prepare, recycler); + } + + //非线程安全版 + public static ObjectPool createUnsafePool(int max, Creator creator, Consumer prepare, Predicate recycler) { + return createUnsafePool(null, null, max, creator, prepare, recycler); + } + + //非线程安全版 + public static ObjectPool createUnsafePool(int max, Supplier creator, Consumer prepare, Predicate recycler) { + return createUnsafePool(null, null, max, creator, prepare, recycler); + } + + //非线程安全版 + public static ObjectPool createUnsafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier creator, Consumer prepare, Predicate recycler) { + return createUnsafePool(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler); + } + + //非线程安全版 + public static ObjectPool createUnsafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { + return new ObjectPool(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors(), max), + creator, prepare, recycler, new ArrayDeque<>(Math.max(Runtime.getRuntime().availableProcessors(), max))); + } + + //线程安全版 + public static ObjectPool createSafePool(Class clazz, Consumer prepare, Predicate recycler) { + return createSafePool(2, clazz, prepare, recycler); + } + + //线程安全版 + public static ObjectPool createSafePool(int max, Class clazz, Consumer prepare, Predicate recycler) { + return createSafePool(max, Creator.create(clazz), prepare, recycler); + } + + //线程安全版 + public static ObjectPool createSafePool(Creator creator, Consumer prepare, Predicate recycler) { + return createSafePool(2, creator, prepare, recycler); + } + + //线程安全版 + public static ObjectPool createSafePool(int max, Creator creator, Consumer prepare, Predicate recycler) { + return createSafePool(null, null, max, creator, prepare, recycler); + } + + //线程安全版 + public static ObjectPool createSafePool(int max, Supplier creator, Consumer prepare, Predicate recycler) { + return createSafePool(null, null, max, creator, prepare, recycler); + } + + //线程安全版 + public static ObjectPool createSafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier creator, Consumer prepare, Predicate recycler) { + return createSafePool(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler); + } + + //线程安全版 + public static ObjectPool createSafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { + return new ObjectPool(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors(), max), + creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors(), max))); } public void setCreator(Creator creator) { @@ -125,4 +168,5 @@ public class ObjectPool implements Supplier, Consumer { public long getCycleCount() { return cycleCounter.longValue(); } + } diff --git a/test/org/redkale/test/service/ABMainService.java b/test/org/redkale/test/service/ABMainService.java index b50ce40d1..17fe028cd 100644 --- a/test/org/redkale/test/service/ABMainService.java +++ b/test/org/redkale/test/service/ABMainService.java @@ -149,7 +149,7 @@ public class ABMainService implements Service { } public static ObjectPool newBufferPool() { - return new ObjectPool<>(new AtomicLong(), new AtomicLong(), 16, + return ObjectPool.createSafePool(new AtomicLong(), new AtomicLong(), 16, (Object... params) -> ByteBuffer.allocateDirect(8192), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != 8192) return false; e.clear(); diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java index a14498364..09109e2c6 100644 --- a/test/org/redkale/test/sncp/SncpTest.java +++ b/test/org/redkale/test/sncp/SncpTest.java @@ -66,7 +66,7 @@ public class SncpTest { } public static ObjectPool newBufferPool() { - return new ObjectPool<>(new AtomicLong(), new AtomicLong(), 16, + return ObjectPool.createSafePool(new AtomicLong(), new AtomicLong(), 16, (Object... params) -> ByteBuffer.allocateDirect(8192), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != 8192) return false; e.clear();