From 19c8ffb79d64d37f60ba9389124d94d38835c6b6 Mon Sep 17 00:00:00 2001 From: Redkale Date: Sun, 15 Jan 2023 21:11:59 +0800 Subject: [PATCH] =?UTF-8?q?Response=E4=BC=98=E5=8C=96finish=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redkale/mq/HttpMessageLocalClient.java | 4 +- .../org/redkale/mq/HttpMessageResponse.java | 4 +- .../java/org/redkale/net/AsyncIOThread.java | 2 - .../org/redkale/net/AsyncNioConnection.java | 11 ------ src/main/java/org/redkale/net/Context.java | 6 +-- .../org/redkale/net/DispatcherServlet.java | 2 +- .../java/org/redkale/net/ProtocolCodec.java | 12 +++--- src/main/java/org/redkale/net/Response.java | 38 +++++++++---------- .../org/redkale/net/http/HttpResponse.java | 21 ++-------- .../net/sncp/SncpDispatcherServlet.java | 2 +- .../org/redkale/net/sncp/SncpResponse.java | 4 +- 11 files changed, 40 insertions(+), 66 deletions(-) diff --git a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java index a3aa025a0..90b716a3e 100644 --- a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java @@ -323,7 +323,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public void finish(boolean kill, ByteBuffer buffer) { + public void finishBuffer(boolean kill, ByteBuffer buffer) { if (future == null) { return; } @@ -333,7 +333,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public void finish(boolean kill, ByteBuffer... buffers) { + public void finishBuffers(boolean kill, ByteBuffer... buffers) { if (future == null) { return; } diff --git a/src/main/java/org/redkale/mq/HttpMessageResponse.java b/src/main/java/org/redkale/mq/HttpMessageResponse.java index 544361ff9..9737706a0 100644 --- a/src/main/java/org/redkale/mq/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/HttpMessageResponse.java @@ -293,7 +293,7 @@ public class HttpMessageResponse extends HttpResponse { } @Override - public void finish(boolean kill, ByteBuffer buffer) { + public void finishBuffer(boolean kill, ByteBuffer buffer) { if (message.isEmptyRespTopic()) { if (callback != null) { callback.run(); @@ -306,7 +306,7 @@ public class HttpMessageResponse extends HttpResponse { } @Override - public void finish(boolean kill, ByteBuffer... buffers) { + public void finishBuffers(boolean kill, ByteBuffer... buffers) { if (message.isEmptyRespTopic()) { if (callback != null) { callback.run(); diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 5b6c7cd78..8299cf81a 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -190,7 +190,6 @@ public class AsyncIOThread extends WorkThread { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); conn.doConnect(); } else if (conn.readCompletionHandler != null && key.isReadable()) { - conn.currReadInvoker = 0; key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); conn.doRead(true); } else if (conn.writeCompletionHandler != null && key.isWritable()) { @@ -199,7 +198,6 @@ public class AsyncIOThread extends WorkThread { } } else { if (conn.readCompletionHandler != null && key.isReadable()) { - conn.currReadInvoker = 0; key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); //不放开这行,在CompletableFuture时容易ReadPending conn.doRead(true); } else if (conn.writeCompletionHandler != null && key.isWritable()) { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 0f1c5266a..f9d22c289 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -26,8 +26,6 @@ import org.redkale.util.ByteBufferWriter; */ abstract class AsyncNioConnection extends AsyncConnection { - protected static final int MAX_INVOKER_ONSTACK = Integer.getInteger("redkale.net.invoker.max.onstack", 16); - final AsyncIOThread connectThread; protected SocketAddress remoteAddress; @@ -46,8 +44,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected int readTimeoutSeconds; - int currReadInvoker; - protected ByteBuffer readByteBuffer; protected CompletionHandler readCompletionHandler; @@ -134,7 +130,6 @@ abstract class AsyncNioConnection extends AsyncConnection { @Override protected void startRead(CompletionHandler handler) { - currReadInvoker = MAX_INVOKER_ONSTACK; read(handler); } @@ -159,11 +154,6 @@ abstract class AsyncNioConnection extends AsyncConnection { this.readCompletionHandler = handler; } doRead(this.ioReadThread.inCurrThread()); -// if (client) { -// doRead(this.ioReadThread.inCurrThread()); -// } else { -// doRead(this.ioReadThread.inCurrThread() || currReadInvoker < MAX_INVOKER_ONSTACK); //同一线程中Selector.wakeup无效 -// } } @Override @@ -264,7 +254,6 @@ abstract class AsyncNioConnection extends AsyncConnection { this.readtime = System.currentTimeMillis(); int readCount = 0; if (direct) { - currReadInvoker++; if (this.readByteBuffer == null) { this.readByteBuffer = sslEngine == null ? pollReadBuffer() : pollReadSSLBuffer(); if (this.readTimeoutSeconds > 0) { diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index 6cf5378e6..b1023eeb9 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -134,7 +134,7 @@ public class Context { } } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); - response.error(); + response.error(t); } }); } else if (workExecutor != null) { @@ -144,7 +144,7 @@ public class Context { servlet.execute(request, response); } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); - response.error(); + response.error(t); } }); } else { @@ -153,7 +153,7 @@ public class Context { servlet.execute(request, response); } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); - response.error(); + response.error(t); } } diff --git a/src/main/java/org/redkale/net/DispatcherServlet.java b/src/main/java/org/redkale/net/DispatcherServlet.java index 25687e881..0dcb86829 100644 --- a/src/main/java/org/redkale/net/DispatcherServlet.java +++ b/src/main/java/org/redkale/net/DispatcherServlet.java @@ -254,7 +254,7 @@ public abstract class DispatcherServlet { decode(buffer, response, 0, null); } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); - response.finish(true); + response.error(t); } } @@ -102,7 +102,7 @@ class ProtocolCodec implements CompletionHandler { decode(data, response, 0, null); } catch (Throwable t) { context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); - response.finish(true); + response.error(t); } return; } @@ -123,7 +123,7 @@ class ProtocolCodec implements CompletionHandler { decode(data, response, 0, null); } catch (Throwable t) { context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); - response.finish(true); + response.error(t); } return; } @@ -148,7 +148,7 @@ class ProtocolCodec implements CompletionHandler { if (rs != Integer.MIN_VALUE) { preparer.incrIllegalRequestCounter(); } - response.finish(true); + response.error(null); if (context.logger.isLoggable(Level.FINEST)) { context.logger.log(Level.FINEST, "request.readHeader erroneous (" + rs + "), force to close channel "); } @@ -177,7 +177,7 @@ class ProtocolCodec implements CompletionHandler { decode(buffer, pipelineResponse, pindex + 1, hreq); } catch (Throwable t) { //此处不可 offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer context.logger.log(Level.WARNING, "prepare pipeline servlet abort, force to close channel ", t); - pipelineResponse.finish(true); + pipelineResponse.error(t); } } } else { @@ -199,7 +199,7 @@ class ProtocolCodec implements CompletionHandler { public void failed(Throwable exc, ByteBuffer attachment) { context.prepare.incrIllegalRequestCounter(); channel.offerReadBuffer(attachment); - response.finish(true); + response.error(exc); if (exc != null) { request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); } diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index f6e4e33e6..741143997 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -49,21 +49,21 @@ public abstract class Response> { private final ByteBuffer writeBuffer; - private final CompletionHandler finishBytesHandler = new CompletionHandler() { + protected final CompletionHandler finishBytesHandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - finish(); + finishInIOThread(); } @Override public void failed(Throwable exc, Void attachment) { - finish(true); + finishInIOThread(true); } }; - private final CompletionHandler finishBufferHandler = new CompletionHandler() { + protected final CompletionHandler finishBufferHandler = new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { @@ -72,7 +72,7 @@ public abstract class Response> { } else { attachment.clear(); } - finish(); + finishInIOThread(); } @Override @@ -82,7 +82,7 @@ public abstract class Response> { } else { attachment.clear(); } - finish(true); + finishInIOThread(true); } }; @@ -96,7 +96,7 @@ public abstract class Response> { channel.offerWriteBuffer(attachment); } } - finish(); + finishInIOThread(); } @Override @@ -106,7 +106,7 @@ public abstract class Response> { channel.offerWriteBuffer(attachment); } } - finish(true); + finishInIOThread(true); } }; @@ -201,15 +201,15 @@ public abstract class Response> { return !this.inited; } - public void finish() { - this.finish(false); + private void finishInIOThread() { + this.finishInIOThread(false); } - protected void error() { - finish(true); + protected void error(Throwable t) { + finishInIOThread(true); } - public void finish(boolean kill) { + private void finishInIOThread(boolean kill) { if (!this.inited) { return; //避免重复关闭 } //System.println("耗时: " + (System.currentTimeMillis() - request.createtime)); @@ -318,15 +318,15 @@ public abstract class Response> { } } - protected final void finish(ByteBuffer buffer) { - finish(false, buffer); + protected final void finishBuffer(ByteBuffer buffer) { + finishBuffers(false, buffer); } - protected final void finish(ByteBuffer... buffers) { - finish(false, buffers); + protected final void finishBuffers(ByteBuffer... buffers) { + finishBuffers(false, buffers); } - protected void finish(boolean kill, ByteBuffer buffer) { + protected void finishBuffer(boolean kill, ByteBuffer buffer) { if (!this.inited) { return; //避免重复关闭 } @@ -351,7 +351,7 @@ public abstract class Response> { } } - protected void finish(boolean kill, ByteBuffer... buffers) { + protected void finishBuffers(boolean kill, ByteBuffer... buffers) { if (!this.inited) { return; //避免重复关闭 } diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 80bd6ae1c..a8a602d3f 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -177,19 +177,6 @@ public class HttpResponse extends Response { private final JsonBytesWriter jsonWriter = new JsonBytesWriter(); - protected final CompletionHandler pipelineWriteHandler = new CompletionHandler() { - - @Override - public void completed(Integer result, Void attachment) { - finish(); - } - - @Override - public void failed(Throwable exc, Void attachment) { - finish(true); - } - }; - @SuppressWarnings("Convert2Lambda") protected final ConvertBytesHandler convertHandler = new ConvertBytesHandler() { @Override @@ -964,7 +951,7 @@ public class HttpResponse extends Response { boolean over = this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data); if (over) { request.setPipelineCompleted(true); - this.channel.flushPipelineData(this.pipelineWriteHandler); + this.channel.flushPipelineData(this.finishBytesHandler); } else { removeChannel(); this.responseConsumer.accept(this); @@ -972,7 +959,7 @@ public class HttpResponse extends Response { } else { if (this.channel.hasPipelineData()) { this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data); - this.channel.flushPipelineData(this.pipelineWriteHandler); + this.channel.flushPipelineData(this.finishBytesHandler); } else { //不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish super.finish(false, data.content(), 0, data.length()); @@ -981,7 +968,7 @@ public class HttpResponse extends Response { } @Override - protected void error() { + protected void error(Throwable t) { finish500(); } @@ -1337,7 +1324,7 @@ public class HttpResponse extends Response { fileChannel.close(); } catch (IOException ie) { } - finish(); + finishBytesHandler.completed(result, attachment); return; } if (fileChannel == null) { diff --git a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java index 4e22d6b50..9355312b4 100644 --- a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java @@ -70,7 +70,7 @@ public class SncpDispatcherServlet extends DispatcherServlet { } @Override - protected void finish(boolean kill, ByteBuffer buffer) { - super.finish(kill, buffer); + protected void finishBuffer(boolean kill, ByteBuffer buffer) { + super.finishBuffer(kill, buffer); } public void finish(final int retcode, final BsonWriter out) {