writePipeline优化

This commit is contained in:
redkale
2023-01-30 18:50:53 +08:00
parent 737d36be67
commit 4d2458da64
3 changed files with 47 additions and 31 deletions

View File

@@ -464,11 +464,11 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
return writer != null && writer.position() > 0;
}
public final void flushPipelineData(CompletionHandler<Integer, Void> handler) {
flushPipelineData(null, handler);
public final void writePipeline(CompletionHandler<Integer, Void> handler) {
writePipeline(null, handler);
}
public <A> void flushPipelineData(A attachment, CompletionHandler<Integer, ? super A> handler) {
public <A> void writePipeline(A attachment, CompletionHandler<Integer, ? super A> handler) {
ByteBufferWriter writer = this.pipelineWriter;
this.pipelineWriter = null;
if (writer == null) {
@@ -496,13 +496,29 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
//返回pipelineCount个数数据是否全部写入完毕
public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple array) {
return writePipelineData(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length());
public final void writePipelineInIOThread(CompletionHandler<Integer, Void> handler) {
if (inCurrWriteThread()) {
writePipeline(handler);
} else {
executeWrite(() -> writePipeline(handler));
}
}
public final <A> void writePipelineInIOThread(A attachment, CompletionHandler<Integer, ? super A> handler) {
if (inCurrWriteThread()) {
writePipeline(attachment, handler);
} else {
executeWrite(() -> writePipeline(attachment, handler));
}
}
//返回pipelineCount个数数据是否全部写入完毕
public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) {
public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) {
return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length());
}
//返回pipelineCount个数数据是否全部写入完毕
public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) {
synchronized (pipelineLock) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {
@@ -535,12 +551,12 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
//返回pipelineCount个数数据是否全部写入完毕
public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) {
return writePipelineData(pipelineIndex, pipelineCount, header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length());
public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) {
return appendPipeline(pipelineIndex, pipelineCount, header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length());
}
//返回pipelineCount个数数据是否全部写入完毕
public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) {
public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) {
synchronized (pipelineLock) {
ByteBufferWriter writer = this.pipelineWriter;
if (writer == null) {

View File

@@ -49,7 +49,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private final ByteBuffer writeBuffer;
protected final CompletionHandler finishBytesHandler = new CompletionHandler<Integer, Void>() {
protected final CompletionHandler finishBytesIOThreadHandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
@@ -63,7 +63,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
};
protected final CompletionHandler finishBufferHandler = new CompletionHandler<Integer, ByteBuffer>() {
protected final CompletionHandler finishBufferIOThreadHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
@@ -87,7 +87,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
};
private final CompletionHandler finishBuffersHandler = new CompletionHandler<Integer, ByteBuffer[]>() {
private final CompletionHandler finishBuffersIOThreadHandler = new CompletionHandler<Integer, ByteBuffer[]>() {
@Override
public void completed(final Integer result, final ByteBuffer[] attachments) {
@@ -265,26 +265,26 @@ public abstract class Response<C extends Context, R extends Request<C>> {
refuseAlive();
}
if (request.pipelineIndex > 0) {
boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length);
boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.flushPipelineData(this.finishBytesHandler);
this.channel.writePipeline(this.finishBytesIOThreadHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length);
this.channel.flushPipelineData(this.finishBytesHandler);
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length);
this.channel.writePipeline(this.finishBytesIOThreadHandler);
} else {
ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= length) {
buffer.clear();
buffer.put(bs, offset, length);
buffer.flip();
this.channel.write(buffer, buffer, finishBufferHandler);
this.channel.write(buffer, buffer, finishBufferIOThreadHandler);
} else {
this.channel.write(bs, offset, length, finishBytesHandler);
this.channel.write(bs, offset, length, finishBytesIOThreadHandler);
}
}
}
@@ -294,19 +294,19 @@ public abstract class Response<C extends Context, R extends Request<C>> {
refuseAlive();
}
if (request.pipelineIndex > 0) {
boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.flushPipelineData(this.finishBytesHandler);
this.channel.writePipeline(this.finishBytesIOThreadHandler);
} else {
removeChannel();
this.responseConsumer.accept(this);
}
} else if (this.channel.hasPipelineData()) {
this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
this.channel.flushPipelineData(this.finishBytesHandler);
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
this.channel.writePipeline(this.finishBytesIOThreadHandler);
} else {
this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesHandler);
this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler);
}
}
@@ -319,10 +319,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
for (ByteBuffer buffer : buffers) {
array.put(buffer);
}
boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, array);
boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, array);
if (allCompleted) {
request.pipelineCompleted = true;
this.channel.flushPipelineData(buffers, this.finishBuffersHandler);
this.channel.writePipeline(buffers, this.finishBuffersIOThreadHandler);
} else {
AsyncConnection conn = removeChannel();
if (conn != null) {
@@ -332,20 +332,20 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
} else if (this.channel.hasPipelineData()) {
//先将pipeline数据写入完再写入buffers
this.channel.flushPipelineData(null, new CompletionHandler<Integer, Void>() {
this.channel.writePipeline(null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
channel.write(buffers, buffers, finishBuffersHandler);
channel.write(buffers, buffers, finishBuffersIOThreadHandler);
}
@Override
public void failed(Throwable exc, Void attachment) {
finishBuffersHandler.failed(exc, buffers);
finishBuffersIOThreadHandler.failed(exc, buffers);
}
});
} else {
this.channel.write(buffers, buffers, finishBuffersHandler);
this.channel.write(buffers, buffers, finishBuffersIOThreadHandler);
}
}

View File

@@ -1338,7 +1338,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
fileChannel.close();
} catch (IOException ie) {
}
finishBytesHandler.completed(result, attachment);
finishBytesIOThreadHandler.completed(result, attachment);
return;
}
if (fileChannel == null) {