From 59a4c85aeb46d0e61d59f094688584dd08f5d38b Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 19 Jan 2021 15:39:53 +0800 Subject: [PATCH] --- .../redkale/net/TcpNioAsyncConnection.java | 123 ++++++++---------- src/org/redkale/net/nio/NioThread.java | 1 + src/org/redkale/net/nio/NioThreadGroup.java | 25 ++-- 3 files changed, 71 insertions(+), 78 deletions(-) diff --git a/src/org/redkale/net/TcpNioAsyncConnection.java b/src/org/redkale/net/TcpNioAsyncConnection.java index 4ca9dff39..9040d42f5 100644 --- a/src/org/redkale/net/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/TcpNioAsyncConnection.java @@ -35,7 +35,7 @@ public class TcpNioAsyncConnection extends AsyncConnection { private int writeTimeoutSeconds; - private final SocketAddress remoteAddress; + private SocketAddress remoteAddress; final SocketChannel channel; @@ -203,6 +203,20 @@ public class TcpNioAsyncConnection extends AsyncConnection { return this.channel; } + public void connect(SocketAddress remote, A attachment, CompletionHandler handler) { + if (channel.isConnected()) { + throw new AlreadyConnectedException(); + } + if (connectPending) { + throw new ConnectionPendingException(); + } + connectPending = true; + this.connectAttachment = attachment; + this.connectCompletionHandler = (CompletionHandler) handler; + this.remoteAddress = remote; + doConnect(); + } + @Override public void read(CompletionHandler handler) { Objects.requireNonNull(handler); @@ -313,56 +327,40 @@ public class TcpNioAsyncConnection extends AsyncConnection { connected = channel.finishConnect(); } if (connected) { - CompletionHandler handler = this.connectCompletionHandler; - Object attach = this.connectAttachment; - clearConnect(); - if (handler != null) { - if (this.workExecutor == null) { - handler.completed(null, attach); - } else { - this.workExecutor.execute(() -> handler.completed(null, attach)); - } - } + handleConnect(null); } else if (connectKey == null) { ioThread.register(selector -> { try { connectKey = channel.register(selector, SelectionKey.OP_CONNECT); connectKey.attach(this); } catch (ClosedChannelException e) { - CompletionHandler handler = this.connectCompletionHandler; - Object attach = this.connectAttachment; - clearConnect(); - if (handler != null) { - if (this.workExecutor == null) { - handler.failed(e, attach); - } else { - this.workExecutor.execute(() -> handler.failed(e, attach)); - } - } + handleConnect(e); } }); } else { - CompletionHandler handler = this.connectCompletionHandler; - Object attach = this.connectAttachment; - clearConnect(); - if (handler != null) { - IOException e = new IOException(); - if (this.workExecutor == null) { - handler.failed(e, attach); - } else { - this.workExecutor.execute(() -> handler.failed(e, attach)); - } - } + handleConnect(new IOException()); } } catch (IOException e) { - CompletionHandler handler = this.connectCompletionHandler; - Object attach = this.connectAttachment; - clearConnect(); - if (handler != null) { + handleConnect(e); + } + } + + private void handleConnect(Throwable t) { + CompletionHandler handler = this.connectCompletionHandler; + Object attach = this.connectAttachment; + clearConnect(); + if (handler != null) { + if (t == null) { if (this.workExecutor == null) { - handler.failed(e, attach); + handler.completed(null, attach); } else { - this.workExecutor.execute(() -> handler.failed(e, attach)); + this.workExecutor.execute(() -> handler.completed(null, attach)); + } + } else { + if (this.workExecutor == null) { + handler.failed(t, attach); + } else { + this.workExecutor.execute(() -> handler.failed(t, attach)); } } } @@ -395,48 +393,41 @@ public class TcpNioAsyncConnection extends AsyncConnection { totalCount += readCount; } if (totalCount != 0 || !hasRemain) { - if (readKey != null) readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ); - CompletionHandler handler = this.readCompletionHandler; - ByteBuffer attach = this.readByteBuffer; - clearRead(); - if (handler != null) { - if (this.workExecutor == null) { - handler.completed(totalCount, attach); - } else { - final int totalCount0 = totalCount; - this.workExecutor.execute(() -> handler.completed(totalCount0, attach)); - } - } + if (!readPending && readKey != null) readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ); + handleRead(totalCount, null); } else if (readKey == null) { ioThread.register(selector -> { try { readKey = channel.register(selector, SelectionKey.OP_READ); readKey.attach(this); } catch (ClosedChannelException e) { - CompletionHandler handler = this.readCompletionHandler; - ByteBuffer attach = this.readByteBuffer; - clearRead(); - if (handler != null) { - if (this.workExecutor == null) { - handler.failed(e, attach); - } else { - this.workExecutor.execute(() -> handler.failed(e, attach)); - } - } + handleRead(0, e); } }); } else { ioGroup.interestOpsOr(ioThread, readKey, SelectionKey.OP_READ); } } catch (Exception e) { - CompletionHandler handler = this.readCompletionHandler; - ByteBuffer attach = this.readByteBuffer; - clearRead(); - if (handler != null) { + handleRead(0, e); + } + } + + private void handleRead(final int totalCount, Throwable t) { + CompletionHandler handler = this.readCompletionHandler; + ByteBuffer attach = this.readByteBuffer; + clearRead(); + if (handler != null) { + if (t == null) { if (this.workExecutor == null) { - handler.failed(e, attach); + handler.completed(totalCount, attach); } else { - this.workExecutor.execute(() -> handler.failed(e, attach)); + this.workExecutor.execute(() -> handler.completed(totalCount, attach)); + } + } else { + if (this.workExecutor == null) { + handler.failed(t, attach); + } else { + this.workExecutor.execute(() -> handler.failed(t, attach)); } } } diff --git a/src/org/redkale/net/nio/NioThread.java b/src/org/redkale/net/nio/NioThread.java index b53a68f6d..7590c9e76 100644 --- a/src/org/redkale/net/nio/NioThread.java +++ b/src/org/redkale/net/nio/NioThread.java @@ -66,6 +66,7 @@ public class NioThread extends Thread { while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); + if (!key.isValid()) continue; TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); if (key.isWritable()) { //key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); diff --git a/src/org/redkale/net/nio/NioThreadGroup.java b/src/org/redkale/net/nio/NioThreadGroup.java index 9c1985cd9..f23559168 100644 --- a/src/org/redkale/net/nio/NioThreadGroup.java +++ b/src/org/redkale/net/nio/NioThreadGroup.java @@ -24,16 +24,16 @@ import org.redkale.util.ObjectPool; */ public class NioThreadGroup { - private NioThread[] ioThreads; + private NioThread[] threads; private final AtomicInteger index = new AtomicInteger(); private ScheduledThreadPoolExecutor timeoutExecutor; public NioThreadGroup(int threads, ExecutorService executor, ObjectPool bufferPool) throws IOException { - this.ioThreads = new NioThread[Math.max(threads, 1)]; - for (int i = 0; i < ioThreads.length; i++) { - this.ioThreads[i] = new NioThread(Selector.open(), executor, bufferPool); + this.threads = new NioThread[Math.max(threads, 1)]; + for (int i = 0; i < this.threads.length; i++) { + this.threads[i] = new NioThread(Selector.open(), executor, bufferPool); } this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { Thread t = new Thread(r); @@ -44,31 +44,32 @@ public class NioThreadGroup { } public void start() { - for (int i = 0; i < ioThreads.length; i++) { - this.ioThreads[i].start(); + for (NioThread thread : threads) { + thread.start(); } } public void close() { - for (int i = 0; i < ioThreads.length; i++) { - this.ioThreads[i].close(); + for (NioThread thread : threads) { + thread.close(); } - this.timeoutExecutor.shutdownNow(); + this.timeoutExecutor.shutdownNow(); } public NioThread nextThread() { - return ioThreads[Math.abs(index.getAndIncrement()) % ioThreads.length]; + return threads[Math.abs(index.getAndIncrement()) % threads.length]; } public ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit) { return timeoutExecutor.schedule(callable, delay, unit); } - public void interestOpsOr(NioThread ioThread, SelectionKey key, int opt) { + public void interestOpsOr(NioThread thread, SelectionKey key, int opt) { if (key == null) return; + if (key.selector() != thread.selector) throw new RuntimeException("NioThread.selector not the same to SelectionKey.selector"); if ((key.interestOps() & opt) != 0) return; key.interestOps(key.interestOps() | opt); - if (ioThread.inSameThread()) return; + if (thread.inSameThread()) return; //非IO线程中 key.selector().wakeup(); }