diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index d7e62fb36..01de1bb25 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -46,17 +46,17 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl protected final int bufferCapacity; - protected final AsyncIOThread ioReadThread; + protected AsyncIOThread ioReadThread; - protected final AsyncIOThread ioWriteThread; + protected AsyncIOThread ioWriteThread; - private final Supplier readBufferSupplier; + private Supplier readBufferSupplier; - private final Consumer readBufferConsumer; + private Consumer readBufferConsumer; - private final Supplier writeBufferSupplier; + private Supplier writeBufferSupplier; - private final Consumer writeBufferConsumer; + private Consumer writeBufferConsumer; private ByteBufferWriter pipelineWriter; @@ -111,6 +111,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } + void updateReadIOThread(AsyncIOThread ioReadThread) { + Objects.requireNonNull(ioReadThread); + this.ioReadThread = ioReadThread; + this.readBufferSupplier = ioReadThread.getBufferSupplier(); + this.readBufferConsumer = ioReadThread.getBufferConsumer(); + } + + void updateWriteIOThread(AsyncIOThread ioWriteThread) { + Objects.requireNonNull(ioWriteThread); + this.ioWriteThread = ioWriteThread; + this.writeBufferSupplier = ioWriteThread.getBufferSupplier(); + this.writeBufferConsumer = ioWriteThread.getBufferConsumer(); + } + public Supplier getReadBufferSupplier() { return this.readBufferSupplier; } diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 9b7096adb..73ce03b56 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -196,30 +196,6 @@ public class AsyncIOGroup extends AsyncGroup { return timeoutExecutor.schedule(callable, delay, unit); } - public void interestOpsOr(AsyncIOThread thread, SelectionKey key, int opt) { - if (key == null) { - return; - } - if (key.selector() != thread.selector) { - throw new RuntimeException("NioThread.selector not the same to SelectionKey.selector"); - } - if ((key.interestOps() & opt) != 0) { - return; - } - key.interestOps(key.interestOps() | opt); - if (thread.inCurrThread()) { -// timeoutExecutor.execute(() -> { -// try { -// key.selector().wakeup(); -// } catch (Throwable t) { -// } -// }); - } else { - //非IO线程中 - key.selector().wakeup(); - } - } - //创建一个AsyncConnection对象,只给测试代码使用 public AsyncConnection newTCPClientConnection() { try { diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 544708bfd..ed95410ff 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -61,6 +61,23 @@ public class AsyncIOThread extends WorkThread { return t instanceof AsyncIOThread ? (AsyncIOThread) t : null; } + public void interestOpsOr(SelectionKey key, int opt) { + if (key == null) { + return; + } + if (key.selector() != selector) { + throw new RuntimeException("NioThread.selector not the same to SelectionKey.selector"); + } + if ((key.interestOps() & opt) != 0) { + return; + } + key.interestOps(key.interestOps() | opt); + //非IO线程中 + if (!inCurrThread()) { + key.selector().wakeup(); + } + } + /** * 是否IO线程 * diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 015f34310..0f1c5266a 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -286,7 +286,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } }); } else { - ioGroup.interestOpsOr(ioReadThread, readKey, SelectionKey.OP_READ); + ioReadThread.interestOpsOr(readKey, SelectionKey.OP_READ); } } catch (Exception e) { handleRead(0, e); @@ -397,7 +397,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } }); } else { - ioGroup.interestOpsOr(ioWriteThread, writeKey, SelectionKey.OP_WRITE); + ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); } } catch (IOException e) { handleWrite(0, e); diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index e374616b1..6cf5378e6 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -159,6 +159,14 @@ public class Context { } + protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) { + conn.updateReadIOThread(ioReadThread); + } + + protected void updateWriteIOThread(AsyncConnection conn, AsyncIOThread ioWriteThread) { + conn.updateWriteIOThread(ioWriteThread); + } + public ResourceFactory getResourceFactory() { return resourceFactory; } diff --git a/src/main/java/org/redkale/net/http/HttpContext.java b/src/main/java/org/redkale/net/http/HttpContext.java index de962b97a..253b45dd7 100644 --- a/src/main/java/org/redkale/net/http/HttpContext.java +++ b/src/main/java/org/redkale/net/http/HttpContext.java @@ -11,7 +11,9 @@ import java.util.concurrent.ConcurrentHashMap; import org.redkale.annotation.ConstructorParameters; import org.redkale.asm.*; import static org.redkale.asm.Opcodes.*; +import org.redkale.net.*; import org.redkale.net.Context; +import org.redkale.net.Context.ContextConfig; import org.redkale.util.*; /** @@ -67,6 +69,16 @@ public class HttpContext extends Context { // this.uriCacheNodes = Utility.append(this.uriCacheNodes, node); // } // } + @Override + protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) { + super.updateReadIOThread(conn, ioReadThread); + } + + @Override + protected void updateWriteIOThread(AsyncConnection conn, AsyncIOThread ioWriteThread) { + super.updateWriteIOThread(conn, ioWriteThread); + } + protected String createSessionid() { byte[] bytes = new byte[16]; random.nextBytes(bytes);