From fbea655e96725fc65bf05c591aa78693f86103eb Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 4 Jul 2023 22:11:38 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 6 +-- .../org/redkale/net/AsyncNioConnection.java | 43 ++++++++++--------- .../redkale/net/AsyncNioTcpConnection.java | 5 +++ .../redkale/net/AsyncNioUdpConnection.java | 5 +++ .../redkale/net/client/ClientConnection.java | 6 +-- 5 files changed, 37 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index a560d983a..d2d3ec8c1 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -224,7 +224,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); - public abstract void clientWrite(byte[] data, A attachment, CompletionHandler handler); + public abstract void fastWrite(byte[] data, A attachment, CompletionHandler handler); protected abstract void readRegisterImpl(CompletionHandler handler); @@ -240,8 +240,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { read(handler); } - public final void clientWrite(byte[] data, CompletionHandler handler) { - clientWrite(data, null, handler); + public final void fastWrite(byte[] data, CompletionHandler handler) { + fastWrite(data, null, handler); } public final void startReadInIOThread(CompletionHandler handler) { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 5715bd56c..05c5630da 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -153,7 +153,9 @@ abstract class AsyncNioConnection extends AsyncConnection { ioReadThread.register(selector -> { try { if (readKey == null) { - readKey = implRegister(selector, SelectionKey.OP_READ); + SelectionKey oldKey = keyFor(selector); + int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); + readKey = implRegister(selector, ops); readKey.attach(this); } else { readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); @@ -287,7 +289,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } @Override - public void clientWrite(byte[] data, A attachment, CompletionHandler handler) { + public void fastWrite(byte[] data, A attachment, CompletionHandler handler) { if (!this.isConnected()) { handler.failed(new NotYetConnectedException(), null); return; @@ -303,7 +305,9 @@ abstract class AsyncNioConnection extends AsyncConnection { ioWriteThread.register(selector -> { try { if (writeKey == null) { - writeKey = implRegister(selector, SelectionKey.OP_WRITE); + SelectionKey oldKey = keyFor(selector); + int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); + writeKey = implRegister(selector, ops); writeKey.attach(this); } else { writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); @@ -340,7 +344,9 @@ abstract class AsyncNioConnection extends AsyncConnection { ioReadThread.register(selector -> { try { if (readKey == null) { - readKey = implRegister(selector, SelectionKey.OP_READ); + SelectionKey oldKey = keyFor(selector); + int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); + readKey = implRegister(selector, ops); readKey.attach(this); } else { readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); @@ -467,27 +473,20 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeCompleted && (totalCount != 0 || !hasRemain)) { handleWrite(writeTotal + totalCount, null); } else if (writeKey == null) { - if (inCurrWriteThread()) { + ioWriteThread.register(selector -> { try { - writeKey = implRegister(ioWriteThread.selector, SelectionKey.OP_WRITE); - writeKey.attach(this); + if (writeKey == null) { + SelectionKey oldKey = keyFor(selector); + int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); + writeKey = implRegister(selector, ops); + writeKey.attach(this); + } else { + writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); + } } catch (ClosedChannelException e) { handleWrite(0, e); } - } else { - ioWriteThread.register(selector -> { - try { - if (writeKey == null) { - writeKey = implRegister(selector, SelectionKey.OP_WRITE); - writeKey.attach(this); - } else { - writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); - } - } catch (ClosedChannelException e) { - handleWrite(0, e); - } - }); - } + }); } else { ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); } @@ -641,6 +640,8 @@ abstract class AsyncNioConnection extends AsyncConnection { }; } + protected abstract SelectionKey keyFor(Selector sel); + protected abstract SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException; protected abstract int implRead(ByteBuffer dst) throws IOException; diff --git a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java index 88c41bfad..ffd969d80 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java @@ -203,6 +203,11 @@ class AsyncNioTcpConnection extends AsyncNioConnection { return this.channel.isConnected(); } + @Override + protected SelectionKey keyFor(Selector sel) { + return this.channel.keyFor(sel); + } + @Override protected SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException { return this.channel.register(sel, ops); diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index 1e88b3245..5e46e1f30 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -114,6 +114,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection { } } + @Override + protected SelectionKey keyFor(Selector sel) { + return this.channel.keyFor(sel); + } + @Override protected SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException { return this.channel.register(sel, ops); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 556ffc301..583120b25 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -59,9 +59,7 @@ public abstract class ClientConnection implements Co @Override public void completed(Integer result, ClientConnection attachment) { - if (attachment == null) { //新方式 - channel.readRegister(getCodec()); - } + } @Override @@ -148,7 +146,7 @@ public abstract class ClientConnection implements Co pauseWriting.set(true); currHalfWriteFuture = respFuture; } - channel.clientWrite(array.getBytes(), writeHandler); + channel.fastWrite(array.getBytes(), writeHandler); } else { //旧方式 //发送请求数据包 writeArray.clear();