diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index d2d3ec8c1..275bd1b0f 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -224,7 +224,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); - public abstract void fastWrite(byte[] data, A attachment, CompletionHandler handler); + public abstract AsyncConnection fastHandler(CompletionHandler handler); + + public abstract void fastWrite(byte[] data); protected abstract void readRegisterImpl(CompletionHandler handler); @@ -240,10 +242,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { read(handler); } - public final void fastWrite(byte[] data, CompletionHandler handler) { - fastWrite(data, null, handler); - } - public final void startReadInIOThread(CompletionHandler handler) { if (inCurrReadThread()) { startRead(handler); diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index d56eff441..7bf7bdb25 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -85,6 +85,8 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; + protected CompletionHandler writeFastHandler; + public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); @@ -115,6 +117,13 @@ abstract class AsyncNioConnection extends AsyncConnection { return this.writeTimeoutSeconds; } + @Override + public AsyncConnection fastHandler(CompletionHandler handler) { + Objects.requireNonNull(handler); + this.writeFastHandler = (CompletionHandler) handler; + return this; + } + @Override protected void startHandshake(final Consumer callback) { ioReadThread.register(t -> super.startHandshake(callback)); @@ -291,18 +300,19 @@ abstract class AsyncNioConnection extends AsyncConnection { } @Override - public void fastWrite(byte[] data, A attachment, CompletionHandler handler) { + public void fastWrite(byte[] data) { + CompletionHandler handler = this.writeFastHandler; + Objects.requireNonNull(data); + Objects.requireNonNull(handler, "fastHandler is null"); if (!this.isConnected()) { handler.failed(new NotYetConnectedException(), null); return; } - Objects.requireNonNull(data); - Objects.requireNonNull(handler); this.writePending = true; this.fastWriteQueue.offer(data); this.fastWriteCount.incrementAndGet(); this.writeCompletionHandler = (CompletionHandler) handler; - this.writeAttachment = attachment; + this.writeAttachment = null; try { if (writeKey == null) { ioWriteThread.register(selector -> { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 746783ce8..5a3177f3b 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -102,7 +102,7 @@ public abstract class ClientConnection implements Co this.index = index; this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; - this.channel = channel.beforeCloseListener(this); + this.channel = channel.beforeCloseListener(this).fastHandler(writeHandler); this.writeBuffer = channel.pollWriteBuffer(); } @@ -180,7 +180,7 @@ public abstract class ClientConnection implements Co pauseWriting.set(true); currHalfWriteFuture = respFuture; } - channel.fastWrite(array.getBytes(), writeHandler); + channel.fastWrite(array.getBytes()); } else { //旧方式 //发送请求数据包 writeArray.clear(); @@ -221,7 +221,7 @@ public abstract class ClientConnection implements Co } } } - channel.fastWrite(array.getBytes(), writeHandler); + channel.fastWrite(array.getBytes()); } //发送半包和积压的请求数据包