From 6413161dc8b2bb4615153e95aadabe8ae9f2eece Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Fri, 29 Dec 2017 10:19:20 +0800 Subject: [PATCH] =?UTF-8?q?net=E5=8C=85=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/PrepareRunner.java | 13 ++-- src/org/redkale/net/PrepareServlet.java | 8 +-- src/org/redkale/net/Request.java | 20 ++++++ src/org/redkale/net/Response.java | 78 +++++++++++++++++++--- src/org/redkale/net/http/HttpResponse.java | 26 ++++---- src/org/redkale/net/sncp/SncpResponse.java | 2 +- src/org/redkale/util/ObjectPool.java | 4 ++ 7 files changed, 119 insertions(+), 32 deletions(-) diff --git a/src/org/redkale/net/PrepareRunner.java b/src/org/redkale/net/PrepareRunner.java index 1e42bd2be..a264abdbd 100644 --- a/src/org/redkale/net/PrepareRunner.java +++ b/src/org/redkale/net/PrepareRunner.java @@ -48,14 +48,16 @@ public final class PrepareRunner implements Runnable { } return; } - final ByteBuffer buffer = context.pollBuffer(); + final Response response = responsePool.get(); + final ByteBuffer buffer = response.request.pollReadBuffer(); try { channel.read(buffer, null, new CompletionHandler() { @Override public void completed(Integer count, Void attachment1) { if (count < 1 && buffer.remaining() == buffer.limit()) { try { - context.offerBuffer(buffer); + response.request.offerReadBuffer(buffer); + response.finish(true); channel.close(); } catch (Exception e) { context.logger.log(Level.FINEST, "PrepareRunner close channel erroneous on no read bytes", e); @@ -69,7 +71,6 @@ public final class PrepareRunner implements Runnable { // System.println(new String(bs)); // } buffer.flip(); - final Response response = responsePool.get(); response.init(channel); try { prepare.prepare(buffer, response.request, response); @@ -81,7 +82,8 @@ public final class PrepareRunner implements Runnable { @Override public void failed(Throwable exc, Void attachment2) { - context.offerBuffer(buffer); + response.request.offerReadBuffer(buffer); + response.finish(true); try { channel.close(); } catch (Exception e) { @@ -90,7 +92,8 @@ public final class PrepareRunner implements Runnable { } }); } catch (Exception te) { - context.offerBuffer(buffer); + response.request.offerReadBuffer(buffer); + response.finish(true); try { channel.close(); } catch (Exception e) { diff --git a/src/org/redkale/net/PrepareServlet.java b/src/org/redkale/net/PrepareServlet.java index e3e7a244e..15113b803 100644 --- a/src/org/redkale/net/PrepareServlet.java +++ b/src/org/redkale/net/PrepareServlet.java @@ -214,11 +214,11 @@ public abstract class PrepareServlet { protected AsyncConnection channel; + protected ByteBuffer readBuffer; + /** * properties 与 attributes 的区别在于:调用recycle时, attributes会被清空而properties会保留; * properties 通常存放需要永久绑定在request里的一些对象 @@ -43,10 +45,28 @@ public abstract class Request { protected Request(C context) { this.context = context; + this.readBuffer = context.pollBuffer(); this.bsonConvert = context.getBsonConvert(); this.jsonConvert = context.getJsonConvert(); } + protected ByteBuffer pollReadBuffer() { + ByteBuffer buffer = this.readBuffer; + this.readBuffer = null; + if (buffer == null) buffer = context.pollBuffer(); + return buffer; + } + + protected void offerReadBuffer(ByteBuffer buffer) { + if (buffer == null) return; + if (this.readBuffer == null) { + buffer.clear(); + this.readBuffer = buffer; + } else { + context.offerBuffer(buffer); + } + } + /** * 返回值:Integer.MIN_VALUE: 帧数据; -1:数据不合法; 0:解析完毕; >0: 需再读取的字节数。 * diff --git a/src/org/redkale/net/Response.java b/src/org/redkale/net/Response.java index 0ff5c26ba..ebc4f0ee7 100644 --- a/src/org/redkale/net/Response.java +++ b/src/org/redkale/net/Response.java @@ -8,7 +8,7 @@ package org.redkale.net; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; -import java.util.function.BiConsumer; +import java.util.function.*; import java.util.logging.Level; /** @@ -30,6 +30,10 @@ public abstract class Response> { protected AsyncConnection channel; + protected ByteBuffer writeHeadBuffer; + + protected ByteBuffer writeBodyBuffer; + private boolean inited = true; protected Object output; //输出的结果对象 @@ -40,6 +44,8 @@ public abstract class Response> { protected Servlet> servlet; + private Supplier bodyBufferSupplier; + private final CompletionHandler finishHandler = new CompletionHandler() { @Override @@ -47,17 +53,31 @@ public abstract class Response> { if (attachment.hasRemaining()) { channel.write(attachment, attachment, this); } else { - context.offerBuffer(attachment); + offerResponseBuffer(attachment); finish(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { - context.offerBuffer(attachment); + offerResponseBuffer(attachment); finish(true); } + private void offerResponseBuffer(ByteBuffer attachment) { + if (writeHeadBuffer == null) { + if (context.bufferPool.getRecyclerPredicate().test(attachment)) { + writeHeadBuffer = attachment; + } + } else if (writeBodyBuffer == null) { + if (context.bufferPool.getRecyclerPredicate().test(attachment)) { + writeBodyBuffer = attachment; + } + } else { + context.offerBuffer(attachment); + } + } + }; private final CompletionHandler finishHandler2 = new CompletionHandler() { @@ -74,26 +94,66 @@ public abstract class Response> { if (index >= 0) { channel.write(attachments, index, attachments.length - index, attachments, this); } else { - for (ByteBuffer attachment : attachments) { - context.offerBuffer(attachment); - } + offerResponseBuffer(attachments); finish(); } } @Override public void failed(Throwable exc, final ByteBuffer[] attachments) { - for (ByteBuffer attachment : attachments) { - context.offerBuffer(attachment); - } + offerResponseBuffer(attachments); finish(true); } + private void offerResponseBuffer(ByteBuffer[] attachments) { + int start = 0; + if (writeHeadBuffer == null && attachments.length > start) { + if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { + writeHeadBuffer = attachments[start]; + start++; + } + } + if (writeBodyBuffer == null && attachments.length > start) { + if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { + writeBodyBuffer = attachments[start]; + start++; + } + } + for (int i = start; i < attachments.length; i++) { + context.offerBuffer(attachments[i]); + } + } }; protected Response(C context, final R request) { this.context = context; this.request = request; + this.writeHeadBuffer = context.pollBuffer(); + this.writeBodyBuffer = context.pollBuffer(); + this.bodyBufferSupplier = () -> { + ByteBuffer buffer = writeBodyBuffer; + if (buffer == null) return context.pollBuffer(); + writeBodyBuffer = null; + return buffer; + }; + } + + protected ByteBuffer pollWriteReadBuffer() { + ByteBuffer buffer = this.writeHeadBuffer; + this.writeHeadBuffer = null; + if (buffer == null) buffer = context.pollBuffer(); + return buffer; + } + + protected ByteBuffer pollWriteBodyBuffer() { + ByteBuffer buffer = this.writeBodyBuffer; + this.writeBodyBuffer = null; + if (buffer == null) buffer = context.pollBuffer(); + return buffer; + } + + protected Supplier getBodyBufferSupplier() { + return bodyBufferSupplier; } protected AsyncConnection removeChannel() { diff --git a/src/org/redkale/net/http/HttpResponse.java b/src/org/redkale/net/http/HttpResponse.java index bf918a452..e5717a22d 100644 --- a/src/org/redkale/net/http/HttpResponse.java +++ b/src/org/redkale/net/http/HttpResponse.java @@ -254,7 +254,7 @@ public class HttpResponse extends Response { public void finishJson(final Object obj) { this.contentType = "text/plain; charset=utf-8"; if (this.recycleListener != null) this.output = obj; - finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), obj)); + finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), obj)); } /** @@ -266,7 +266,7 @@ public class HttpResponse extends Response { public void finishMapJson(final Object... objs) { this.contentType = "text/plain; charset=utf-8"; if (this.recycleListener != null) this.output = objs; - finish(request.getJsonConvert().convertMapTo(context.getBufferSupplier(), objs)); + finish(request.getJsonConvert().convertMapTo(getBodyBufferSupplier(), objs)); } /** @@ -278,7 +278,7 @@ public class HttpResponse extends Response { public void finishJson(final JsonConvert convert, final Object obj) { this.contentType = "text/plain; charset=utf-8"; if (this.recycleListener != null) this.output = obj; - finish(convert.convertTo(context.getBufferSupplier(), obj)); + finish(convert.convertTo(getBodyBufferSupplier(), obj)); } /** @@ -291,7 +291,7 @@ public class HttpResponse extends Response { public void finishMapJson(final JsonConvert convert, final Object... objs) { this.contentType = "text/plain; charset=utf-8"; if (this.recycleListener != null) this.output = objs; - finish(convert.convertMapTo(context.getBufferSupplier(), objs)); + finish(convert.convertMapTo(getBodyBufferSupplier(), objs)); } /** @@ -303,7 +303,7 @@ public class HttpResponse extends Response { public void finishJson(final Type type, final Object obj) { this.contentType = "text/plain; charset=utf-8"; this.output = obj; - finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), type, obj)); + finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), type, obj)); } /** @@ -316,7 +316,7 @@ public class HttpResponse extends Response { public void finishJson(final JsonConvert convert, final Type type, final Object obj) { this.contentType = "text/plain; charset=utf-8"; if (this.recycleListener != null) this.output = obj; - finish(convert.convertTo(context.getBufferSupplier(), type, obj)); + finish(convert.convertTo(getBodyBufferSupplier(), type, obj)); } /** @@ -327,7 +327,7 @@ public class HttpResponse extends Response { public void finishJson(final Object... objs) { this.contentType = "text/plain; charset=utf-8"; if (this.recycleListener != null) this.output = objs; - finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), objs)); + finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), objs)); } /** @@ -342,7 +342,7 @@ public class HttpResponse extends Response { this.header.addValue("retcode", String.valueOf(ret.getRetcode())); this.header.addValue("retinfo", ret.getRetinfo()); } - finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), ret)); + finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), ret)); } /** @@ -358,7 +358,7 @@ public class HttpResponse extends Response { this.header.addValue("retcode", String.valueOf(ret.getRetcode())); this.header.addValue("retinfo", ret.getRetinfo()); } - finish(convert.convertTo(context.getBufferSupplier(), ret)); + finish(convert.convertTo(getBodyBufferSupplier(), ret)); } /** @@ -467,8 +467,8 @@ public class HttpResponse extends Response { this.header.addValue("retcode", String.valueOf(ret.getRetcode())).addValue("retinfo", ret.getRetinfo()); } } - ByteBuffer[] buffers = type == null ? convert.convertTo(context.getBufferSupplier(), obj) - : convert.convertTo(context.getBufferSupplier(), type, obj); + ByteBuffer[] buffers = type == null ? convert.convertTo(getBodyBufferSupplier(), obj) + : convert.convertTo(getBodyBufferSupplier(), type, obj); finish(buffers); } } @@ -550,7 +550,7 @@ public class HttpResponse extends Response { @Override public void finish(final byte[] bs) { if (isClosed()) return; //避免重复关闭 - if (this.context.getBufferCapacity() == bs.length) { + if (this.context.getBufferCapacity() >= bs.length) { ByteBuffer buffer = this.context.pollBuffer(); buffer.put(bs); buffer.flip(); @@ -796,7 +796,7 @@ public class HttpResponse extends Response { //Header大小不能超过一个ByteBuffer的容量 protected ByteBuffer createHeader() { this.headsended = true; - ByteBuffer buffer = this.context.pollBuffer(); + ByteBuffer buffer = this.pollWriteReadBuffer(); buffer.put(("HTTP/1.1 " + this.status + " " + (this.status == 200 ? "OK" : httpCodes.get(this.status)) + "\r\n").getBytes()); buffer.put(("Content-Type: " + (this.contentType == null ? "text/plain; charset=utf-8" : this.contentType) + "\r\n").getBytes()); diff --git a/src/org/redkale/net/sncp/SncpResponse.java b/src/org/redkale/net/sncp/SncpResponse.java index 78d05c597..9227b1ba8 100644 --- a/src/org/redkale/net/sncp/SncpResponse.java +++ b/src/org/redkale/net/sncp/SncpResponse.java @@ -54,7 +54,7 @@ public final class SncpResponse extends Response { public void finish(final int retcode, final BsonWriter out) { if (out == null) { - final ByteBuffer buffer = context.pollBuffer(); + final ByteBuffer buffer = pollWriteReadBuffer(); fillHeader(buffer, 0, retcode); finish(buffer); return; diff --git a/src/org/redkale/util/ObjectPool.java b/src/org/redkale/util/ObjectPool.java index 11d3c0a34..da6d94bd4 100644 --- a/src/org/redkale/util/ObjectPool.java +++ b/src/org/redkale/util/ObjectPool.java @@ -67,6 +67,10 @@ public final class ObjectPool implements Supplier, Consumer { this.creator = creator; } + public Predicate getRecyclerPredicate() { + return recycler; + } + @Override public T get() { T result = queue.poll();