优化pipeline

This commit is contained in:
redkale
2023-01-28 11:28:10 +08:00
parent 71fb4102f6
commit 2a7f0901af
8 changed files with 108 additions and 131 deletions

View File

@@ -0,0 +1,23 @@
/*
*
*/
package org.redkale.annotation;
import java.lang.annotation.*;
/**
* 异步模式标记。
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Async {
boolean value() default true;
}

View File

@@ -58,6 +58,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
private Consumer<ByteBuffer> writeBufferConsumer;
private final Object pipelineLock = new Object();
private ByteBufferWriter pipelineWriter;
private PipelineDataNode pipelineDataNode;
@@ -494,14 +496,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
//返回 是否over
//返回pipelineCount个数数据是否全部写入完毕
public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple array) {
return writePipelineData(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length());
}
//返回 是否over
//返回pipelineCount个数数据是否全部写入完毕
public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) {
synchronized (this) {
synchronized (pipelineLock) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getWriteBufferSupplier());
@@ -532,14 +534,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
//返回 是否over
//返回pipelineCount个数数据是否全部写入完毕
public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) {
return writePipelineData(pipelineIndex, pipelineCount, header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length());
}
//返回 是否over
//返回pipelineCount个数数据是否全部写入完毕
public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) {
synchronized (this) {
synchronized (pipelineLock) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
writer = ByteBufferWriter.create(getWriteBufferSupplier());

View File

@@ -192,7 +192,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newHandler;
}
doWrite(true); //如果不是true则bodyCallback的执行可能会切换线程
doWrite(this.ioWriteThread.inCurrThread()); //如果不是true则bodyCallback的执行可能会切换线程
}
@Override
@@ -218,7 +218,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite(true);
doWrite(this.ioWriteThread.inCurrThread());
}
@Override
@@ -246,7 +246,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite(true);
doWrite(this.ioWriteThread.inCurrThread());
}
public void doRead(boolean direct) {

View File

@@ -168,7 +168,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
}
} else {
request.pipeline(pindex, pindex);
channel.setReadBuffer((ByteBuffer) buffer.clear());
channel.setReadBuffer(buffer.clear());
}
context.executeDispatch(request, response);
if (pipeline) {

View File

@@ -77,6 +77,7 @@ public abstract class Request<C extends Context> {
return null;
}
//重载此方法不设置pipelineIndex值可以将协议改成无pipeline模式
protected Request pipeline(int pipelineIndex, int pipelineCount) {
this.pipelineIndex = pipelineIndex;
this.pipelineCount = pipelineCount;

View File

@@ -10,7 +10,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.function.*;
import java.util.logging.Level;
import org.redkale.util.ByteTuple;
import org.redkale.util.*;
/**
* 协议响应对象
@@ -53,12 +53,12 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void completed(Integer result, Void attachment) {
finishInIOThread();
completeInIOThread();
}
@Override
public void failed(Throwable exc, Void attachment) {
finishInIOThread(true);
completeInIOThread(true);
}
};
@@ -72,7 +72,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} else {
attachment.clear();
}
finishInIOThread();
completeInIOThread();
}
@Override
@@ -82,7 +82,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} else {
attachment.clear();
}
finishInIOThread(true);
completeInIOThread(true);
}
};
@@ -96,7 +96,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
channel.offerWriteBuffer(attachment);
}
}
finishInIOThread();
completeInIOThread();
}
@Override
@@ -106,7 +106,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
channel.offerWriteBuffer(attachment);
}
}
finishInIOThread(true);
completeInIOThread(true);
}
};
@@ -137,10 +137,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.output = null;
this.filter = null;
this.servlet = null;
boolean notpipeline = request.pipelineIndex == 0 || request.pipelineCompleted;
boolean noPipeline = request.pipelineIndex == 0 || request.pipelineCompleted;
request.recycle();
if (channel != null) {
if (notpipeline) {
if (noPipeline) {
channel.dispose();
}
channel = null;
@@ -201,15 +201,15 @@ public abstract class Response<C extends Context, R extends Request<C>> {
return !this.inited;
}
private void finishInIOThread() {
this.finishInIOThread(false);
private void completeInIOThread() {
this.completeInIOThread(false);
}
protected void error(Throwable t) {
finishInIOThread(true);
completeInIOThread(true);
}
private void finishInIOThread(boolean kill) {
private void completeInIOThread(boolean kill) {
if (!this.inited) {
return; //避免重复关闭
} //System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
@@ -261,25 +261,21 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
public void finish(boolean kill, final byte[] bs, int offset, int length) {
if (!this.inited) {
return; //避免重复关闭
}
if (kill) {
refuseAlive();
}
if (this.channel.hasPipelineData()) {
this.channel.flushPipelineData(null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
channel.write(bs, offset, length, finishBytesHandler);
}
@Override
public void failed(Throwable exc, Void attachment) {
finishBytesHandler.failed(exc, attachment);
}
});
if (request.pipelineIndex > 0) {
boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.flushPipelineData(this.finishBytesHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length);
this.channel.flushPipelineData(this.finishBytesHandler);
} else {
ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= length) {
@@ -293,72 +289,49 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
}
public <A> void finish(boolean kill, final byte[] bs, int offset, int length, final byte[] bs2, int offset2, int length2, Consumer<A> callback, A attachment) {
if (!this.inited) {
return; //避免重复关闭
}
public <A> void finish(boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2, Consumer<A> callback, A attachment) {
if (kill) {
refuseAlive();
}
if (this.channel.hasPipelineData()) {
this.channel.flushPipelineData(null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
channel.write(bs, offset, length, bs2, offset2, length2, callback, attachment, finishBytesHandler);
}
@Override
public void failed(Throwable exc, Void attachment) {
finishBytesHandler.failed(exc, attachment);
}
});
if (request.pipelineIndex > 0) {
boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.flushPipelineData(this.finishBytesHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
this.channel.flushPipelineData(this.finishBytesHandler);
} else {
this.channel.write(bs, offset, length, bs2, offset2, length2, callback, attachment, finishBytesHandler);
}
}
protected final void finishBuffer(ByteBuffer buffer) {
finishBuffers(false, buffer);
}
protected final void finishBuffers(ByteBuffer... buffers) {
finishBuffers(false, buffers);
}
protected void finishBuffer(boolean kill, ByteBuffer buffer) {
if (!this.inited) {
return; //避免重复关闭
}
if (kill) {
refuseAlive();
}
if (this.channel.hasPipelineData()) {
this.channel.flushPipelineData(null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
channel.write(buffer, buffer, finishBufferHandler);
}
@Override
public void failed(Throwable exc, Void attachment) {
finishBufferHandler.failed(exc, buffer);
}
});
} else {
this.channel.write(buffer, buffer, finishBufferHandler);
this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesHandler);
}
}
protected void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (!this.inited) {
return; //避免重复关闭
}
if (kill) {
refuseAlive();
}
if (this.channel.hasPipelineData()) {
if (request.pipelineIndex > 0) {
ByteArray array = new ByteArray();
for (ByteBuffer buffer : buffers) {
array.put(buffer);
}
boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, array);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.flushPipelineData(buffers, this.finishBuffersHandler);
} else {
AsyncConnection conn = removeChannel();
if (conn != null) {
conn.offerWriteBuffers(buffers);
}
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
//先将pipeline数据写入完再写入buffers
this.channel.flushPipelineData(null, new CompletionHandler<Integer, Void>() {
@Override
@@ -376,6 +349,18 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
}
protected final void finishBuffer(ByteBuffer buffer) {
finishBuffers(false, buffer);
}
protected final void finishBuffers(ByteBuffer... buffers) {
finishBuffers(false, buffers);
}
protected void finishBuffer(boolean kill, ByteBuffer buffer) {
finishBuffers(kill, buffer);
}
protected <A> void send(final ByteTuple array, final CompletionHandler<Integer, Void> handler) {
ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= array.length()) {

View File

@@ -17,7 +17,7 @@ import java.util.logging.Level;
import org.redkale.annotation.Comment;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*;
import org.redkale.net.Request;
import org.redkale.util.*;
/**
@@ -258,10 +258,6 @@ public class HttpRequest extends Request<HttpContext> {
return maybews && "Upgrade".equalsIgnoreCase(getHeader("Connection")) && "GET".equalsIgnoreCase(method);
}
protected void setPipelineCompleted(boolean pipelineCompleted) {
this.pipelineCompleted = pipelineCompleted;
}
protected void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}
@@ -270,18 +266,6 @@ public class HttpRequest extends Request<HttpContext> {
return this.keepAlive;
}
protected AsyncConnection getChannel() {
return this.channel;
}
protected int getPipelineIndex() {
return this.pipelineIndex;
}
protected int getPipelineCount() {
return this.pipelineCount;
}
protected ConvertType getRespConvertType() {
return this.respConvertType;
}

View File

@@ -945,26 +945,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
if (cacheHandler != null) {
cacheHandler.accept(this, data.getBytes());
}
int pipelineIndex = request.getPipelineIndex();
if (pipelineIndex > 0) {
boolean over = this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data);
if (over) {
request.setPipelineCompleted(true);
this.channel.flushPipelineData(this.finishBytesHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else {
if (this.channel.hasPipelineData()) {
this.channel.writePipelineData(pipelineIndex, request.getPipelineCount(), data);
this.channel.flushPipelineData(this.finishBytesHandler);
} else {
//不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish
super.finish(false, data.content(), 0, data.length());
}
}
//不能用finish(boolean kill, final ByteTuple array) 否则会调this.finish
super.finish(false, data.content(), 0, data.length());
}
@Override