ObjectPool关闭public构造函数

This commit is contained in:
Redkale
2021-01-19 16:45:00 +08:00
parent 80c07a5c5b
commit fa7db42f50
13 changed files with 87 additions and 44 deletions

View File

@@ -317,7 +317,7 @@ public final class Application {
final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 2);
bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), threads * 4);
final int capacity = bufferCapacity;
transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
transportPool = ObjectPool.createSafePool(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != capacity) return false;
e.clear();
@@ -440,7 +440,7 @@ public final class Application {
}
if (transportPool == null) {
final int capacity = bufferCapacity;
transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
transportPool = ObjectPool.createSafePool(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != capacity) return false;
e.clear();

View File

@@ -42,7 +42,7 @@ public class BsonReader extends Reader {
}
public static ObjectPool<BsonReader> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new BsonReader(), null, (t) -> t.recycle());
return ObjectPool.createSafePool(max, (Object... params) -> new BsonReader(), null, (t) -> t.recycle());
}
public BsonReader(byte[] bytes) {

View File

@@ -29,7 +29,7 @@ public class BsonWriter extends Writer {
protected boolean tiny;
public static ObjectPool<BsonWriter> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), null, (t) -> t.recycle());
return ObjectPool.createSafePool(max, (Object... params) -> new BsonWriter(), null, (t) -> t.recycle());
}
public byte[] toArray() {

View File

@@ -14,7 +14,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import javax.net.ssl.SSLContext;
import org.redkale.net.AsyncConnection;
import org.redkale.net.nio.NioCompletionHandler;
import org.redkale.net.nio.NioThread;
import org.redkale.net.nio.NioThreadGroup;

View File

@@ -152,7 +152,7 @@ public class TransportFactory {
}
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity, int readTimeoutSeconds, int writeTimeoutSeconds) {
final ObjectPool<ByteBuffer> transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize,
final ObjectPool<ByteBuffer> transportPool = ObjectPool.createSafePool(new AtomicLong(), new AtomicLong(), bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
e.clear();

View File

@@ -155,7 +155,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private final HttpRender onlyoneHttpRender;
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
return ObjectPool.createSafePool(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
}
public HttpResponse(HttpContext context, HttpRequest request, ObjectPool<Response> responsePool, HttpResponseConfig config) {

View File

@@ -458,7 +458,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
if (createCounter == null) createCounter = new AtomicLong();
if (cycleCounter == null) cycleCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createCounter, cycleCounter, bufferPoolSize,
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();

View File

@@ -30,7 +30,7 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
public static final int RETCODE_THROWEXCEPTION = (1 << 4); //内部异常
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).prepare(), (x) -> ((SncpResponse) x).recycle());
return ObjectPool.createSafePool(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).prepare(), (x) -> ((SncpResponse) x).recycle());
}
private final byte[] addrBytes;

View File

@@ -134,7 +134,7 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
if (createCounter == null) createCounter = new AtomicLong();
if (cycleCounter == null) cycleCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createCounter, cycleCounter, bufferPoolSize,
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();

View File

@@ -92,7 +92,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
return t;
});
final int bufferCapacity = Math.max(8 * 1024, Integer.decode(readprop.getProperty(JDBC_CONNECTIONSCAPACITY, "" + 8 * 1024)));
this.bufferPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), Math.max(maxconns, this.threads * 2),
this.bufferPool = ObjectPool.createSafePool(new AtomicLong(), new AtomicLong(), Math.max(maxconns, this.threads * 2),
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
e.clear();

View File

@@ -5,7 +5,7 @@
package org.redkale.util;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
@@ -39,44 +39,87 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
protected final Queue<T> queue;
public ObjectPool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(2, clazz, prepare, recycler);
}
public ObjectPool(int max, Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(max, Creator.create(clazz), prepare, recycler);
}
public ObjectPool(Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(2, creator, prepare, recycler);
}
public ObjectPool(int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, prepare, recycler);
}
public ObjectPool(int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, prepare, recycler);
}
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler);
}
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors() * 2, max),
creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)));
}
protected ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler, Queue<T> queue) {
this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter;
this.creator = creator;
this.prepare = prepare;
this.recycler = recycler;
this.queue = queue;
this.max = max;
this.debug = logger.isLoggable(Level.FINEST);
this.queue = queue;
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
return createUnsafePool(2, clazz, prepare, recycler);
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(int max, Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
return createUnsafePool(max, Creator.create(clazz), prepare, recycler);
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createUnsafePool(2, creator, prepare, recycler);
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createUnsafePool(null, null, max, creator, prepare, recycler);
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createUnsafePool(null, null, max, creator, prepare, recycler);
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createUnsafePool(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler);
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return new ObjectPool(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors(), max),
creator, prepare, recycler, new ArrayDeque<>(Math.max(Runtime.getRuntime().availableProcessors(), max)));
}
//线程安全版
public static <T> ObjectPool<T> createSafePool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
return createSafePool(2, clazz, prepare, recycler);
}
//线程安全版
public static <T> ObjectPool<T> createSafePool(int max, Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
return createSafePool(max, Creator.create(clazz), prepare, recycler);
}
//线程安全版
public static <T> ObjectPool<T> createSafePool(Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createSafePool(2, creator, prepare, recycler);
}
//线程安全版
public static <T> ObjectPool<T> createSafePool(int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createSafePool(null, null, max, creator, prepare, recycler);
}
//线程安全版
public static <T> ObjectPool<T> createSafePool(int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createSafePool(null, null, max, creator, prepare, recycler);
}
//线程安全版
public static <T> ObjectPool<T> createSafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return createSafePool(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler);
}
//线程安全版
public static <T> ObjectPool<T> createSafePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return new ObjectPool(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors(), max),
creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors(), max)));
}
public void setCreator(Creator<T> creator) {
@@ -125,4 +168,5 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
public long getCycleCount() {
return cycleCounter.longValue();
}
}