diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 156b08adb..bf50f40e3 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -464,11 +464,11 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl return writer != null && writer.position() > 0; } - public final void flushPipelineData(CompletionHandler handler) { - flushPipelineData(null, handler); + public final void writePipeline(CompletionHandler handler) { + writePipeline(null, handler); } - public void flushPipelineData(A attachment, CompletionHandler handler) { + public void writePipeline(A attachment, CompletionHandler handler) { ByteBufferWriter writer = this.pipelineWriter; this.pipelineWriter = null; if (writer == null) { @@ -496,13 +496,29 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } - //返回pipelineCount个数数据是否全部写入完毕 - public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple array) { - return writePipelineData(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length()); + public final void writePipelineInIOThread(CompletionHandler handler) { + if (inCurrWriteThread()) { + writePipeline(handler); + } else { + executeWrite(() -> writePipeline(handler)); + } + } + + public final void writePipelineInIOThread(A attachment, CompletionHandler handler) { + if (inCurrWriteThread()) { + writePipeline(attachment, handler); + } else { + executeWrite(() -> writePipeline(attachment, handler)); + } } //返回pipelineCount个数数据是否全部写入完毕 - public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) { + public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) { + return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length()); + } + + //返回pipelineCount个数数据是否全部写入完毕 + public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] bs, int offset, int length) { synchronized (pipelineLock) { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { @@ -535,12 +551,12 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } //返回pipelineCount个数数据是否全部写入完毕 - public final boolean writePipelineData(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) { - return writePipelineData(pipelineIndex, pipelineCount, header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length()); + public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) { + return appendPipeline(pipelineIndex, pipelineCount, header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length()); } //返回pipelineCount个数数据是否全部写入完毕 - public boolean writePipelineData(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) { + public boolean appendPipeline(int pipelineIndex, int pipelineCount, byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength) { synchronized (pipelineLock) { ByteBufferWriter writer = this.pipelineWriter; if (writer == null) { diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index a49e44ca1..c3ce05eab 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -49,7 +49,7 @@ public abstract class Response> { private final ByteBuffer writeBuffer; - protected final CompletionHandler finishBytesHandler = new CompletionHandler() { + protected final CompletionHandler finishBytesIOThreadHandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { @@ -63,7 +63,7 @@ public abstract class Response> { }; - protected final CompletionHandler finishBufferHandler = new CompletionHandler() { + protected final CompletionHandler finishBufferIOThreadHandler = new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { @@ -87,7 +87,7 @@ public abstract class Response> { }; - private final CompletionHandler finishBuffersHandler = new CompletionHandler() { + private final CompletionHandler finishBuffersIOThreadHandler = new CompletionHandler() { @Override public void completed(final Integer result, final ByteBuffer[] attachments) { @@ -265,26 +265,26 @@ public abstract class Response> { refuseAlive(); } if (request.pipelineIndex > 0) { - boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length); + boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); if (allCompleted) { request.pipelineCompleted = true; - this.channel.flushPipelineData(this.finishBytesHandler); + this.channel.writePipeline(this.finishBytesIOThreadHandler); } else { removeChannel(); this.responseConsumer.accept(this); } } else if (this.channel.hasPipelineData()) { - this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs, offset, length); - this.channel.flushPipelineData(this.finishBytesHandler); + this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); + this.channel.writePipeline(this.finishBytesIOThreadHandler); } else { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= length) { buffer.clear(); buffer.put(bs, offset, length); buffer.flip(); - this.channel.write(buffer, buffer, finishBufferHandler); + this.channel.write(buffer, buffer, finishBufferIOThreadHandler); } else { - this.channel.write(bs, offset, length, finishBytesHandler); + this.channel.write(bs, offset, length, finishBytesIOThreadHandler); } } } @@ -294,19 +294,19 @@ public abstract class Response> { refuseAlive(); } if (request.pipelineIndex > 0) { - boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); + boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); if (allCompleted) { request.pipelineCompleted = true; - this.channel.flushPipelineData(this.finishBytesHandler); + this.channel.writePipeline(this.finishBytesIOThreadHandler); } else { removeChannel(); this.responseConsumer.accept(this); } } else if (this.channel.hasPipelineData()) { - this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); - this.channel.flushPipelineData(this.finishBytesHandler); + this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); + this.channel.writePipeline(this.finishBytesIOThreadHandler); } else { - this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesHandler); + this.channel.write(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler); } } @@ -319,10 +319,10 @@ public abstract class Response> { for (ByteBuffer buffer : buffers) { array.put(buffer); } - boolean allCompleted = this.channel.writePipelineData(request.pipelineIndex, request.pipelineCount, array); + boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, array); if (allCompleted) { request.pipelineCompleted = true; - this.channel.flushPipelineData(buffers, this.finishBuffersHandler); + this.channel.writePipeline(buffers, this.finishBuffersIOThreadHandler); } else { AsyncConnection conn = removeChannel(); if (conn != null) { @@ -332,20 +332,20 @@ public abstract class Response> { } } else if (this.channel.hasPipelineData()) { //先将pipeline数据写入完再写入buffers - this.channel.flushPipelineData(null, new CompletionHandler() { + this.channel.writePipeline(null, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - channel.write(buffers, buffers, finishBuffersHandler); + channel.write(buffers, buffers, finishBuffersIOThreadHandler); } @Override public void failed(Throwable exc, Void attachment) { - finishBuffersHandler.failed(exc, buffers); + finishBuffersIOThreadHandler.failed(exc, buffers); } }); } else { - this.channel.write(buffers, buffers, finishBuffersHandler); + this.channel.write(buffers, buffers, finishBuffersIOThreadHandler); } } diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 364df313b..cc41c794c 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -1338,7 +1338,7 @@ public class HttpResponse extends Response { fileChannel.close(); } catch (IOException ie) { } - finishBytesHandler.completed(result, attachment); + finishBytesIOThreadHandler.completed(result, attachment); return; } if (fileChannel == null) {