This commit is contained in:
Redkale
2020-06-25 16:14:57 +08:00
parent 5f3a472c5e
commit 86614f035b
3 changed files with 9 additions and 7 deletions

View File

@@ -28,11 +28,13 @@ class NioThreadGroup {
return timeoutExecutor.schedule(callable, delay, unit); return timeoutExecutor.schedule(callable, delay, unit);
} }
public void interestOps(NioThread ioThread, SelectionKey key, int opt) { public void interestOpsOr(NioThread ioThread, SelectionKey key, int opt) {
if (key == null) return;
if ((key.interestOps() & opt) != 0) return; if ((key.interestOps() & opt) != 0) return;
key.interestOps(key.interestOps() | opt); key.interestOps(key.interestOps() | opt);
if (ioThread.inSameThread()) return; if (ioThread.inSameThread()) return;
//非IO线程中 //非IO线程中
key.selector().wakeup(); key.selector().wakeup();
} }
} }

View File

@@ -325,9 +325,7 @@ class TcpNioAsyncConnection extends AsyncConnection {
this.workExecutor.execute(() -> handler.completed(totalCount0, attach)); this.workExecutor.execute(() -> handler.completed(totalCount0, attach));
} }
} }
if (readKey != null) { if (readKey != null) readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ);
readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ);
}
} else if (readKey == null) { } else if (readKey == null) {
ioThread.register(selector -> { ioThread.register(selector -> {
try { try {
@@ -347,7 +345,7 @@ class TcpNioAsyncConnection extends AsyncConnection {
} }
}); });
} else { } else {
ioGroup.interestOps(ioThread, readKey, SelectionKey.OP_READ); ioGroup.interestOpsOr(ioThread, readKey, SelectionKey.OP_READ);
} }
} catch (Exception e) { } catch (Exception e) {
CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler; CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler;
@@ -428,7 +426,7 @@ class TcpNioAsyncConnection extends AsyncConnection {
} }
}); });
} else { } else {
ioGroup.interestOps(ioThread, writeKey, SelectionKey.OP_WRITE); ioGroup.interestOpsOr(ioThread, writeKey, SelectionKey.OP_WRITE);
} }
} catch (IOException e) { } catch (IOException e) {
CompletionHandler<Integer, Object> handler = this.writeCompletionHandler; CompletionHandler<Integer, Object> handler = this.writeCompletionHandler;

View File

@@ -25,6 +25,8 @@ public class TcpNioProtocolServer extends ProtocolServer {
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private NioThreadGroup ioGroup;
public TcpNioProtocolServer(Context context) { public TcpNioProtocolServer(Context context) {
super(context); super(context);
} }
@@ -60,6 +62,6 @@ public class TcpNioProtocolServer extends ProtocolServer {
} }
void doAccept() { void doAccept() {
} }
} }