diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 30a331725..8192cab9e 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; import javax.net.ssl.SSLContext; -import org.redkale.util.ObjectPool; +import org.redkale.util.*; /** * @@ -118,7 +118,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy public abstract void read(CompletionHandler handler); - @Override public abstract int write(ByteBuffer src) throws IOException; @@ -141,22 +140,40 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy this.readBuffer = null; return rs; } +// Thread thread = Thread.currentThread(); +// if (thread instanceof IOThread) { +// return ((IOThread) thread).getBufferPool().get(); +// } return bufferSupplier.get(); } public void offerBuffer(Buffer buffer) { if (buffer == null) return; +// Thread thread = Thread.currentThread(); +// if (thread instanceof IOThread) { +// ((IOThread) thread).getBufferPool().accept((ByteBuffer) buffer); +// return; +// } bufferConsumer.accept((ByteBuffer) buffer); } public void offerBuffer(Buffer... buffers) { if (buffers == null) return; + Consumer consumer = this.bufferConsumer; +// Thread thread = Thread.currentThread(); +// if (thread instanceof IOThread) { +// consumer = ((IOThread) thread).getBufferPool(); +// } for (Buffer buffer : buffers) { - bufferConsumer.accept((ByteBuffer) buffer); + consumer.accept((ByteBuffer) buffer); } } public ByteBuffer pollWriteBuffer() { +// Thread thread = Thread.currentThread(); +// if (thread instanceof IOThread) { +// return ((IOThread) thread).getBufferPool().get(); +// } return bufferSupplier.get(); } @@ -189,7 +206,12 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy } } if (this.readBuffer != null) { - bufferConsumer.accept(this.readBuffer); + Consumer consumer = this.bufferConsumer; + Thread thread = Thread.currentThread(); + if (thread instanceof IOThread) { + consumer = ((IOThread) thread).getBufferPool(); + } + consumer.accept(this.readBuffer); } if (attributes == null) return; try { diff --git a/src/org/redkale/net/IOThread.java b/src/org/redkale/net/IOThread.java new file mode 100644 index 000000000..468d6617a --- /dev/null +++ b/src/org/redkale/net/IOThread.java @@ -0,0 +1,61 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import org.redkale.util.*; + +/** + * 协议处理的IO线程类 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class IOThread extends Thread { + + protected Thread localThread; + + protected final ExecutorService executor; + + protected ObjectPool bufferPool; + + public IOThread(ExecutorService executor, ObjectPool bufferPool, Runnable runner) { + super(runner); + this.executor = executor; + this.bufferPool = bufferPool; + this.setDaemon(true); + } + + public void runAsync(Runnable runner) { + executor.execute(runner); + } + + public ExecutorService getExecutor() { + return executor; + } + + public ObjectPool getBufferPool() { + return bufferPool; + } + + @Override + public void run() { + this.localThread = Thread.currentThread(); + super.run(); + } + + public boolean inSameThread() { + return this.localThread == Thread.currentThread(); + } + + public boolean inSameThread(Thread thread) { + return this.localThread == thread; + } + +} diff --git a/src/org/redkale/net/PrepareRunner.java b/src/org/redkale/net/PrepareRunner.java index 5e3325474..0a51ed35b 100644 --- a/src/org/redkale/net/PrepareRunner.java +++ b/src/org/redkale/net/PrepareRunner.java @@ -60,7 +60,8 @@ public class PrepareRunner implements Runnable { @Override public void completed(Integer count, ByteBuffer buffer) { if (count < 1) { - response.request.offerReadBuffer(buffer); + buffer.clear(); + channel.setReadBuffer(buffer); channel.dispose();// response.init(channel); 在调用之前异常 response.removeChannel(); response.finish(true); @@ -84,7 +85,8 @@ public class PrepareRunner implements Runnable { @Override public void failed(Throwable exc, ByteBuffer buffer) { - response.request.offerReadBuffer(buffer); + buffer.clear(); + channel.setReadBuffer(buffer); channel.dispose();// response.init(channel); 在调用之前异常 response.removeChannel(); response.finish(true); @@ -97,7 +99,7 @@ public class PrepareRunner implements Runnable { channel.dispose();// response.init(channel); 在调用之前异常 response.removeChannel(); response.finish(true); - if (te != null && context.logger.isLoggable(Level.FINEST)) { + if (context.logger.isLoggable(Level.FINEST)) { context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te); } } @@ -116,7 +118,8 @@ public class PrepareRunner implements Runnable { if (buffer.hasRemaining()) { request.setMoredata(buffer); } else { - response.request.offerReadBuffer(buffer); + buffer.clear(); + channel.setReadBuffer(buffer); } preparer.prepare(request, response); } else { @@ -137,7 +140,8 @@ public class PrepareRunner implements Runnable { if (attachment.hasRemaining()) { request.setMoredata(attachment); } else { - response.request.offerReadBuffer(attachment); + attachment.clear(); + channel.setReadBuffer(attachment); } try { preparer.prepare(request, response); @@ -151,7 +155,8 @@ public class PrepareRunner implements Runnable { @Override public void failed(Throwable exc, ByteBuffer attachment) { preparer.illRequestCounter.incrementAndGet(); - response.request.offerReadBuffer(attachment); + attachment.clear(); + channel.setReadBuffer(attachment); response.finish(true); if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); } @@ -175,19 +180,4 @@ public class PrepareRunner implements Runnable { return response.removeChannel(); } - protected ByteBuffer pollReadBuffer(Request request) { - return request.pollReadBuffer(); - } - - protected ByteBuffer pollReadBuffer(Response response) { - return response.request.pollReadBuffer(); - } - - protected void offerReadBuffer(Request request, ByteBuffer buffer) { - request.offerReadBuffer(buffer); - } - - protected void offerReadBuffer(Response response, ByteBuffer buffer) { - response.request.offerReadBuffer(buffer); - } } diff --git a/src/org/redkale/net/Request.java b/src/org/redkale/net/Request.java index 7767a5622..c8aa149e2 100644 --- a/src/org/redkale/net/Request.java +++ b/src/org/redkale/net/Request.java @@ -40,8 +40,6 @@ public abstract class Request { protected AsyncConnection channel; - protected ByteBuffer readBuffer; - /** * properties 与 attributes 的区别在于:调用recycle时, attributes会被清空而properties会保留; * properties 通常存放需要永久绑定在request里的一些对象 @@ -67,23 +65,6 @@ public abstract class Request { return rs; } - protected ByteBuffer pollReadBuffer() { - ByteBuffer buffer = this.readBuffer; - this.readBuffer = null; - if (buffer == null) buffer = bufferPool.get(); - return buffer; - } - - protected void offerReadBuffer(ByteBuffer buffer) { - if (buffer == null) return; - if (this.readBuffer == null) { - buffer.clear(); - this.readBuffer = buffer; - } else { - bufferPool.accept(buffer); - } - } - /** * 返回值:Integer.MIN_VALUE: 帧数据; -1:数据不合法; 0:解析完毕; >0: 需再读取的字节数。 * diff --git a/src/org/redkale/net/Response.java b/src/org/redkale/net/Response.java index 6f92cee4e..46e2b1088 100644 --- a/src/org/redkale/net/Response.java +++ b/src/org/redkale/net/Response.java @@ -27,18 +27,12 @@ public abstract class Response> { protected final C context; - protected final ObjectPool bufferPool; - protected final ObjectPool responsePool; protected final R request; protected AsyncConnection channel; - protected ByteBuffer writeHeadBuffer; - - protected ByteBuffer writeBodyBuffer; - private volatile boolean inited = true; protected Object output; //输出的结果对象 @@ -49,8 +43,6 @@ public abstract class Response> { protected Servlet> servlet; - private Supplier bodyBufferSupplier; - private final CompletionHandler finishHandler = new CompletionHandler() { @Override @@ -58,31 +50,21 @@ public abstract class Response> { if (attachment.hasRemaining()) { channel.write(attachment, attachment, this); } else { - offerResponseBuffer(attachment); + channel.offerBuffer(attachment); + ByteBuffer data = request.removeMoredata(); + final boolean more = data != null && request.keepAlive; + request.more = more; finish(); + if (more) new PrepareRunner(context, responsePool, request.channel, null, Response.this).run(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { - offerResponseBuffer(attachment); + channel.offerBuffer(attachment); finish(true); } - private void offerResponseBuffer(ByteBuffer attachment) { - if (writeHeadBuffer == null) { - if (bufferPool.getRecyclerPredicate().test(attachment)) { - writeHeadBuffer = attachment; - } - } else if (writeBodyBuffer == null) { - if (bufferPool.getRecyclerPredicate().test(attachment)) { - writeBodyBuffer = attachment; - } - } else { - bufferPool.accept(attachment); - } - } - }; private final CompletionHandler finishHandler2 = new CompletionHandler() { @@ -99,73 +81,36 @@ public abstract class Response> { if (index >= 0) { channel.write(attachments, index, attachments.length - index, attachments, this); } else { - offerResponseBuffer(attachments); + for (ByteBuffer attachment : attachments) { + channel.offerBuffer(attachment); + } + ByteBuffer data = request.removeMoredata(); + final boolean more = data != null && request.keepAlive; + request.more = more; finish(); + if (more) new PrepareRunner(context, responsePool, request.channel, null, Response.this).run(); } } @Override public void failed(Throwable exc, final ByteBuffer[] attachments) { - offerResponseBuffer(attachments); + for (ByteBuffer attachment : attachments) { + channel.offerBuffer(attachment); + } finish(true); } - private void offerResponseBuffer(ByteBuffer[] attachments) { - int start = 0; - if (writeHeadBuffer == null && attachments.length > start) { - if (bufferPool.getRecyclerPredicate().test(attachments[start])) { - writeHeadBuffer = attachments[start]; - start++; - } - } - if (writeBodyBuffer == null && attachments.length > start) { - if (bufferPool.getRecyclerPredicate().test(attachments[start])) { - writeBodyBuffer = attachments[start]; - start++; - } - } - for (int i = start; i < attachments.length; i++) { - bufferPool.accept(attachments[i]); - } - } }; protected Response(C context, final R request, ObjectPool responsePool) { this.context = context; this.request = request; - this.bufferPool = request.bufferPool; this.responsePool = responsePool; - this.writeHeadBuffer = bufferPool.get(); - this.writeBodyBuffer = bufferPool.get(); - this.bodyBufferSupplier = () -> { - ByteBuffer buffer = writeBodyBuffer; - if (buffer == null) return bufferPool.get(); - writeBodyBuffer = null; - return buffer; - }; - } - - protected ByteBuffer pollWriteReadBuffer() { - ByteBuffer buffer = this.writeHeadBuffer; - this.writeHeadBuffer = null; - if (buffer == null) buffer = bufferPool.get(); - return buffer; - } - - protected ByteBuffer pollWriteBodyBuffer() { - ByteBuffer buffer = this.writeBodyBuffer; - this.writeBodyBuffer = null; - if (buffer == null) buffer = bufferPool.get(); - return buffer; - } - - protected Supplier getBodyBufferSupplier() { - return bodyBufferSupplier; } protected void offerBuffer(ByteBuffer... buffers) { for (ByteBuffer buffer : buffers) { - bufferPool.accept(buffer); + channel.offerBuffer(buffer); } } @@ -278,7 +223,7 @@ public abstract class Response> { public void finish(final byte[] bs) { if (!this.inited) return; //避免重复关闭 if (this.context.bufferCapacity == bs.length) { - ByteBuffer buffer = this.bufferPool.get(); + ByteBuffer buffer = channel.bufferSupplier.get(); buffer.put(bs); buffer.flip(); this.finish(buffer); @@ -289,33 +234,33 @@ public abstract class Response> { public void finish(ByteBuffer buffer) { if (!this.inited) return; //避免重复关闭 - ByteBuffer data = this.request.removeMoredata(); final AsyncConnection conn = this.channel; - final boolean more = data != null && this.request.keepAlive; - this.request.more = more; +// ByteBuffer data = this.request.removeMoredata(); +// final boolean more = data != null && this.request.keepAlive; +// this.request.more = more; conn.write(buffer, buffer, finishHandler); - if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); +// if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); } public void finish(boolean kill, ByteBuffer buffer) { if (!this.inited) return; //避免重复关闭 if (kill) refuseAlive(); - ByteBuffer data = this.request.removeMoredata(); final AsyncConnection conn = this.channel; - final boolean more = data != null && this.request.keepAlive; - this.request.more = more; +// ByteBuffer data = this.request.removeMoredata(); +// final boolean more = data != null && this.request.keepAlive; +// this.request.more = more; conn.write(buffer, buffer, finishHandler); - if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); +// if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); } public void finish(ByteBuffer... buffers) { if (!this.inited) return; //避免重复关闭 final AsyncConnection conn = this.channel; - ByteBuffer data = this.request.removeMoredata(); - final boolean more = data != null && this.request.keepAlive; - this.request.more = more; +// ByteBuffer data = this.request.removeMoredata(); +// final boolean more = data != null && this.request.keepAlive; +// this.request.more = more; conn.write(buffers, buffers, finishHandler2); - if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); +// if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); } public void finish(boolean kill, ByteBuffer... buffers) { @@ -337,14 +282,14 @@ public abstract class Response> { if (buffer.hasRemaining()) { channel.write(buffer, attachment, this); } else { - bufferPool.accept(buffer); + channel.offerBuffer(buffer); if (handler != null) handler.completed(result, attachment); } } @Override public void failed(Throwable exc, A attachment) { - bufferPool.accept(buffer); + channel.offerBuffer(buffer); if (handler != null) handler.failed(exc, attachment); } @@ -362,7 +307,7 @@ public abstract class Response> { index = i; break; } - bufferPool.accept(buffers[i]); + channel.offerBuffer(buffers[i]); } if (index == 0) { channel.write(buffers, attachment, this); @@ -376,7 +321,7 @@ public abstract class Response> { @Override public void failed(Throwable exc, A attachment) { for (ByteBuffer buffer : buffers) { - bufferPool.accept(buffer); + channel.offerBuffer(buffer); } if (handler != null) handler.failed(exc, attachment); } diff --git a/src/org/redkale/net/http/HttpResponse.java b/src/org/redkale/net/http/HttpResponse.java index f1b05546c..bbc78b839 100644 --- a/src/org/redkale/net/http/HttpResponse.java +++ b/src/org/redkale/net/http/HttpResponse.java @@ -105,6 +105,8 @@ public class HttpResponse extends Response { private static final ZoneId ZONE_GMT = ZoneId.of("GMT"); + private static final byte[] LENG_BYTES = ("Content-Length: \r\n").getBytes(); + private int status = 200; private String contentType = ""; @@ -113,9 +115,15 @@ public class HttpResponse extends Response { private HttpCookie[] cookies; - private boolean headsended = false; + private int headWritedSize = -1; //0表示跳过header,正数表示header的字节长度。 + + private ByteBuffer headBuffer; + + private int headLenPos = -1; private BiFunction bufferHandler; + + private Supplier bodyBufferSupplier; //------------------------------------------------ private final String plainContentType; @@ -163,6 +171,11 @@ public class HttpResponse extends Response { this.hasRender = renders != null && !renders.isEmpty(); this.onlyoneHttpRender = renders != null && renders.size() == 1 ? renders.get(0) : null; this.contentType = this.plainContentType; + this.bodyBufferSupplier = () -> { + if (headWritedSize >= 0 || bufferHandler != null) return channel.pollWriteBuffer(); //bufferHandler 需要cached的请求不能带上header + if (contentLength < 0) contentLength = -2; + return createHeader(); + }; } @Override @@ -185,12 +198,18 @@ public class HttpResponse extends Response { this.contentLength = -1; this.contentType = null; this.cookies = null; - this.headsended = false; + this.headWritedSize = -1; + this.headBuffer = null; + this.headLenPos = -1; this.header.clear(); this.bufferHandler = null; return super.recycle(); } + protected Supplier getBodyBufferSupplier() { + return bodyBufferSupplier; + } + @Override protected void init(AsyncConnection channel) { super.init(channel); @@ -285,15 +304,6 @@ public class HttpResponse extends Response { return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler()); } - /** - * 获取ByteBuffer生成器 - * - * @return ByteBuffer生成器 - */ - public Supplier getBufferSupplier() { - return getBodyBufferSupplier(); - } - /** * 将对象以JSON格式输出 * @@ -637,7 +647,7 @@ public class HttpResponse extends Response { public void finish(final String contentType, final byte[] bs) { if (isClosed()) return; //避免重复关闭 final byte[] content = bs == null ? new byte[0] : bs; - if (!this.headsended) { + if (this.headWritedSize < 0) { this.contentType = contentType; this.contentLength = content.length; ByteBuffer headbuf = createHeader(); @@ -681,7 +691,7 @@ public class HttpResponse extends Response { @Override public void finish(boolean kill, ByteBuffer buffer) { if (isClosed()) return; //避免重复关闭 - if (!this.headsended) { + if (this.headWritedSize < 0) { this.contentLength = buffer == null ? 0 : buffer.remaining(); ByteBuffer headbuf = createHeader(); headbuf.flip(); @@ -719,7 +729,7 @@ public class HttpResponse extends Response { if (bufs != null) buffers = bufs; } if (kill) refuseAlive(); - if (!this.headsended) { + if (this.headWritedSize < 0) { long len = 0; for (ByteBuffer buf : buffers) { len += buf.remaining(); @@ -736,6 +746,17 @@ public class HttpResponse extends Response { super.finish(kill, newbuffers); } } else { + if (this.headLenPos > 0 && buffers[0] == headBuffer) { + long contentlen = -this.headWritedSize; + for (ByteBuffer buf : buffers) { + contentlen += buf.remaining(); + } + byte[] lenBytes = String.valueOf(contentlen).getBytes(); + int start = this.headLenPos - lenBytes.length; + for (int i = 0; i < lenBytes.length; i++) { + headBuffer.put(start + i, lenBytes[i]); + } + } super.finish(kill, buffers); } } @@ -749,7 +770,7 @@ public class HttpResponse extends Response { * @param handler 异步回调函数 */ public void sendBody(ByteBuffer buffer, A attachment, CompletionHandler handler) { - if (!this.headsended) { + if (this.headWritedSize < 0) { if (this.contentLength < 0) this.contentLength = buffer == null ? 0 : buffer.remaining(); ByteBuffer headbuf = createHeader(); headbuf.flip(); @@ -772,7 +793,7 @@ public class HttpResponse extends Response { * @param handler 异步回调函数 */ public void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler handler) { - if (!this.headsended) { + if (this.headWritedSize < 0) { if (this.contentLength < 0) { int len = 0; if (buffers != null && buffers.length > 0) { @@ -899,14 +920,19 @@ public class HttpResponse extends Response { //Header大小不能超过一个ByteBuffer的容量 protected ByteBuffer createHeader() { - this.headsended = true; - ByteBuffer buffer = this.pollWriteReadBuffer(); + ByteBuffer buffer = this.channel.pollWriteBuffer(); + int oldpos = buffer.position(); if (this.status == 200) { buffer.put(status200Bytes); } else { buffer.put(("HTTP/1.1 " + this.status + " " + httpCodes.get(this.status) + "\r\n").getBytes()); } - if (this.contentLength >= 0) buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes()); + if (this.contentLength >= 0) { + buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes()); + } else if (this.contentLength == -2) { + buffer.put(LENG_BYTES); + this.headLenPos = buffer.position() - 2; //去掉\r\n + } if (!this.request.isWebSocket()) { if (this.contentType == this.jsonContentType) { buffer.put(this.jsonContentTypeBytes); @@ -978,6 +1004,8 @@ public class HttpResponse extends Response { } } buffer.put(LINE); + this.headWritedSize = buffer.position() - oldpos; + this.headBuffer = buffer; return buffer; } @@ -1003,7 +1031,7 @@ public class HttpResponse extends Response { * @return HttpResponse */ public HttpResponse skipHeader() { - this.headsended = true; + this.headWritedSize = 0; return this; } @@ -1210,7 +1238,7 @@ public class HttpResponse extends Response { @Override public void failed(Throwable exc, ByteBuffer attachment) { - bufferPool.accept(attachment); + channel.offerBuffer(attachment); finish(true); try { filechannel.close(); diff --git a/src/org/redkale/net/sncp/SncpResponse.java b/src/org/redkale/net/sncp/SncpResponse.java index f133fd980..9fe7d46c5 100644 --- a/src/org/redkale/net/sncp/SncpResponse.java +++ b/src/org/redkale/net/sncp/SncpResponse.java @@ -59,7 +59,7 @@ public final class SncpResponse extends Response { public void finish(final int retcode, final BsonWriter out) { if (out == null) { - final ByteBuffer buffer = pollWriteReadBuffer(); + final ByteBuffer buffer = channel.pollWriteBuffer(); fillHeader(buffer, 0, retcode); finish(buffer); return; diff --git a/src/org/redkale/util/ObjectPool.java b/src/org/redkale/util/ObjectPool.java index 0d5a22aa4..7219f0bcb 100644 --- a/src/org/redkale/util/ObjectPool.java +++ b/src/org/redkale/util/ObjectPool.java @@ -19,23 +19,25 @@ import java.util.logging.*; * @author zhangjx * @param 对象池元素的数据类型 */ -public final class ObjectPool implements Supplier, Consumer { +public class ObjectPool implements Supplier, Consumer { - private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName()); + protected static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName()); - private final boolean debug; + protected final boolean debug; - private final Queue queue; + protected Creator creator; - private Creator creator; + protected int max; - private final Consumer prepare; + protected final Consumer prepare; - private final Predicate recycler; + protected final Predicate recycler; - private final AtomicLong creatCounter; + protected final AtomicLong creatCounter; - private final AtomicLong cycleCounter; + protected final AtomicLong cycleCounter; + + protected final Queue queue; public ObjectPool(Class clazz, Consumer prepare, Predicate recycler) { this(2, clazz, prepare, recycler); @@ -62,12 +64,18 @@ public final class ObjectPool implements Supplier, Consumer { } public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { + this(creatCounter, cycleCounter, Math.max(Runtime.getRuntime().availableProcessors() * 2, max), + creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max))); + } + + protected ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler, Queue queue) { this.creatCounter = creatCounter; this.cycleCounter = cycleCounter; this.creator = creator; this.prepare = prepare; this.recycler = recycler; - this.queue = new LinkedBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); + this.queue = queue; + this.max = max; this.debug = logger.isLoggable(Level.FINEST); } diff --git a/src/org/redkale/util/ThreadLocalObjectPool.java b/src/org/redkale/util/ThreadLocalObjectPool.java new file mode 100644 index 000000000..6074383ee --- /dev/null +++ b/src/org/redkale/util/ThreadLocalObjectPool.java @@ -0,0 +1,70 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.util; + +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.*; + +/** + * 对象池 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param 对象池元素的数据类型 + */ +public class ThreadLocalObjectPool extends ObjectPool { + + public ThreadLocalObjectPool(Class clazz, Consumer prepare, Predicate recycler) { + this(2, clazz, prepare, recycler); + } + + public ThreadLocalObjectPool(int max, Class clazz, Consumer prepare, Predicate recycler) { + this(max, Creator.create(clazz), prepare, recycler); + } + + public ThreadLocalObjectPool(Creator creator, Consumer prepare, Predicate recycler) { + this(2, creator, prepare, recycler); + } + + public ThreadLocalObjectPool(int max, Creator creator, Consumer prepare, Predicate recycler) { + this(null, null, max, creator, prepare, recycler); + } + + public ThreadLocalObjectPool(int max, Supplier creator, Consumer prepare, Predicate recycler) { + this(null, null, max, creator, prepare, recycler); + } + + public ThreadLocalObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier creator, Consumer prepare, Predicate recycler) { + this(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler); + } + + public ThreadLocalObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { + super(creatCounter, cycleCounter, max, creator, prepare, recycler, new LinkedList<>()); + } + + @Override + public T get() { + T result = queue.poll(); + if (result == null) { + if (creatCounter != null) creatCounter.incrementAndGet(); + result = this.creator.create(); + } + if (prepare != null) prepare.accept(result); + return result; + } + + @Override + public void accept(final T e) { + if (e != null && recycler.test(e) && this.queue.size() < this.max) { + if (cycleCounter != null) cycleCounter.incrementAndGet(); + queue.offer(e); + } + } + +}