From 39f743b47d557f8c9fb42e4ab3600677ff6f8884 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 28 Oct 2024 12:18:08 +0800 Subject: [PATCH] writeInLock --- .../java/org/redkale/net/AsyncConnection.java | 10 +++ .../org/redkale/net/AsyncNioConnection.java | 81 +++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 2cb90ccda..b9c1f896b 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -264,6 +264,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected abstract void writeImpl( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + public abstract void writeInLock(ByteBuffer src, A attachment, CompletionHandler handler); + + public abstract void writeInLock( + ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + protected void startRead(CompletionHandler handler) { read(handler); } @@ -508,6 +513,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + // srcs写完才会回调 public final void writeInIOThread(byte[] bytes, CompletionHandler handler) { if (inCurrWriteThread()) { write(bytes, handler); @@ -516,6 +522,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + // srcs写完才会回调 public final void writeInIOThread(ByteTuple array, CompletionHandler handler) { if (inCurrWriteThread()) { write(array, handler); @@ -524,6 +531,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + // srcs写完才会回调 public final void writeInIOThread(byte[] bytes, int offset, int length, CompletionHandler handler) { if (inCurrWriteThread()) { write(bytes, offset, length, handler); @@ -532,6 +540,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + // srcs写完才会回调 public final void writeInIOThread(ByteTuple header, ByteTuple body, CompletionHandler handler) { if (inCurrWriteThread()) { write(header, body, handler); @@ -540,6 +549,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + // srcs写完才会回调 public final void writeInIOThread( byte[] headerContent, int headerOffset, diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 8eee4ae69..5d5c752c0 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -323,6 +323,87 @@ abstract class AsyncNioConnection extends AsyncConnection { } } + @Override + public void writeInLock(ByteBuffer buffer, A attachment, CompletionHandler handler) { + int total = 0; + Exception t = null; + lockWrite(); + try { + if (this.writePending) { + handler.failed(new WritePendingException(), attachment); + return; + } + this.writePending = true; + while (buffer.hasRemaining()) { // 必须要将buffer写完为止 + int c = implWrite(buffer); + if (c < 0) { + t = new ClosedChannelException(); + total = c; + break; + } + total += c; + } + } catch (Exception e) { + t = e; + } finally { + this.writePending = false; + unlockWrite(); + } + if (t != null) { + handler.failed(t, attachment); + } else { + handler.completed(total, attachment); + } + } + + @Override + public void writeInLock( + ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + int total = 0; + Exception t = null; + int batchOffset = offset; + int batchLength = length; + ByteBuffer[] batchBuffers = srcs; + lockWrite(); + try { + if (this.writePending) { + handler.failed(new WritePendingException(), attachment); + return; + } + this.writePending = true; + boolean hasRemain = true; + while (hasRemain) { // 必须要将buffer写完为止 + int c = implWrite(batchBuffers, batchOffset, batchLength); + if (c < 0) { + t = new ClosedChannelException(); + total = c; + break; + } + boolean remain = false; + for (int i = 0; i < batchLength; i++) { + if (batchBuffers[batchOffset + i].hasRemaining()) { + remain = true; + batchOffset += i; + batchLength -= i; + break; + } + } + hasRemain = remain; + total += c; + } + } catch (Exception e) { + t = e; + } finally { + this.writePending = false; + unlockWrite(); + } + if (t != null) { + handler.failed(t, attachment); + } else { + handler.completed(total, attachment); + } + } + public void doWrite() { try { this.writeTime = System.currentTimeMillis();