diff --git a/src/main/java/org/redkale/annotation/Async.java b/src/main/java/org/redkale/annotation/Async.java
new file mode 100644
index 000000000..e74bbe1ef
--- /dev/null
+++ b/src/main/java/org/redkale/annotation/Async.java
@@ -0,0 +1,23 @@
+/*
+ *
+ */
+package org.redkale.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * 异步模式标记。
+ *
+ *
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ *
+ * @since 2.8.0
+ */
+@Target({ElementType.TYPE, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Async {
+
+ boolean value() default true;
+}
diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java
index 0f746f958..0255f535d 100644
--- a/src/main/java/org/redkale/net/AsyncConnection.java
+++ b/src/main/java/org/redkale/net/AsyncConnection.java
@@ -58,6 +58,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
private Consumer writeBufferConsumer;
+ private final Object pipelineLock = new Object();
+
private ByteBufferWriter pipelineWriter;
private PipelineDataNode pipelineDataNode;
@@ -494,14 +496,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
- //返回 是否over
+ //返回pipelineCount个数数据是否全部写入完毕
public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple array) {
return writePipelineData(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length());
}
- //返回 是否over
+ //返回pipelineCount个数数据是否全部写入完毕
public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) {
- synchronized (this) {
+ synchronized (pipelineLock) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getWriteBufferSupplier());
@@ -532,14 +534,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
- //返回 是否over
+ //返回pipelineCount个数数据是否全部写入完毕
public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) {
return writePipelineData(pipelineIndex, pipelineCount, header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length());
}
- //返回 是否over
+ //返回pipelineCount个数数据是否全部写入完毕
public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) {
- synchronized (this) {
+ synchronized (pipelineLock) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getWriteBufferSupplier());
diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java
index f9d22c289..1a0e00bf4 100644
--- a/src/main/java/org/redkale/net/AsyncNioConnection.java
+++ b/src/main/java/org/redkale/net/AsyncNioConnection.java
@@ -192,7 +192,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newHandler;
}
- doWrite(true); //如果不是true,则bodyCallback的执行可能会切换线程
+ doWrite(this.ioWriteThread.inCurrThread()); //如果不是true,则bodyCallback的执行可能会切换线程
}
@Override
@@ -218,7 +218,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
- doWrite(true);
+ doWrite(this.ioWriteThread.inCurrThread());
}
@Override
@@ -246,7 +246,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
- doWrite(true);
+ doWrite(this.ioWriteThread.inCurrThread());
}
public void doRead(boolean direct) {
diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java
index 94c52b1a9..1247945fe 100644
--- a/src/main/java/org/redkale/net/ProtocolCodec.java
+++ b/src/main/java/org/redkale/net/ProtocolCodec.java
@@ -168,7 +168,7 @@ class ProtocolCodec implements CompletionHandler {
}
} else {
request.pipeline(pindex, pindex);
- channel.setReadBuffer((ByteBuffer) buffer.clear());
+ channel.setReadBuffer(buffer.clear());
}
context.executeDispatch(request, response);
if (pipeline) {
diff --git a/src/main/java/org/redkale/net/Request.java b/src/main/java/org/redkale/net/Request.java
index 8cfeed9e4..45bba0f2f 100644
--- a/src/main/java/org/redkale/net/Request.java
+++ b/src/main/java/org/redkale/net/Request.java
@@ -77,6 +77,7 @@ public abstract class Request {
return null;
}
+ //重载此方法,不设置pipelineIndex值可以将协议改成无pipeline模式
protected Request pipeline(int pipelineIndex, int pipelineCount) {
this.pipelineIndex = pipelineIndex;
this.pipelineCount = pipelineCount;
diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java
index aa9544e18..a49e44ca1 100644
--- a/src/main/java/org/redkale/net/Response.java
+++ b/src/main/java/org/redkale/net/Response.java
@@ -10,7 +10,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.function.*;
import java.util.logging.Level;
-import org.redkale.util.ByteTuple;
+import org.redkale.util.*;
/**
* 协议响应对象
@@ -53,12 +53,12 @@ public abstract class Response> {
@Override
public void completed(Integer result, Void attachment) {
- finishInIOThread();
+ completeInIOThread();
}
@Override
public void failed(Throwable exc, Void attachment) {
- finishInIOThread(true);
+ completeInIOThread(true);
}
};
@@ -72,7 +72,7 @@ public abstract class Response> {
} else {
attachment.clear();
}
- finishInIOThread();
+ completeInIOThread();
}
@Override
@@ -82,7 +82,7 @@ public abstract class Response> {
} else {
attachment.clear();
}
- finishInIOThread(true);
+ completeInIOThread(true);
}
};
@@ -96,7 +96,7 @@ public abstract class Response> {
channel.offerWriteBuffer(attachment);
}
}
- finishInIOThread();
+ completeInIOThread();
}
@Override
@@ -106,7 +106,7 @@ public abstract class Response> {
channel.offerWriteBuffer(attachment);
}
}
- finishInIOThread(true);
+ completeInIOThread(true);
}
};
@@ -137,10 +137,10 @@ public abstract class Response> {
this.output = null;
this.filter = null;
this.servlet = null;
- boolean notpipeline = request.pipelineIndex == 0 || request.pipelineCompleted;
+ boolean noPipeline = request.pipelineIndex == 0 || request.pipelineCompleted;
request.recycle();
if (channel != null) {
- if (notpipeline) {
+ if (noPipeline) {
channel.dispose();
}
channel = null;
@@ -201,15 +201,15 @@ public abstract class Response> {
return !this.inited;
}
- private void finishInIOThread() {
- this.finishInIOThread(false);
+ private void completeInIOThread() {
+ this.completeInIOThread(false);
}
protected void error(Throwable t) {
- finishInIOThread(true);
+ completeInIOThread(true);
}
- private void finishInIOThread(boolean kill) {
+ private void completeInIOThread(boolean kill) {
if (!this.inited) {
return; //避免重复关闭
} //System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
@@ -261,25 +261,21 @@ public abstract class Response> {
}
public void finish(boolean kill, final byte[] bs, int offset, int length) {
- if (!this.inited) {
- return; //避免重复关闭
- }
if (kill) {
refuseAlive();
}
- if (this.channel.hasPipelineData()) {
- this.channel.flushPipelineData(null, new CompletionHandler() {
-
- @Override
- public void completed(Integer result, Void attachment) {
- channel.write(bs, offset, length, finishBytesHandler);
- }
-
- @Override
- public void failed(Throwable exc, Void attachment) {
- finishBytesHandler.failed(exc, attachment);
- }
- });
+ if (request.pipelineIndex > 0) {
+ boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length);
+ if (allCompleted) {
+ request.pipelineCompleted = true;
+ this.channel.flushPipelineData(this.finishBytesHandler);
+ } else {
+ removeChannel();
+ this.responseConsumer.accept(this);
+ }
+ } else if (this.channel.hasPipelineData()) {
+ this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length);
+ this.channel.flushPipelineData(this.finishBytesHandler);
} else {
ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= length) {
@@ -293,72 +289,49 @@ public abstract class Response> {
}
}
- public void finish(boolean kill, final byte[] bs, int offset, int length, final byte[] bs2, int offset2, int length2, Consumer callback, A attachment) {
- if (!this.inited) {
- return; //避免重复关闭
- }
+ public void finish(boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2, Consumer callback, A attachment) {
if (kill) {
refuseAlive();
}
- if (this.channel.hasPipelineData()) {
- this.channel.flushPipelineData(null, new CompletionHandler() {
-
- @Override
- public void completed(Integer result, Void attachment) {
- channel.write(bs, offset, length, bs2, offset2, length2, callback, attachment, finishBytesHandler);
- }
-
- @Override
- public void failed(Throwable exc, Void attachment) {
- finishBytesHandler.failed(exc, attachment);
- }
- });
+ if (request.pipelineIndex > 0) {
+ boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
+ if (allCompleted) {
+ request.pipelineCompleted = true;
+ this.channel.flushPipelineData(this.finishBytesHandler);
+ } else {
+ removeChannel();
+ this.responseConsumer.accept(this);
+ }
+ } else if (this.channel.hasPipelineData()) {
+ this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
+ this.channel.flushPipelineData(this.finishBytesHandler);
} else {
- this.channel.write(bs, offset, length, bs2, offset2, length2, callback, attachment, finishBytesHandler);
- }
- }
-
- protected final void finishBuffer(ByteBuffer buffer) {
- finishBuffers(false, buffer);
- }
-
- protected final void finishBuffers(ByteBuffer... buffers) {
- finishBuffers(false, buffers);
- }
-
- protected void finishBuffer(boolean kill, ByteBuffer buffer) {
- if (!this.inited) {
- return; //避免重复关闭
- }
- if (kill) {
- refuseAlive();
- }
- if (this.channel.hasPipelineData()) {
- this.channel.flushPipelineData(null, new CompletionHandler() {
-
- @Override
- public void completed(Integer result, Void attachment) {
- channel.write(buffer, buffer, finishBufferHandler);
- }
-
- @Override
- public void failed(Throwable exc, Void attachment) {
- finishBufferHandler.failed(exc, buffer);
- }
- });
- } else {
- this.channel.write(buffer, buffer, finishBufferHandler);
+ this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesHandler);
}
}
protected void finishBuffers(boolean kill, ByteBuffer... buffers) {
- if (!this.inited) {
- return; //避免重复关闭
- }
if (kill) {
refuseAlive();
}
- if (this.channel.hasPipelineData()) {
+ if (request.pipelineIndex > 0) {
+ ByteArray array = new ByteArray();
+ for (ByteBuffer buffer : buffers) {
+ array.put(buffer);
+ }
+ boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, array);
+ if (allCompleted) {
+ request.pipelineCompleted = true;
+ this.channel.flushPipelineData(buffers, this.finishBuffersHandler);
+ } else {
+ AsyncConnection conn = removeChannel();
+ if (conn != null) {
+ conn.offerWriteBuffers(buffers);
+ }
+ this.responseConsumer.accept(this);
+ }
+ } else if (this.channel.hasPipelineData()) {
+ //先将pipeline数据写入完再写入buffers
this.channel.flushPipelineData(null, new CompletionHandler() {
@Override
@@ -376,6 +349,18 @@ public abstract class Response> {
}
}
+ protected final void finishBuffer(ByteBuffer buffer) {
+ finishBuffers(false, buffer);
+ }
+
+ protected final void finishBuffers(ByteBuffer... buffers) {
+ finishBuffers(false, buffers);
+ }
+
+ protected void finishBuffer(boolean kill, ByteBuffer buffer) {
+ finishBuffers(kill, buffer);
+ }
+
protected void send(final ByteTuple array, final CompletionHandler handler) {
ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= array.length()) {
diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java
index 63f43ade2..0981eb99a 100644
--- a/src/main/java/org/redkale/net/http/HttpRequest.java
+++ b/src/main/java/org/redkale/net/http/HttpRequest.java
@@ -17,7 +17,7 @@ import java.util.logging.Level;
import org.redkale.annotation.Comment;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
-import org.redkale.net.*;
+import org.redkale.net.Request;
import org.redkale.util.*;
/**
@@ -258,10 +258,6 @@ public class HttpRequest extends Request {
return maybews && "Upgrade".equalsIgnoreCase(getHeader("Connection")) && "GET".equalsIgnoreCase(method);
}
- protected void setPipelineCompleted(boolean pipelineCompleted) {
- this.pipelineCompleted = pipelineCompleted;
- }
-
protected void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}
@@ -270,18 +266,6 @@ public class HttpRequest extends Request {
return this.keepAlive;
}
- protected AsyncConnection getChannel() {
- return this.channel;
- }
-
- protected int getPipelineIndex() {
- return this.pipelineIndex;
- }
-
- protected int getPipelineCount() {
- return this.pipelineCount;
- }
-
protected ConvertType getRespConvertType() {
return this.respConvertType;
}
diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java
index a8a602d3f..aee800fe3 100644
--- a/src/main/java/org/redkale/net/http/HttpResponse.java
+++ b/src/main/java/org/redkale/net/http/HttpResponse.java
@@ -945,26 +945,8 @@ public class HttpResponse extends Response {
if (cacheHandler != null) {
cacheHandler.accept(this, data.getBytes());
}
-
- int pipelineIndex = request.getPipelineIndex();
- if (pipelineIndex > 0) {
- boolean over = this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data);
- if (over) {
- request.setPipelineCompleted(true);
- this.channel.flushPipelineData(this.finishBytesHandler);
- } else {
- removeChannel();
- this.responseConsumer.accept(this);
- }
- } else {
- if (this.channel.hasPipelineData()) {
- this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data);
- this.channel.flushPipelineData(this.finishBytesHandler);
- } else {
- //不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish
- super.finish(false, data.content(), 0, data.length());
- }
- }
+ //不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish
+ super.finish(false, data.content(), 0, data.length());
}
@Override