diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 8806c0653..da1ff1076 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -269,6 +269,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public abstract void writeInLock( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + public abstract void writeInLock( + Supplier supplier, A attachment, CompletionHandler handler); + protected void startRead(CompletionHandler handler) { read(handler); } @@ -305,8 +308,16 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + public final void write(ByteTuple array, CompletionHandler handler) { + write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler); + } + + public final void write(ByteBuffer buffer, CompletionHandler handler) { + write(buffer, null, handler); + } + // src写完才会回调 - public final void write(ByteBuffer src, A attachment, CompletionHandler handler) { + final void write(ByteBuffer src, A attachment, CompletionHandler handler) { if (sslEngine == null) { writeImpl(src, attachment, handler); } else { @@ -325,7 +336,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void write( + final void write( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { if (sslEngine == null) { writeImpl(srcs, offset, length, attachment, handler); @@ -345,31 +356,19 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { + final void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { write(srcs, 0, srcs.length, attachment, handler); } - public final void write(byte[] bytes, CompletionHandler handler) { + final void write(byte[] bytes, CompletionHandler handler) { write(bytes, 0, bytes.length, (byte[]) null, 0, 0, handler); } - public final void write(byte[] bytes, A attachment, CompletionHandler handler) { - write(bytes, 0, bytes.length, (byte[]) null, 0, 0, attachment, handler); - } - - public final void write(byte[] bytes, int offset, int length, CompletionHandler handler) { + final void write(byte[] bytes, int offset, int length, CompletionHandler handler) { write(bytes, offset, length, (byte[]) null, 0, 0, handler); } - public final void write(ByteTuple array, CompletionHandler handler) { - write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler); - } - - public final void write(ByteTuple array, A attachment, CompletionHandler handler) { - write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, attachment, handler); - } - - public final void write(ByteTuple header, ByteTuple body, CompletionHandler handler) { + final void write(ByteTuple header, ByteTuple body, CompletionHandler handler) { write( header.content(), header.offset(), @@ -380,7 +379,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { handler); } - public void write( + void write( byte[] headerContent, int headerOffset, int headerLength, @@ -391,7 +390,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, null, handler); } - public void write( + void write( byte[] headerContent, int headerOffset, int headerLength, diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 5d5c752c0..25e1d4189 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -12,6 +12,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Consumer; +import java.util.function.Supplier; import javax.net.ssl.SSLContext; import org.redkale.util.ByteBufferWriter; @@ -404,6 +405,45 @@ abstract class AsyncNioConnection extends AsyncConnection { } } + @Override + public void writeInLock( + Supplier supplier, A attachment, CompletionHandler handler) { + int total = 0; + Exception t = null; + lockWrite(); + try { + ByteBuffer buffer = supplier.get(); + if (buffer == null || !buffer.hasRemaining()) { + handler.completed(total, attachment); + return; + } + if (this.writePending) { + handler.failed(new WritePendingException(), attachment); + return; + } + this.writePending = true; + while (buffer.hasRemaining()) { // 必须要将buffer写完为止 + int c = implWrite(buffer); + if (c < 0) { + t = new ClosedChannelException(); + total = c; + break; + } + total += c; + } + } catch (Exception e) { + t = e; + } finally { + this.writePending = false; + unlockWrite(); + } + if (t != null) { + handler.failed(t, attachment); + } else { + handler.completed(total, attachment); + } + } + public void doWrite() { try { this.writeTime = System.currentTimeMillis(); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 631fd2017..bbbcc890e 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -43,15 +43,15 @@ public abstract class ClientConnection writeHandler = new CompletionHandler() { + protected final CompletionHandler writeHandler = new CompletionHandler() { @Override - public void completed(Integer result, Object attachment) { + public void completed(Integer result, Void attachment) { // do nothing } @Override - public void failed(Throwable exc, Object attachment) { + public void failed(Throwable exc, Void attachment) { dispose(exc); } }; @@ -181,9 +181,9 @@ public abstract class ClientConnection { private void finishFile(ByteArray headerData, File file, long offset, long length) throws IOException { // this.channel.write(headerData, new TransferFileHandler(file, offset, length)); final Logger logger = context.getLogger(); - this.channel.write(headerData, new CompletionHandler() { + this.channel.writeInIOThread(headerData, new CompletionHandler() { FileChannel fileChannel; @@ -1413,7 +1413,7 @@ public class HttpResponse extends Response { } else { sends += len; } - channel.write(buffer, attachment, this); + channel.writeInIOThread(buffer, attachment, this); } catch (Exception e) { if (fileChannel != null) { try {