From b40f1678673e2d270a0fdba73cab70274ace89b2 Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 9 Nov 2024 07:43:19 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4writeLock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cluster/spi/HttpLocalRpcClient.java | 3 +- .../redkale/mq/spi/HttpMessageResponse.java | 3 +- .../java/org/redkale/net/AsyncConnection.java | 243 ++++++++++-------- .../org/redkale/net/AsyncNioConnection.java | 144 ----------- src/main/java/org/redkale/net/Response.java | 80 +++++- 5 files changed, 204 insertions(+), 269 deletions(-) diff --git a/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java b/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java index 62e9f0018..524c559b2 100644 --- a/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java +++ b/src/main/java/org/redkale/cluster/spi/HttpLocalRpcClient.java @@ -5,6 +5,8 @@ */ package org.redkale.cluster.spi; +import static org.redkale.util.Utility.isEmpty; + import java.io.Serializable; import java.lang.reflect.Type; import java.util.*; @@ -18,7 +20,6 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; import org.redkale.util.RedkaleException; import org.redkale.util.Traces; -import static org.redkale.util.Utility.isEmpty; /** * 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例 diff --git a/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java b/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java index 4f24cd227..d9dd9f28b 100644 --- a/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/spi/HttpMessageResponse.java @@ -5,13 +5,14 @@ */ package org.redkale.mq.spi; +import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT; + import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.function.*; import java.util.logging.Level; import org.redkale.convert.Convert; -import static org.redkale.mq.spi.MessageRecord.CTYPE_HTTP_RESULT; import org.redkale.net.Response; import org.redkale.net.http.*; import org.redkale.service.RetResult; diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index c67e75f7b..be7b625e3 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -264,23 +264,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected abstract void writeImpl( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); - public abstract void writeInLock( - ByteBuffer src, Consumer consumer, A attachment, CompletionHandler handler); - - public abstract void writeInLock( - ByteBuffer[] srcs, - int offset, - int length, - Consumer consumer, - A attachment, - CompletionHandler handler); - - public abstract void writeInLock( - Supplier supplier, - Consumer consumer, - A attachment, - CompletionHandler handler); - protected void startRead(CompletionHandler handler) { read(handler); } @@ -301,14 +284,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void readRegisterInIOThread(CompletionHandler handler) { - if (inCurrReadThread()) { - readRegister(handler); - } else { - executeRead(() -> readRegister(handler)); - } - } - public final void read(CompletionHandler handler) { if (sslEngine == null) { readImpl(handler); @@ -537,67 +512,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void writeInLock( - ByteBuffer[] srcs, - Consumer consumer, - A attachment, - CompletionHandler handler) { - writeInLock(srcs, 0, srcs.length, consumer, attachment, handler); - } - - public final void writeInLock(byte[] bytes, CompletionHandler handler) { - writeInLock(ByteBuffer.wrap(bytes), (Consumer) null, null, handler); - } - - public final void writeInLock(ByteTuple array, CompletionHandler handler) { - writeInLock(ByteBuffer.wrap(array.content(), array.offset(), array.length()), null, null, handler); - } - - public final void writeInLock(byte[] bytes, int offset, int length, CompletionHandler handler) { - writeInLock(ByteBuffer.wrap(bytes, offset, length), null, null, handler); - } - - public final void writeInLock(ByteTuple header, ByteTuple body, CompletionHandler handler) { - if (body == null) { - writeInLock(ByteBuffer.wrap(header.content(), header.offset(), header.length()), null, null, handler); - } else if (header == null) { - writeInLock(ByteBuffer.wrap(body.content(), body.offset(), body.length()), null, null, handler); - } else { - writeInLock( - new ByteBuffer[] { - ByteBuffer.wrap(header.content(), header.offset(), header.length()), - ByteBuffer.wrap(body.content(), body.offset(), body.length()) - }, - null, - null, - handler); - } - } - - public final void writeInLock( - byte[] headerContent, - int headerOffset, - int headerLength, - byte[] bodyContent, - int bodyOffset, - int bodyLength, - CompletionHandler handler) { - if (bodyContent == null) { - writeInLock(ByteBuffer.wrap(headerContent, headerOffset, headerLength), null, null, handler); - } else if (headerContent == null) { - writeInLock(ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength), null, null, handler); - } else { - writeInLock( - new ByteBuffer[] { - ByteBuffer.wrap(headerContent, headerOffset, headerLength), - ByteBuffer.wrap(bodyContent, bodyOffset, bodyLength) - }, - null, - null, - handler); - } - } - public void setReadBuffer(ByteBuffer buffer) { if (this.readBuffer != null) { throw new RedkaleException("repeat AsyncConnection.setReadBuffer"); @@ -611,44 +525,33 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } public final void writePipeline(CompletionHandler handler) { + writePipeline(null, handler); + } + + public void writePipeline(A attachment, CompletionHandler handler) { ByteBufferWriter writer = this.pipelineWriter; this.pipelineWriter = null; if (writer == null) { - handler.completed(0, null); + handler.completed(0, attachment); } else { ByteBuffer[] srcs = writer.toBuffers(); - CompletionHandler newHandler = new CompletionHandler() { + CompletionHandler newHandler = new CompletionHandler() { @Override - public void completed(Integer result, Void attachment) { + public void completed(Integer result, A attachment) { offerWriteBuffers(srcs); handler.completed(result, attachment); } @Override - public void failed(Throwable exc, Void attachment) { + public void failed(Throwable exc, A attachment) { offerWriteBuffers(srcs); handler.failed(exc, attachment); } }; if (srcs.length == 1) { - write(srcs[0], null, newHandler); + write(srcs[0], attachment, newHandler); } else { - write(srcs, null, newHandler); - } - } - } - - public void writePipelineInLock(CompletionHandler handler) { - ByteBufferWriter writer = this.pipelineWriter; - this.pipelineWriter = null; - if (writer == null) { - handler.completed(0, null); - } else { - ByteBuffer[] srcs = writer.toBuffers(); - if (srcs.length == 1) { - writeInLock(srcs[0], this.writeBufferConsumer, null, handler); - } else { - writeInLock(srcs, this.writeBufferConsumer, null, handler); + write(srcs, attachment, newHandler); } } } @@ -661,6 +564,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + public final void writePipelineInIOThread(A attachment, CompletionHandler handler) { + if (inCurrWriteThread()) { + writePipeline(attachment, handler); + } else { + executeWrite(() -> writePipeline(attachment, handler)); + } + } + // 返回pipelineCount个数数据是否全部写入完毕 public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple array) { return appendPipeline(pipelineIndex, pipelineCount, array.content(), array.offset(), array.length()); @@ -688,7 +599,65 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { dataNode.pipelineCount = pipelineCount; } dataNode.put(pipelineIndex, bs, offset, length); - if (writer.getWriteBytesCounter() + dataNode.size == dataNode.pipelineCount) { + if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { + for (PipelineDataItem item : dataNode.arrayItems()) { + writer.put(item.data); + } + this.pipelineDataNode = null; + return true; + } + return false; + } + } finally { + writeLock.unlock(); + } + } + + // 返回pipelineCount个数数据是否全部写入完毕 + public final boolean appendPipeline(int pipelineIndex, int pipelineCount, ByteTuple header, ByteTuple body) { + return appendPipeline( + pipelineIndex, + pipelineCount, + header.content(), + header.offset(), + header.length(), + body == null ? null : body.content(), + body == null ? 0 : body.offset(), + body == null ? 0 : body.length()); + } + + // 返回pipelineCount个数数据是否全部写入完毕 + public boolean appendPipeline( + int pipelineIndex, + int pipelineCount, + byte[] headerContent, + int headerOffset, + int headerLength, + byte[] bodyContent, + int bodyOffset, + int bodyLength) { + writeLock.lock(); + try { + ByteBufferWriter writer = this.pipelineWriter; + if (writer == null) { + writer = ByteBufferWriter.create(getWriteBufferSupplier()); + this.pipelineWriter = writer; + } + if (this.pipelineDataNode == null && pipelineIndex == writer.getWriteBytesCounter() + 1) { + writer.put(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); + return (pipelineIndex == pipelineCount); + } else { + PipelineDataNode dataNode = this.pipelineDataNode; + if (dataNode == null) { + dataNode = new PipelineDataNode(); + this.pipelineDataNode = dataNode; + } + if (pipelineIndex == pipelineCount) { // 此时pipelineCount为最大值 + dataNode.pipelineCount = pipelineCount; + } + dataNode.put( + pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); + if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { for (PipelineDataItem item : dataNode.arrayItems()) { writer.put(item.data); } @@ -706,14 +675,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public int pipelineCount; - public int size; + public int itemsize; private PipelineDataItem head; private PipelineDataItem tail; public PipelineDataItem[] arrayItems() { - PipelineDataItem[] items = new PipelineDataItem[size]; + PipelineDataItem[] items = new PipelineDataItem[itemsize]; PipelineDataItem item = head; int i = 0; while (item != null) { @@ -735,7 +704,28 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { tail.next = item; tail = item; } - size++; + itemsize++; + } + + public void put( + int pipelineIndex, + byte[] headerContent, + int headerOffset, + int headerLength, + byte[] bodyContent, + int bodyOffset, + int bodyLength) { + if (tail == null) { + head = new PipelineDataItem( + pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); + tail = head; + } else { + PipelineDataItem item = new PipelineDataItem( + pipelineIndex, headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength); + tail.next = item; + tail = item; + } + itemsize++; } } @@ -749,9 +739,34 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public PipelineDataItem(int index, byte[] bs, int offset, int length) { this.index = index; - byte[] result = new byte[length]; - System.arraycopy(bs, offset, result, 0, length); - this.data = result; + this.data = Arrays.copyOfRange(bs, offset, offset + length); + } + + public PipelineDataItem( + int index, + byte[] headerContent, + int headerOffset, + int headerLength, + byte[] bodyContent, + int bodyOffset, + int bodyLength) { + this.index = index; + this.data = bodyLength > 0 + ? copyOfRange(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength) + : Arrays.copyOfRange(headerContent, headerOffset, headerOffset + headerLength); + } + + private static byte[] copyOfRange( + byte[] headerContent, + int headerOffset, + int headerLength, + byte[] bodyContent, + int bodyOffset, + int bodyLength) { + byte[] result = new byte[headerLength + bodyLength]; + System.arraycopy(headerContent, headerOffset, result, 0, headerLength); + System.arraycopy(bodyContent, bodyOffset, result, headerLength, bodyLength); + return result; } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 967c45fac..8eee4ae69 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -12,7 +12,6 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Consumer; -import java.util.function.Supplier; import javax.net.ssl.SSLContext; import org.redkale.util.ByteBufferWriter; @@ -324,149 +323,6 @@ abstract class AsyncNioConnection extends AsyncConnection { } } - @Override - public void writeInLock( - ByteBuffer buffer, - Consumer consumer, - A attachment, - CompletionHandler handler) { - int total = 0; - Exception t = null; - lockWrite(); - try { - if (this.writePending) { - handler.failed(new WritePendingException(), attachment); - return; - } - this.writePending = true; - while (buffer.hasRemaining()) { // 必须要将buffer写完为止 - int c = implWrite(buffer); - if (c < 0) { - t = new ClosedChannelException(); - total = c; - break; - } - total += c; - } - } catch (Exception e) { - t = e; - } finally { - this.writePending = false; - unlockWrite(); - } - if (consumer != null) { - consumer.accept(buffer); - } - if (t != null) { - handler.failed(t, attachment); - } else { - handler.completed(total, attachment); - } - } - - @Override - public void writeInLock( - ByteBuffer[] srcs, - int offset, - int length, - Consumer consumer, - A attachment, - CompletionHandler handler) { - int total = 0; - Exception t = null; - int batchOffset = offset; - int batchLength = length; - ByteBuffer[] batchBuffers = srcs; - lockWrite(); - try { - if (this.writePending) { - handler.failed(new WritePendingException(), attachment); - return; - } - this.writePending = true; - boolean hasRemain = true; - while (hasRemain) { // 必须要将buffer写完为止 - int c = implWrite(batchBuffers, batchOffset, batchLength); - if (c < 0) { - t = new ClosedChannelException(); - total = c; - break; - } - boolean remain = false; - for (int i = 0; i < batchLength; i++) { - if (batchBuffers[batchOffset + i].hasRemaining()) { - remain = true; - batchOffset += i; - batchLength -= i; - break; - } - } - hasRemain = remain; - total += c; - } - } catch (Exception e) { - t = e; - } finally { - this.writePending = false; - unlockWrite(); - } - if (consumer != null) { - for (int i = 0; i < length; i++) { - consumer.accept(srcs[offset + i]); - } - } - if (t != null) { - handler.failed(t, attachment); - } else { - handler.completed(total, attachment); - } - } - - @Override - public void writeInLock( - Supplier supplier, - Consumer consumer, - A attachment, - CompletionHandler handler) { - int total = 0; - Exception t = null; - lockWrite(); - try { - ByteBuffer buffer = supplier.get(); - if (buffer == null || !buffer.hasRemaining()) { - handler.completed(total, attachment); - return; - } - if (this.writePending) { - handler.failed(new WritePendingException(), attachment); - return; - } - this.writePending = true; - while (buffer.hasRemaining()) { // 必须要将buffer写完为止 - int c = implWrite(buffer); - if (c < 0) { - t = new ClosedChannelException(); - total = c; - break; - } - total += c; - } - if (consumer != null) { - consumer.accept(buffer); - } - } catch (Exception e) { - t = e; - } finally { - this.writePending = false; - unlockWrite(); - } - if (t != null) { - handler.failed(t, attachment); - } else { - handler.completed(total, attachment); - } - } - public void doWrite() { try { this.writeTime = System.currentTimeMillis(); diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index d5581702c..b6f0cb9d8 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -61,7 +61,7 @@ public abstract class Response> { private final ByteBuffer writeBuffer; - private final CompletionHandler finishVoidIOThreadHandler = new CompletionHandler() { + private final CompletionHandler finishBytesIOThreadHandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { @@ -74,6 +74,29 @@ public abstract class Response> { } }; + private final CompletionHandler finishBufferIOThreadHandler = new CompletionHandler() { + + @Override + public void completed(Integer result, ByteBuffer attachment) { + if (attachment != writeBuffer) { + channel.offerWriteBuffer(attachment); + } else { + attachment.clear(); + } + completeInIOThread(false); + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + if (attachment != writeBuffer) { + channel.offerWriteBuffer(attachment); + } else { + attachment.clear(); + } + completeInIOThread(true); + } + }; + protected Response(C context, final R request) { this.context = context; this.request = request; @@ -82,6 +105,10 @@ public abstract class Response> { this.workExecutor = context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor; } + protected ByteBuffer writeBuffer() { + return writeBuffer; + } + protected AsyncConnection removeChannel() { AsyncConnection ch = this.channel; this.channel = null; @@ -299,15 +326,27 @@ public abstract class Response> { } if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) { AsyncConnection conn = removeChannel(); - if (conn != null) { + if (conn != null && conn.protocolCodec != null) { this.responseConsumer.accept(this); if (!request.readCompleted) { - conn.readRegisterInIOThread(conn.protocolCodec); + conn.readRegister(conn.protocolCodec); } - return; + } else { + Supplier poolSupplier = this.responseSupplier; + Consumer poolConsumer = this.responseConsumer; + this.recycle(); + new ProtocolCodec(context, poolSupplier, poolConsumer, conn) + .response(this) + .run(null); + request.readCompleted = false; } + } else { + this.responseConsumer.accept(this); } - this.responseConsumer.accept(this); + } + + protected void writeInIOThread(ByteBuffer buffer) { + this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler); } public final void finish(final byte[] bs) { @@ -339,27 +378,50 @@ public abstract class Response> { this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); if (allCompleted) { request.pipelineCompleted = true; - this.channel.writePipelineInLock(this.finishVoidIOThreadHandler); + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); } else { removeChannel(); this.responseConsumer.accept(this); } } else if (this.channel.hasPipelineData()) { this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs, offset, length); - this.channel.writePipelineInLock(this.finishVoidIOThreadHandler); + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); } else { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= length) { buffer.clear(); buffer.put(bs, offset, length); buffer.flip(); - this.channel.writeInLock(buffer, (Consumer) null, null, finishVoidIOThreadHandler); + this.channel.writeInIOThread(buffer, buffer, finishBufferIOThreadHandler); } else { - this.channel.writeInLock(bs, offset, length, finishVoidIOThreadHandler); + this.channel.writeInIOThread(bs, offset, length, finishBytesIOThreadHandler); } } } + public void finish(boolean kill, byte[] bs1, int offset1, int length1, byte[] bs2, int offset2, int length2) { + if (kill) { + refuseAlive(); + } + if (request.pipelineIndex > 0) { + boolean allCompleted = this.channel.appendPipeline( + request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); + if (allCompleted) { + request.pipelineCompleted = true; + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); + } else { + removeChannel(); + this.responseConsumer.accept(this); + } + } else if (this.channel.hasPipelineData()) { + this.channel.appendPipeline( + request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); + this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); + } else { + this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, finishBytesIOThreadHandler); + } + } + protected void send(final ByteTuple array, final CompletionHandler handler) { ByteBuffer buffer = this.writeBuffer; if (buffer != null && buffer.capacity() >= array.length()) {