From 2a7f0901af96eeada1c98558b450553f7ac1e0c0 Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 28 Jan 2023 11:28:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/annotation/Async.java | 23 +++ .../java/org/redkale/net/AsyncConnection.java | 14 +- .../org/redkale/net/AsyncNioConnection.java | 6 +- .../java/org/redkale/net/ProtocolCodec.java | 2 +- src/main/java/org/redkale/net/Request.java | 1 + src/main/java/org/redkale/net/Response.java | 153 ++++++++---------- .../org/redkale/net/http/HttpRequest.java | 18 +-- .../org/redkale/net/http/HttpResponse.java | 22 +-- 8 files changed, 108 insertions(+), 131 deletions(-) create mode 100644 src/main/java/org/redkale/annotation/Async.java diff --git a/src/main/java/org/redkale/annotation/Async.java b/src/main/java/org/redkale/annotation/Async.java new file mode 100644 index 000000000..e74bbe1ef --- /dev/null +++ b/src/main/java/org/redkale/annotation/Async.java @@ -0,0 +1,23 @@ +/* + * + */ +package org.redkale.annotation; + +import java.lang.annotation.*; + +/** + * 异步模式标记。 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +@Target({ElementType.TYPE, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Async { + + boolean value() default true; +} diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 0f746f958..0255f535d 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -58,6 +58,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl private Consumer writeBufferConsumer; + private final Object pipelineLock = new Object(); + private ByteBufferWriter pipelineWriter; private PipelineDataNode pipelineDataNode; @@ -494,14 +496,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } - //返回 是否over + //返回pipelineCount个数数据是否全部写入完毕 public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple array) { return writePipelineData(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length()); } - //返回 是否over + //返回pipelineCount个数数据是否全部写入完毕 public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) { - synchronized (this) { + synchronized (pipelineLock) { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { writer = ByteBufferWriter.create(getWriteBufferSupplier()); @@ -532,14 +534,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } - //返回 是否over + //返回pipelineCount个数数据是否全部写入完毕 public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) { return writePipelineData(pipelineIndex, pipelineCount, header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length()); } - //返回 是否over + //返回pipelineCount个数数据是否全部写入完毕 public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) { - synchronized (this) { + synchronized (pipelineLock) { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { writer = ByteBufferWriter.create(getWriteBufferSupplier()); diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index f9d22c289..1a0e00bf4 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -192,7 +192,7 @@ abstract class AsyncNioConnection extends AsyncConnection { newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null); this.writeCompletionHandler = newHandler; } - doWrite(true); //如果不是true,则bodyCallback的执行可能会切换线程 + doWrite(this.ioWriteThread.inCurrThread()); //如果不是true,则bodyCallback的执行可能会切换线程 } @Override @@ -218,7 +218,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } else { this.writeCompletionHandler = (CompletionHandler) handler; } - doWrite(true); + doWrite(this.ioWriteThread.inCurrThread()); } @Override @@ -246,7 +246,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } else { this.writeCompletionHandler = (CompletionHandler) handler; } - doWrite(true); + doWrite(this.ioWriteThread.inCurrThread()); } public void doRead(boolean direct) { diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 94c52b1a9..1247945fe 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -168,7 +168,7 @@ class ProtocolCodec implements CompletionHandler { } } else { request.pipeline(pindex, pindex); - channel.setReadBuffer((ByteBuffer) buffer.clear()); + channel.setReadBuffer(buffer.clear()); } context.executeDispatch(request, response); if (pipeline) { diff --git a/src/main/java/org/redkale/net/Request.java b/src/main/java/org/redkale/net/Request.java index 8cfeed9e4..45bba0f2f 100644 --- a/src/main/java/org/redkale/net/Request.java +++ b/src/main/java/org/redkale/net/Request.java @@ -77,6 +77,7 @@ public abstract class Request { return null; } + //重载此方法,不设置pipelineIndex值可以将协议改成无pipeline模式 protected Request pipeline(int pipelineIndex, int pipelineCount) { this.pipelineIndex = pipelineIndex; this.pipelineCount = pipelineCount; diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index aa9544e18..a49e44ca1 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -10,7 +10,7 @@ import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.function.*; import java.util.logging.Level; -import org.redkale.util.ByteTuple; +import org.redkale.util.*; /** * 协议响应对象 @@ -53,12 +53,12 @@ public abstract class Response> { @Override public void completed(Integer result, Void attachment) { - finishInIOThread(); + completeInIOThread(); } @Override public void failed(Throwable exc, Void attachment) { - finishInIOThread(true); + completeInIOThread(true); } }; @@ -72,7 +72,7 @@ public abstract class Response> { } else { attachment.clear(); } - finishInIOThread(); + completeInIOThread(); } @Override @@ -82,7 +82,7 @@ public abstract class Response> { } else { attachment.clear(); } - finishInIOThread(true); + completeInIOThread(true); } }; @@ -96,7 +96,7 @@ public abstract class Response> { channel.offerWriteBuffer(attachment); } } - finishInIOThread(); + completeInIOThread(); } @Override @@ -106,7 +106,7 @@ public abstract class Response> { channel.offerWriteBuffer(attachment); } } - finishInIOThread(true); + completeInIOThread(true); } }; @@ -137,10 +137,10 @@ public abstract class Response> { this.output = null; this.filter = null; this.servlet = null; - boolean notpipeline = request.pipelineIndex == 0 || request.pipelineCompleted; + boolean noPipeline = request.pipelineIndex == 0 || request.pipelineCompleted; request.recycle(); if (channel != null) { - if (notpipeline) { + if (noPipeline) { channel.dispose(); } channel = null; @@ -201,15 +201,15 @@ public abstract class Response> { return !this.inited; } - private void finishInIOThread() { - this.finishInIOThread(false); + private void completeInIOThread() { + this.completeInIOThread(false); } protected void error(Throwable t) { - finishInIOThread(true); + completeInIOThread(true); } - private void finishInIOThread(boolean kill) { + private void completeInIOThread(boolean kill) { if (!this.inited) { return; //避免重复关闭 } //System.println("耗时: " + (System.currentTimeMillis() - request.createtime)); @@ -261,25 +261,21 @@ public abstract class Response> { } public void finish(boolean kill, final byte[] bs, int offset, int length) { - if (!this.inited) { - return; //避免重复关闭 - } if (kill) { refuseAlive(); } - if (this.channel.hasPipelineData()) { - this.channel.flushPipelineData(null, new CompletionHandler() { - - @Override - public void completed(Integer result, Void attachment) { - channel.write(bs, offset, length, finishBytesHandler); - } - - @Override - public void failed(Throwable exc, Void attachment) { - finishBytesHandler.failed(exc, attachment); - } - }); + if (request.pipelineIndex > 0) { + boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length); + if (allCompleted) { + request.pipelineCompleted = true; + this.channel.flushPipelineData(this.finishBytesHandler); + } else { + removeChannel(); + this.responseConsumer.accept(this); + } + } else if (this.channel.hasPipelineData()) { + this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length); + this.channel.flushPipelineData(this.finishBytesHandler); } else { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= length) { @@ -293,72 +289,49 @@ public abstract class Response> { } } - public void finish(boolean kill, final byte[] bs, int offset, int length, final byte[] bs2, int offset2, int length2, Consumer callback, A attachment) { - if (!this.inited) { - return; //避免重复关闭 - } + public void finish(boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2, Consumer callback, A attachment) { if (kill) { refuseAlive(); } - if (this.channel.hasPipelineData()) { - this.channel.flushPipelineData(null, new CompletionHandler() { - - @Override - public void completed(Integer result, Void attachment) { - channel.write(bs, offset, length, bs2, offset2, length2, callback, attachment, finishBytesHandler); - } - - @Override - public void failed(Throwable exc, Void attachment) { - finishBytesHandler.failed(exc, attachment); - } - }); + if (request.pipelineIndex > 0) { + boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); + if (allCompleted) { + request.pipelineCompleted = true; + this.channel.flushPipelineData(this.finishBytesHandler); + } else { + removeChannel(); + this.responseConsumer.accept(this); + } + } else if (this.channel.hasPipelineData()) { + this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); + this.channel.flushPipelineData(this.finishBytesHandler); } else { - this.channel.write(bs, offset, length, bs2, offset2, length2, callback, attachment, finishBytesHandler); - } - } - - protected final void finishBuffer(ByteBuffer buffer) { - finishBuffers(false, buffer); - } - - protected final void finishBuffers(ByteBuffer... buffers) { - finishBuffers(false, buffers); - } - - protected void finishBuffer(boolean kill, ByteBuffer buffer) { - if (!this.inited) { - return; //避免重复关闭 - } - if (kill) { - refuseAlive(); - } - if (this.channel.hasPipelineData()) { - this.channel.flushPipelineData(null, new CompletionHandler() { - - @Override - public void completed(Integer result, Void attachment) { - channel.write(buffer, buffer, finishBufferHandler); - } - - @Override - public void failed(Throwable exc, Void attachment) { - finishBufferHandler.failed(exc, buffer); - } - }); - } else { - this.channel.write(buffer, buffer, finishBufferHandler); + this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesHandler); } } protected void finishBuffers(boolean kill, ByteBuffer... buffers) { - if (!this.inited) { - return; //避免重复关闭 - } if (kill) { refuseAlive(); } - if (this.channel.hasPipelineData()) { + if (request.pipelineIndex > 0) { + ByteArray array = new ByteArray(); + for (ByteBuffer buffer : buffers) { + array.put(buffer); + } + boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, array); + if (allCompleted) { + request.pipelineCompleted = true; + this.channel.flushPipelineData(buffers, this.finishBuffersHandler); + } else { + AsyncConnection conn = removeChannel(); + if (conn != null) { + conn.offerWriteBuffers(buffers); + } + this.responseConsumer.accept(this); + } + } else if (this.channel.hasPipelineData()) { + //先将pipeline数据写入完再写入buffers this.channel.flushPipelineData(null, new CompletionHandler() { @Override @@ -376,6 +349,18 @@ public abstract class Response> { } } + protected final void finishBuffer(ByteBuffer buffer) { + finishBuffers(false, buffer); + } + + protected final void finishBuffers(ByteBuffer... buffers) { + finishBuffers(false, buffers); + } + + protected void finishBuffer(boolean kill, ByteBuffer buffer) { + finishBuffers(kill, buffer); + } + protected void send(final ByteTuple array, final CompletionHandler handler) { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= array.length()) { diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index 63f43ade2..0981eb99a 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -17,7 +17,7 @@ import java.util.logging.Level; import org.redkale.annotation.Comment; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; -import org.redkale.net.*; +import org.redkale.net.Request; import org.redkale.util.*; /** @@ -258,10 +258,6 @@ public class HttpRequest extends Request { return maybews && "Upgrade".equalsIgnoreCase(getHeader("Connection")) && "GET".equalsIgnoreCase(method); } - protected void setPipelineCompleted(boolean pipelineCompleted) { - this.pipelineCompleted = pipelineCompleted; - } - protected void setKeepAlive(boolean keepAlive) { this.keepAlive = keepAlive; } @@ -270,18 +266,6 @@ public class HttpRequest extends Request { return this.keepAlive; } - protected AsyncConnection getChannel() { - return this.channel; - } - - protected int getPipelineIndex() { - return this.pipelineIndex; - } - - protected int getPipelineCount() { - return this.pipelineCount; - } - protected ConvertType getRespConvertType() { return this.respConvertType; } diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index a8a602d3f..aee800fe3 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -945,26 +945,8 @@ public class HttpResponse extends Response { if (cacheHandler != null) { cacheHandler.accept(this, data.getBytes()); } - - int pipelineIndex = request.getPipelineIndex(); - if (pipelineIndex > 0) { - boolean over = this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data); - if (over) { - request.setPipelineCompleted(true); - this.channel.flushPipelineData(this.finishBytesHandler); - } else { - removeChannel(); - this.responseConsumer.accept(this); - } - } else { - if (this.channel.hasPipelineData()) { - this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data); - this.channel.flushPipelineData(this.finishBytesHandler); - } else { - //不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish - super.finish(false, data.content(), 0, data.length()); - } - } + //不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish + super.finish(false, data.content(), 0, data.length()); } @Override