去掉AsyncConnection内的ByteBufferPool,HttpResponse合并header和body的Buffer

This commit is contained in:
Redkale
2019-11-13 10:07:48 +08:00
parent 8a8d45e642
commit 51a95a84aa
9 changed files with 270 additions and 165 deletions

View File

@@ -14,7 +14,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import javax.net.ssl.SSLContext;
import org.redkale.util.ObjectPool;
import org.redkale.util.*;
/**
*
@@ -118,7 +118,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
@Override
public abstract int write(ByteBuffer src) throws IOException;
@@ -141,22 +140,40 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
this.readBuffer = null;
return rs;
}
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// return ((IOThread) thread).getBufferPool().get();
// }
return bufferSupplier.get();
}
public void offerBuffer(Buffer buffer) {
if (buffer == null) return;
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer);
// return;
// }
bufferConsumer.accept((ByteBuffer) buffer);
}
public void offerBuffer(Buffer... buffers) {
if (buffers == null) return;
Consumer<ByteBuffer> consumer = this.bufferConsumer;
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// consumer = ((IOThread) thread).getBufferPool();
// }
for (Buffer buffer : buffers) {
bufferConsumer.accept((ByteBuffer) buffer);
consumer.accept((ByteBuffer) buffer);
}
}
public ByteBuffer pollWriteBuffer() {
// Thread thread = Thread.currentThread();
// if (thread instanceof IOThread) {
// return ((IOThread) thread).getBufferPool().get();
// }
return bufferSupplier.get();
}
@@ -189,7 +206,12 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
}
}
if (this.readBuffer != null) {
bufferConsumer.accept(this.readBuffer);
Consumer<ByteBuffer> consumer = this.bufferConsumer;
Thread thread = Thread.currentThread();
if (thread instanceof IOThread) {
consumer = ((IOThread) thread).getBufferPool();
}
consumer.accept(this.readBuffer);
}
if (attributes == null) return;
try {

View File

@@ -0,0 +1,61 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import org.redkale.util.*;
/**
* 协议处理的IO线程类
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class IOThread extends Thread {
protected Thread localThread;
protected final ExecutorService executor;
protected ObjectPool<ByteBuffer> bufferPool;
public IOThread(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, Runnable runner) {
super(runner);
this.executor = executor;
this.bufferPool = bufferPool;
this.setDaemon(true);
}
public void runAsync(Runnable runner) {
executor.execute(runner);
}
public ExecutorService getExecutor() {
return executor;
}
public ObjectPool<ByteBuffer> getBufferPool() {
return bufferPool;
}
@Override
public void run() {
this.localThread = Thread.currentThread();
super.run();
}
public boolean inSameThread() {
return this.localThread == Thread.currentThread();
}
public boolean inSameThread(Thread thread) {
return this.localThread == thread;
}
}

View File

@@ -60,7 +60,8 @@ public class PrepareRunner implements Runnable {
@Override
public void completed(Integer count, ByteBuffer buffer) {
if (count < 1) {
response.request.offerReadBuffer(buffer);
buffer.clear();
channel.setReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
@@ -84,7 +85,8 @@ public class PrepareRunner implements Runnable {
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
response.request.offerReadBuffer(buffer);
buffer.clear();
channel.setReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
@@ -97,7 +99,7 @@ public class PrepareRunner implements Runnable {
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
if (te != null && context.logger.isLoggable(Level.FINEST)) {
if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te);
}
}
@@ -116,7 +118,8 @@ public class PrepareRunner implements Runnable {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
} else {
response.request.offerReadBuffer(buffer);
buffer.clear();
channel.setReadBuffer(buffer);
}
preparer.prepare(request, response);
} else {
@@ -137,7 +140,8 @@ public class PrepareRunner implements Runnable {
if (attachment.hasRemaining()) {
request.setMoredata(attachment);
} else {
response.request.offerReadBuffer(attachment);
attachment.clear();
channel.setReadBuffer(attachment);
}
try {
preparer.prepare(request, response);
@@ -151,7 +155,8 @@ public class PrepareRunner implements Runnable {
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
preparer.illRequestCounter.incrementAndGet();
response.request.offerReadBuffer(attachment);
attachment.clear();
channel.setReadBuffer(attachment);
response.finish(true);
if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc);
}
@@ -175,19 +180,4 @@ public class PrepareRunner implements Runnable {
return response.removeChannel();
}
protected ByteBuffer pollReadBuffer(Request request) {
return request.pollReadBuffer();
}
protected ByteBuffer pollReadBuffer(Response response) {
return response.request.pollReadBuffer();
}
protected void offerReadBuffer(Request request, ByteBuffer buffer) {
request.offerReadBuffer(buffer);
}
protected void offerReadBuffer(Response response, ByteBuffer buffer) {
response.request.offerReadBuffer(buffer);
}
}

View File

@@ -40,8 +40,6 @@ public abstract class Request<C extends Context> {
protected AsyncConnection channel;
protected ByteBuffer readBuffer;
/**
* properties 与 attributes 的区别在于调用recycle时 attributes会被清空而properties会保留;
* properties 通常存放需要永久绑定在request里的一些对象
@@ -67,23 +65,6 @@ public abstract class Request<C extends Context> {
return rs;
}
protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
protected void offerReadBuffer(ByteBuffer buffer) {
if (buffer == null) return;
if (this.readBuffer == null) {
buffer.clear();
this.readBuffer = buffer;
} else {
bufferPool.accept(buffer);
}
}
/**
* 返回值Integer.MIN_VALUE: 帧数据; -1数据不合法 0解析完毕 &gt;0: 需再读取的字节数。
*

View File

@@ -27,18 +27,12 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final ObjectPool<Response> responsePool;
protected final R request;
protected AsyncConnection channel;
protected ByteBuffer writeHeadBuffer;
protected ByteBuffer writeBodyBuffer;
private volatile boolean inited = true;
protected Object output; //输出的结果对象
@@ -49,8 +43,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected Servlet<C, R, ? extends Response<C, R>> servlet;
private Supplier<ByteBuffer> bodyBufferSupplier;
private final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
@@ -58,31 +50,21 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this);
} else {
offerResponseBuffer(attachment);
channel.offerBuffer(attachment);
ByteBuffer data = request.removeMoredata();
final boolean more = data != null && request.keepAlive;
request.more = more;
finish();
if (more) new PrepareRunner(context, responsePool, request.channel, null, Response.this).run();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
offerResponseBuffer(attachment);
channel.offerBuffer(attachment);
finish(true);
}
private void offerResponseBuffer(ByteBuffer attachment) {
if (writeHeadBuffer == null) {
if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeHeadBuffer = attachment;
}
} else if (writeBodyBuffer == null) {
if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeBodyBuffer = attachment;
}
} else {
bufferPool.accept(attachment);
}
}
};
private final CompletionHandler finishHandler2 = new CompletionHandler<Integer, ByteBuffer[]>() {
@@ -99,73 +81,36 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (index >= 0) {
channel.write(attachments, index, attachments.length - index, attachments, this);
} else {
offerResponseBuffer(attachments);
for (ByteBuffer attachment : attachments) {
channel.offerBuffer(attachment);
}
ByteBuffer data = request.removeMoredata();
final boolean more = data != null && request.keepAlive;
request.more = more;
finish();
if (more) new PrepareRunner(context, responsePool, request.channel, null, Response.this).run();
}
}
@Override
public void failed(Throwable exc, final ByteBuffer[] attachments) {
offerResponseBuffer(attachments);
for (ByteBuffer attachment : attachments) {
channel.offerBuffer(attachment);
}
finish(true);
}
private void offerResponseBuffer(ByteBuffer[] attachments) {
int start = 0;
if (writeHeadBuffer == null && attachments.length > start) {
if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeHeadBuffer = attachments[start];
start++;
}
}
if (writeBodyBuffer == null && attachments.length > start) {
if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeBodyBuffer = attachments[start];
start++;
}
}
for (int i = start; i < attachments.length; i++) {
bufferPool.accept(attachments[i]);
}
}
};
protected Response(C context, final R request, ObjectPool<Response> responsePool) {
this.context = context;
this.request = request;
this.bufferPool = request.bufferPool;
this.responsePool = responsePool;
this.writeHeadBuffer = bufferPool.get();
this.writeBodyBuffer = bufferPool.get();
this.bodyBufferSupplier = () -> {
ByteBuffer buffer = writeBodyBuffer;
if (buffer == null) return bufferPool.get();
writeBodyBuffer = null;
return buffer;
};
}
protected ByteBuffer pollWriteReadBuffer() {
ByteBuffer buffer = this.writeHeadBuffer;
this.writeHeadBuffer = null;
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
protected ByteBuffer pollWriteBodyBuffer() {
ByteBuffer buffer = this.writeBodyBuffer;
this.writeBodyBuffer = null;
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
protected Supplier<ByteBuffer> getBodyBufferSupplier() {
return bodyBufferSupplier;
}
protected void offerBuffer(ByteBuffer... buffers) {
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
channel.offerBuffer(buffer);
}
}
@@ -278,7 +223,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void finish(final byte[] bs) {
if (!this.inited) return; //避免重复关闭
if (this.context.bufferCapacity == bs.length) {
ByteBuffer buffer = this.bufferPool.get();
ByteBuffer buffer = channel.bufferSupplier.get();
buffer.put(bs);
buffer.flip();
this.finish(buffer);
@@ -289,33 +234,33 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void finish(ByteBuffer buffer) {
if (!this.inited) return; //避免重复关闭
ByteBuffer data = this.request.removeMoredata();
final AsyncConnection conn = this.channel;
final boolean more = data != null && this.request.keepAlive;
this.request.more = more;
// ByteBuffer data = this.request.removeMoredata();
// final boolean more = data != null && this.request.keepAlive;
// this.request.more = more;
conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
// if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
}
public void finish(boolean kill, ByteBuffer buffer) {
if (!this.inited) return; //避免重复关闭
if (kill) refuseAlive();
ByteBuffer data = this.request.removeMoredata();
final AsyncConnection conn = this.channel;
final boolean more = data != null && this.request.keepAlive;
this.request.more = more;
// ByteBuffer data = this.request.removeMoredata();
// final boolean more = data != null && this.request.keepAlive;
// this.request.more = more;
conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
// if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
}
public void finish(ByteBuffer... buffers) {
if (!this.inited) return; //避免重复关闭
final AsyncConnection conn = this.channel;
ByteBuffer data = this.request.removeMoredata();
final boolean more = data != null && this.request.keepAlive;
this.request.more = more;
// ByteBuffer data = this.request.removeMoredata();
// final boolean more = data != null && this.request.keepAlive;
// this.request.more = more;
conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
// if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
}
public void finish(boolean kill, ByteBuffer... buffers) {
@@ -337,14 +282,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (buffer.hasRemaining()) {
channel.write(buffer, attachment, this);
} else {
bufferPool.accept(buffer);
channel.offerBuffer(buffer);
if (handler != null) handler.completed(result, attachment);
}
}
@Override
public void failed(Throwable exc, A attachment) {
bufferPool.accept(buffer);
channel.offerBuffer(buffer);
if (handler != null) handler.failed(exc, attachment);
}
@@ -362,7 +307,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
index = i;
break;
}
bufferPool.accept(buffers[i]);
channel.offerBuffer(buffers[i]);
}
if (index == 0) {
channel.write(buffers, attachment, this);
@@ -376,7 +321,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void failed(Throwable exc, A attachment) {
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
channel.offerBuffer(buffer);
}
if (handler != null) handler.failed(exc, attachment);
}

View File

@@ -105,6 +105,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private static final ZoneId ZONE_GMT = ZoneId.of("GMT");
private static final byte[] LENG_BYTES = ("Content-Length: \r\n").getBytes();
private int status = 200;
private String contentType = "";
@@ -113,9 +115,15 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private HttpCookie[] cookies;
private boolean headsended = false;
private int headWritedSize = -1; //0表示跳过header正数表示header的字节长度。
private ByteBuffer headBuffer;
private int headLenPos = -1;
private BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> bufferHandler;
private Supplier<ByteBuffer> bodyBufferSupplier;
//------------------------------------------------
private final String plainContentType;
@@ -163,6 +171,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.hasRender = renders != null && !renders.isEmpty();
this.onlyoneHttpRender = renders != null && renders.size() == 1 ? renders.get(0) : null;
this.contentType = this.plainContentType;
this.bodyBufferSupplier = () -> {
if (headWritedSize >= 0 || bufferHandler != null) return channel.pollWriteBuffer(); //bufferHandler 需要cached的请求不能带上header
if (contentLength < 0) contentLength = -2;
return createHeader();
};
}
@Override
@@ -185,12 +198,18 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.contentLength = -1;
this.contentType = null;
this.cookies = null;
this.headsended = false;
this.headWritedSize = -1;
this.headBuffer = null;
this.headLenPos = -1;
this.header.clear();
this.bufferHandler = null;
return super.recycle();
}
protected Supplier<ByteBuffer> getBodyBufferSupplier() {
return bodyBufferSupplier;
}
@Override
protected void init(AsyncConnection channel) {
super.init(channel);
@@ -285,15 +304,6 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler());
}
/**
* 获取ByteBuffer生成器
*
* @return ByteBuffer生成器
*/
public Supplier<ByteBuffer> getBufferSupplier() {
return getBodyBufferSupplier();
}
/**
* 将对象以JSON格式输出
*
@@ -637,7 +647,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finish(final String contentType, final byte[] bs) {
if (isClosed()) return; //避免重复关闭
final byte[] content = bs == null ? new byte[0] : bs;
if (!this.headsended) {
if (this.headWritedSize < 0) {
this.contentType = contentType;
this.contentLength = content.length;
ByteBuffer headbuf = createHeader();
@@ -681,7 +691,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override
public void finish(boolean kill, ByteBuffer buffer) {
if (isClosed()) return; //避免重复关闭
if (!this.headsended) {
if (this.headWritedSize < 0) {
this.contentLength = buffer == null ? 0 : buffer.remaining();
ByteBuffer headbuf = createHeader();
headbuf.flip();
@@ -719,7 +729,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
if (bufs != null) buffers = bufs;
}
if (kill) refuseAlive();
if (!this.headsended) {
if (this.headWritedSize < 0) {
long len = 0;
for (ByteBuffer buf : buffers) {
len += buf.remaining();
@@ -736,6 +746,17 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
super.finish(kill, newbuffers);
}
} else {
if (this.headLenPos > 0 && buffers[0] == headBuffer) {
long contentlen = -this.headWritedSize;
for (ByteBuffer buf : buffers) {
contentlen += buf.remaining();
}
byte[] lenBytes = String.valueOf(contentlen).getBytes();
int start = this.headLenPos - lenBytes.length;
for (int i = 0; i < lenBytes.length; i++) {
headBuffer.put(start + i, lenBytes[i]);
}
}
super.finish(kill, buffers);
}
}
@@ -749,7 +770,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param handler 异步回调函数
*/
public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) {
if (this.headWritedSize < 0) {
if (this.contentLength < 0) this.contentLength = buffer == null ? 0 : buffer.remaining();
ByteBuffer headbuf = createHeader();
headbuf.flip();
@@ -772,7 +793,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param handler 异步回调函数
*/
public <A> void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) {
if (this.headWritedSize < 0) {
if (this.contentLength < 0) {
int len = 0;
if (buffers != null && buffers.length > 0) {
@@ -899,14 +920,19 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
//Header大小不能超过一个ByteBuffer的容量
protected ByteBuffer createHeader() {
this.headsended = true;
ByteBuffer buffer = this.pollWriteReadBuffer();
ByteBuffer buffer = this.channel.pollWriteBuffer();
int oldpos = buffer.position();
if (this.status == 200) {
buffer.put(status200Bytes);
} else {
buffer.put(("HTTP/1.1 " + this.status + " " + httpCodes.get(this.status) + "\r\n").getBytes());
}
if (this.contentLength >= 0) buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes());
if (this.contentLength >= 0) {
buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes());
} else if (this.contentLength == -2) {
buffer.put(LENG_BYTES);
this.headLenPos = buffer.position() - 2; //去掉\r\n
}
if (!this.request.isWebSocket()) {
if (this.contentType == this.jsonContentType) {
buffer.put(this.jsonContentTypeBytes);
@@ -978,6 +1004,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
}
}
buffer.put(LINE);
this.headWritedSize = buffer.position() - oldpos;
this.headBuffer = buffer;
return buffer;
}
@@ -1003,7 +1031,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @return HttpResponse
*/
public HttpResponse skipHeader() {
this.headsended = true;
this.headWritedSize = 0;
return this;
}
@@ -1210,7 +1238,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
bufferPool.accept(attachment);
channel.offerBuffer(attachment);
finish(true);
try {
filechannel.close();

View File

@@ -59,7 +59,7 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
public void finish(final int retcode, final BsonWriter out) {
if (out == null) {
final ByteBuffer buffer = pollWriteReadBuffer();
final ByteBuffer buffer = channel.pollWriteBuffer();
fillHeader(buffer, 0, retcode);
finish(buffer);
return;

View File

@@ -19,23 +19,25 @@ import java.util.logging.*;
* @author zhangjx
* @param <T> 对象池元素的数据类型
*/
public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName());
protected static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName());
private final boolean debug;
protected final boolean debug;
private final Queue<T> queue;
protected Creator<T> creator;
private Creator<T> creator;
protected int max;
private final Consumer<T> prepare;
protected final Consumer<T> prepare;
private final Predicate<T> recycler;
protected final Predicate<T> recycler;
private final AtomicLong creatCounter;
protected final AtomicLong creatCounter;
private final AtomicLong cycleCounter;
protected final AtomicLong cycleCounter;
protected final Queue<T> queue;
public ObjectPool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(2, clazz, prepare, recycler);
@@ -62,12 +64,18 @@ public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
}
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors() * 2, max),
creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)));
}
protected ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler, Queue<T> queue) {
this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter;
this.creator = creator;
this.prepare = prepare;
this.recycler = recycler;
this.queue = new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max));
this.queue = queue;
this.max = max;
this.debug = logger.isLoggable(Level.FINEST);
}

View File

@@ -0,0 +1,70 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.util;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
/**
* 对象池
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <T> 对象池元素的数据类型
*/
public class ThreadLocalObjectPool<T> extends ObjectPool<T> {
public ThreadLocalObjectPool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(2, clazz, prepare, recycler);
}
public ThreadLocalObjectPool(int max, Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(max, Creator.create(clazz), prepare, recycler);
}
public ThreadLocalObjectPool(Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(2, creator, prepare, recycler);
}
public ThreadLocalObjectPool(int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, prepare, recycler);
}
public ThreadLocalObjectPool(int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, prepare, recycler);
}
public ThreadLocalObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler);
}
public ThreadLocalObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
super(creatCounter, cycleCounter, max, creator, prepare, recycler, new LinkedList<>());
}
@Override
public T get() {
T result = queue.poll();
if (result == null) {
if (creatCounter != null) creatCounter.incrementAndGet();
result = this.creator.create();
}
if (prepare != null) prepare.accept(result);
return result;
}
@Override
public void accept(final T e) {
if (e != null && recycler.test(e) && this.queue.size() < this.max) {
if (cycleCounter != null) cycleCounter.incrementAndGet();
queue.offer(e);
}
}
}