diff --git a/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java b/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java index 6c61d19f5..62e9f0018 100644 --- a/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java +++ b/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java @@ -5,11 +5,8 @@ */ package org.redkale.cluster.spi; -import static org.redkale.util.Utility.isEmpty; - import java.io.Serializable; import java.lang.reflect.Type; -import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.logging.Level; @@ -21,6 +18,7 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; import org.redkale.util.RedkaleException; import org.redkale.util.Traces; +import static org.redkale.util.Utility.isEmpty; /** * 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例 @@ -318,34 +316,5 @@ public class HttpLocalRpcClient extends HttpRpcClient { byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); future.complete(rs); } - - @Override - public void finishBuffer(boolean kill, ByteBuffer buffer) { - if (future == null) { - return; - } - byte[] bs = new byte[buffer.remaining()]; - buffer.get(bs); - future.complete(bs); - } - - @Override - public void finishBuffers(boolean kill, ByteBuffer... buffers) { - if (future == null) { - return; - } - int size = 0; - for (ByteBuffer buf : buffers) { - size += buf.remaining(); - } - byte[] bs = new byte[size]; - int index = 0; - for (ByteBuffer buf : buffers) { - int r = buf.remaining(); - buf.get(bs, index, r); - index += r; - } - future.complete(bs); - } } } diff --git a/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java b/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java index b828762ec..4f24cd227 100644 --- a/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java @@ -5,15 +5,13 @@ */ package org.redkale.mq.spi; -import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT; - import java.lang.reflect.Type; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.function.*; import java.util.logging.Level; import org.redkale.convert.Convert; +import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT; import org.redkale.net.Response; import org.redkale.net.http.*; import org.redkale.service.RetResult; @@ -238,33 +236,4 @@ public class HttpMessageResponse extends HttpResponse { byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); finishHttpResult(null, new HttpResult(rs).contentType(contentType)); } - - @Override - public void finishBuffer(boolean kill, ByteBuffer buffer) { - if (message.isEmptyRespTopic()) { - return; - } - byte[] bs = new byte[buffer.remaining()]; - buffer.get(bs); - finishHttpResult(null, new HttpResult(bs)); - } - - @Override - public void finishBuffers(boolean kill, ByteBuffer... buffers) { - if (message.isEmptyRespTopic()) { - return; - } - int size = 0; - for (ByteBuffer buf : buffers) { - size += buf.remaining(); - } - byte[] bs = new byte[size]; - int index = 0; - for (ByteBuffer buf : buffers) { - int r = buf.remaining(); - buf.get(bs, index, r); - index += r; - } - finishHttpResult(null, new HttpResult(bs)); - } } diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index da1ff1076..c67e75f7b 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -264,13 +264,22 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected abstract void writeImpl( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); - public abstract void writeInLock(ByteBuffer src, A attachment, CompletionHandler handler); + public abstract void writeInLock( + ByteBuffer src, Consumer consumer, A attachment, CompletionHandler handler); public abstract void writeInLock( - ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + ByteBuffer[] srcs, + int offset, + int length, + Consumer consumer, + A attachment, + CompletionHandler handler); public abstract void writeInLock( - Supplier supplier, A attachment, CompletionHandler handler); + Supplier supplier, + Consumer consumer, + A attachment, + CompletionHandler handler); protected void startRead(CompletionHandler handler) { read(handler); @@ -292,6 +301,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + public final void readRegisterInIOThread(CompletionHandler handler) { + if (inCurrReadThread()) { + readRegister(handler); + } else { + executeRead(() -> readRegister(handler)); + } + } + public final void read(CompletionHandler handler) { if (sslEngine == null) { readImpl(handler); @@ -520,27 +537,31 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void writeInLock(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { - writeInLock(srcs, 0, srcs.length, attachment, handler); + public final void writeInLock( + ByteBuffer[] srcs, + Consumer consumer, + A attachment, + CompletionHandler handler) { + writeInLock(srcs, 0, srcs.length, consumer, attachment, handler); } public final void writeInLock(byte[] bytes, CompletionHandler handler) { - writeInLock(ByteBuffer.wrap(bytes), null, handler); + writeInLock(ByteBuffer.wrap(bytes), (Consumer) null, null, handler); } public final void writeInLock(ByteTuple array, CompletionHandler handler) { - writeInLock(ByteBuffer.wrap(array.content(), array.offset(), array.length()), null, handler); + writeInLock(ByteBuffer.wrap(array.content(), array.offset(), array.length()), null, null, handler); } public final void writeInLock(byte[] bytes, int offset, int length, CompletionHandler handler) { - writeInLock(ByteBuffer.wrap(bytes, offset, length), null, handler); + writeInLock(ByteBuffer.wrap(bytes, offset, length), null, null, handler); } public final void writeInLock(ByteTuple header, ByteTuple body, CompletionHandler handler) { if (body == null) { - writeInLock(ByteBuffer.wrap(header.content(), header.offset(), header.length()), null, handler); + writeInLock(ByteBuffer.wrap(header.content(), header.offset(), header.length()), null, null, handler); } else if (header == null) { - writeInLock(ByteBuffer.wrap(body.content(), body.offset(), body.length()), null, handler); + writeInLock(ByteBuffer.wrap(body.content(), body.offset(), body.length()), null, null, handler); } else { writeInLock( new ByteBuffer[] { @@ -548,6 +569,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { ByteBuffer.wrap(body.content(), body.offset(), body.length()) }, null, + null, handler); } } @@ -561,9 +583,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { int bodyLength, CompletionHandler handler) { if (bodyContent == null) { - writeInLock(ByteBuffer.wrap(headerContent, headerOffset, headerLength), null, handler); + writeInLock(ByteBuffer.wrap(headerContent, headerOffset, headerLength), null, null, handler); } else if (headerContent == null) { - writeInLock(ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength), null, handler); + writeInLock(ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength), null, null, handler); } else { writeInLock( new ByteBuffer[] { @@ -571,6 +593,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength) }, null, + null, handler); } } @@ -588,33 +611,44 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } public final void writePipeline(CompletionHandler handler) { - writePipeline(null, handler); - } - - public void writePipeline(A attachment, CompletionHandler handler) { ByteBufferWriter writer = this.pipelineWriter; this.pipelineWriter = null; if (writer == null) { - handler.completed(0, attachment); + handler.completed(0, null); } else { ByteBuffer[] srcs = writer.toBuffers(); - CompletionHandler newHandler = new CompletionHandler() { + CompletionHandler newHandler = new CompletionHandler() { @Override - public void completed(Integer result, A attachment) { + public void completed(Integer result, Void attachment) { offerWriteBuffers(srcs); handler.completed(result, attachment); } @Override - public void failed(Throwable exc, A attachment) { + public void failed(Throwable exc, Void attachment) { offerWriteBuffers(srcs); handler.failed(exc, attachment); } }; if (srcs.length == 1) { - write(srcs[0], attachment, newHandler); + write(srcs[0], null, newHandler); } else { - write(srcs, attachment, newHandler); + write(srcs, null, newHandler); + } + } + } + + public void writePipelineInLock(CompletionHandler handler) { + ByteBufferWriter writer = this.pipelineWriter; + this.pipelineWriter = null; + if (writer == null) { + handler.completed(0, null); + } else { + ByteBuffer[] srcs = writer.toBuffers(); + if (srcs.length == 1) { + writeInLock(srcs[0], this.writeBufferConsumer, null, handler); + } else { + writeInLock(srcs, this.writeBufferConsumer, null, handler); } } } @@ -627,14 +661,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void writePipelineInIOThread(A attachment, CompletionHandler handler) { - if (inCurrWriteThread()) { - writePipeline(attachment, handler); - } else { - executeWrite(() -> writePipeline(attachment, handler)); - } - } - // 返回pipelineCount个数数据是否全部写入完毕 public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) { return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length()); @@ -662,65 +688,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { dataNode.pipelineCount = pipelineCount; } dataNode.put(pipelineIndex, bs, offset, length); - if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { - for (PipelineDataItem item : dataNode.arrayItems()) { - writer.put(item.data); - } - this.pipelineDataNode = null; - return true; - } - return false; - } - } finally { - writeLock.unlock(); - } - } - - // 返回pipelineCount个数数据是否全部写入完毕 - public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) { - return appendPipeline( - pipelineIndex, - pipelineCount, - header.content(), - header.offset(), - header.length(), - body == null ? null : body.content(), - body == null ? 0 : body.offset(), - body == null ? 0 : body.length()); - } - - // 返回pipelineCount个数数据是否全部写入完毕 - public boolean appendPipeline( - int pipelineIndex, - int pipelineCount, - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - writeLock.lock(); - try { - ByteBufferWriter writer = this.pipelineWriter; - if (writer == null) { - writer = ByteBufferWriter.create(getWriteBufferSupplier()); - this.pipelineWriter = writer; - } - if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) { - writer.put(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); - return (pipelineIndex == pipelineCount); - } else { - PipelineDataNode dataNode = this.pipelineDataNode; - if (dataNode == null) { - dataNode = new PipelineDataNode(); - this.pipelineDataNode = dataNode; - } - if (pipelineIndex == pipelineCount) { // 此时pipelineCount为最大值 - dataNode.pipelineCount = pipelineCount; - } - dataNode.put( - pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); - if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { + if (writer.getWriteBytesCounter() + dataNode.size == dataNode.pipelineCount) { for (PipelineDataItem item : dataNode.arrayItems()) { writer.put(item.data); } @@ -738,14 +706,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public int pipelineCount; - public int itemsize; + public int size; private PipelineDataItem head; private PipelineDataItem tail; public PipelineDataItem[] arrayItems() { - PipelineDataItem[] items = new PipelineDataItem[itemsize]; + PipelineDataItem[] items = new PipelineDataItem[size]; PipelineDataItem item = head; int i = 0; while (item != null) { @@ -767,28 +735,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { tail.next = item; tail = item; } - itemsize++; - } - - public void put( - int pipelineIndex, - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - if (tail == null) { - head = new PipelineDataItem( - pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); - tail = head; - } else { - PipelineDataItem item = new PipelineDataItem( - pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); - tail.next = item; - tail = item; - } - itemsize++; + size++; } } @@ -802,34 +749,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public PipelineDataItem(int index, byte[] bs, int offset, int length) { this.index = index; - this.data = Arrays.copyOfRange(bs, offset, offset + length); - } - - public PipelineDataItem( - int index, - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - this.index = index; - this.data = bodyLength > 0 - ? copyOfRange(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength) - : Arrays.copyOfRange(headerContent, headerOffset, headerOffset + headerLength); - } - - private static byte[] copyOfRange( - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength) { - byte[] result = new byte[headerLength + bodyLength]; - System.arraycopy(headerContent, headerOffset, result, 0, headerLength); - System.arraycopy(bodyContent, bodyOffset, result, headerLength, bodyLength); - return result; + byte[] result = new byte[length]; + System.arraycopy(bs, offset, result, 0, length); + this.data = result; } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 25e1d4189..967c45fac 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -325,7 +325,11 @@ abstract class AsyncNioConnection extends AsyncConnection { } @Override - public void writeInLock(ByteBuffer buffer, A attachment, CompletionHandler handler) { + public void writeInLock( + ByteBuffer buffer, + Consumer consumer, + A attachment, + CompletionHandler handler) { int total = 0; Exception t = null; lockWrite(); @@ -350,6 +354,9 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writePending = false; unlockWrite(); } + if (consumer != null) { + consumer.accept(buffer); + } if (t != null) { handler.failed(t, attachment); } else { @@ -359,7 +366,12 @@ abstract class AsyncNioConnection extends AsyncConnection { @Override public void writeInLock( - ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + ByteBuffer[] srcs, + int offset, + int length, + Consumer consumer, + A attachment, + CompletionHandler handler) { int total = 0; Exception t = null; int batchOffset = offset; @@ -398,6 +410,11 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writePending = false; unlockWrite(); } + if (consumer != null) { + for (int i = 0; i < length; i++) { + consumer.accept(srcs[offset + i]); + } + } if (t != null) { handler.failed(t, attachment); } else { @@ -407,7 +424,10 @@ abstract class AsyncNioConnection extends AsyncConnection { @Override public void writeInLock( - Supplier supplier, A attachment, CompletionHandler handler) { + Supplier supplier, + Consumer consumer, + A attachment, + CompletionHandler handler) { int total = 0; Exception t = null; lockWrite(); @@ -431,6 +451,9 @@ abstract class AsyncNioConnection extends AsyncConnection { } total += c; } + if (consumer != null) { + consumer.accept(buffer); + } } catch (Exception e) { t = e; } finally { diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index acfba6f3a..d5581702c 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -61,7 +61,7 @@ public abstract class Response> { private final ByteBuffer writeBuffer; - private final CompletionHandler finishBytesIOThreadHandler = new CompletionHandler() { + private final CompletionHandler finishVoidIOThreadHandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { @@ -74,60 +74,6 @@ public abstract class Response> { } }; - private final CompletionHandler finishBufferIOThreadHandler = new CompletionHandler() { - - @Override - public void completed(Integer result, ByteBuffer attachment) { - if (attachment != writeBuffer) { - channel.offerWriteBuffer(attachment); - } else { - attachment.clear(); - } - completeInIOThread(false); - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - if (attachment != writeBuffer) { - channel.offerWriteBuffer(attachment); - } else { - attachment.clear(); - } - completeInIOThread(true); - } - }; - - private final CompletionHandler finishBuffersIOThreadHandler = new CompletionHandler() { - - @Override - public void completed(final Integer result, final ByteBuffer[] attachments) { - if (attachments != null) { - for (ByteBuffer attachment : attachments) { - if (attachment != writeBuffer) { - channel.offerWriteBuffer(attachment); - } else { - attachment.clear(); - } - } - } - completeInIOThread(false); - } - - @Override - public void failed(Throwable exc, final ByteBuffer[] attachments) { - if (attachments != null) { - for (ByteBuffer attachment : attachments) { - if (attachment != writeBuffer) { - channel.offerWriteBuffer(attachment); - } else { - attachment.clear(); - } - } - } - completeInIOThread(true); - } - }; - protected Response(C context, final R request) { this.context = context; this.request = request; @@ -136,10 +82,6 @@ public abstract class Response> { this.workExecutor = context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor; } - protected ByteBuffer writeBuffer() { - return writeBuffer; - } - protected AsyncConnection removeChannel() { AsyncConnection ch = this.channel; this.channel = null; @@ -357,27 +299,15 @@ public abstract class Response> { } if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) { AsyncConnection conn = removeChannel(); - if (conn != null && conn.protocolCodec != null) { + if (conn != null) { this.responseConsumer.accept(this); if (!request.readCompleted) { - conn.readRegister(conn.protocolCodec); + conn.readRegisterInIOThread(conn.protocolCodec); } - } else { - Supplier poolSupplier = this.responseSupplier; - Consumer poolConsumer = this.responseConsumer; - this.recycle(); - new ProtocolCodec(context, poolSupplier, poolConsumer, conn) - .response(this) - .run(null); - request.readCompleted = false; + return; } - } else { - this.responseConsumer.accept(this); } - } - - protected void writeInIOThread(ByteBuffer buffer) { - this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler); + this.responseConsumer.accept(this); } public final void finish(final byte[] bs) { @@ -409,101 +339,27 @@ public abstract class Response> { this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); if (allCompleted) { request.pipelineCompleted = true; - this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); + this.channel.writePipelineInLock(this.finishVoidIOThreadHandler); } else { removeChannel(); this.responseConsumer.accept(this); } } else if (this.channel.hasPipelineData()) { this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); - this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); + this.channel.writePipelineInLock(this.finishVoidIOThreadHandler); } else { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= length) { buffer.clear(); buffer.put(bs, offset, length); buffer.flip(); - this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler); + this.channel.writeInLock(buffer, (Consumer) null, null, finishVoidIOThreadHandler); } else { - this.channel.writeInIOThread(bs, offset, length, finishBytesIOThreadHandler); + this.channel.writeInLock(bs, offset, length, finishVoidIOThreadHandler); } } } - public void finish(boolean kill, byte[] bs1, int offset1, int length1, byte[] bs2, int offset2, int length2) { - if (kill) { - refuseAlive(); - } - if (request.pipelineIndex > 0) { - boolean allCompleted = this.channel.appendPipeline( - request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); - if (allCompleted) { - request.pipelineCompleted = true; - this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); - } else { - removeChannel(); - this.responseConsumer.accept(this); - } - } else if (this.channel.hasPipelineData()) { - this.channel.appendPipeline( - request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); - this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); - } else { - this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, finishBytesIOThreadHandler); - } - } - - protected void finishBuffers(boolean kill, ByteBuffer... buffers) { - if (kill) { - refuseAlive(); - } - if (request.pipelineIndex > 0) { - ByteArray array = new ByteArray(); - for (ByteBuffer buffer : buffers) { - array.put(buffer); - } - boolean allCompleted = this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, array); - if (allCompleted) { - request.pipelineCompleted = true; - this.channel.writeInIOThread(buffers, buffers, this.finishBuffersIOThreadHandler); - } else { - AsyncConnection conn = removeChannel(); - if (conn != null) { - conn.offerWriteBuffers(buffers); - } - this.responseConsumer.accept(this); - } - } else if (this.channel.hasPipelineData()) { - // 先将pipeline数据写入完再写入buffers - this.channel.writePipelineInIOThread(new CompletionHandler() { - - @Override - public void completed(Integer result, Void attachment) { - channel.write(buffers, buffers, finishBuffersIOThreadHandler); - } - - @Override - public void failed(Throwable exc, Void attachment) { - finishBuffersIOThreadHandler.failed(exc, buffers); - } - }); - } else { - this.channel.writeInIOThread(buffers, buffers, finishBuffersIOThreadHandler); - } - } - - protected final void finishBuffer(ByteBuffer buffer) { - finishBuffers(false, buffer); - } - - protected final void finishBuffers(ByteBuffer... buffers) { - finishBuffers(false, buffers); - } - - protected void finishBuffer(boolean kill, ByteBuffer buffer) { - finishBuffers(kill, buffer); - } - protected void send(final ByteTuple array, final CompletionHandler handler) { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= array.length()) {