优化AsyncConnection

This commit is contained in:
Redkale
2023-01-14 18:09:49 +08:00
parent 87f7b82e0a
commit 9192cb4606
6 changed files with 59 additions and 32 deletions

View File

@@ -46,17 +46,17 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
protected final int bufferCapacity;
protected final AsyncIOThread ioReadThread;
protected AsyncIOThread ioReadThread;
protected final AsyncIOThread ioWriteThread;
protected AsyncIOThread ioWriteThread;
private final Supplier<ByteBuffer> readBufferSupplier;
private Supplier<ByteBuffer> readBufferSupplier;
private final Consumer<ByteBuffer> readBufferConsumer;
private Consumer<ByteBuffer> readBufferConsumer;
private final Supplier<ByteBuffer> writeBufferSupplier;
private Supplier<ByteBuffer> writeBufferSupplier;
private final Consumer<ByteBuffer> writeBufferConsumer;
private Consumer<ByteBuffer> writeBufferConsumer;
private ByteBufferWriter pipelineWriter;
@@ -111,6 +111,20 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
void updateReadIOThread(AsyncIOThread ioReadThread) {
Objects.requireNonNull(ioReadThread);
this.ioReadThread = ioReadThread;
this.readBufferSupplier = ioReadThread.getBufferSupplier();
this.readBufferConsumer = ioReadThread.getBufferConsumer();
}
void updateWriteIOThread(AsyncIOThread ioWriteThread) {
Objects.requireNonNull(ioWriteThread);
this.ioWriteThread = ioWriteThread;
this.writeBufferSupplier = ioWriteThread.getBufferSupplier();
this.writeBufferConsumer = ioWriteThread.getBufferConsumer();
}
public Supplier<ByteBuffer> getReadBufferSupplier() {
return this.readBufferSupplier;
}

View File

@@ -196,30 +196,6 @@ public class AsyncIOGroup extends AsyncGroup {
return timeoutExecutor.schedule(callable, delay, unit);
}
public void interestOpsOr(AsyncIOThread thread, SelectionKey key, int opt) {
if (key == null) {
return;
}
if (key.selector() != thread.selector) {
throw new RuntimeException("NioThread.selector not the same to SelectionKey.selector");
}
if ((key.interestOps() & opt) != 0) {
return;
}
key.interestOps(key.interestOps() | opt);
if (thread.inCurrThread()) {
// timeoutExecutor.execute(() -> {
// try {
// key.selector().wakeup();
// } catch (Throwable t) {
// }
// });
} else {
//非IO线程中
key.selector().wakeup();
}
}
//创建一个AsyncConnection对象只给测试代码使用
public AsyncConnection newTCPClientConnection() {
try {

View File

@@ -61,6 +61,23 @@ public class AsyncIOThread extends WorkThread {
return t instanceof AsyncIOThread ? (AsyncIOThread) t : null;
}
public void interestOpsOr(SelectionKey key, int opt) {
if (key == null) {
return;
}
if (key.selector() != selector) {
throw new RuntimeException("NioThread.selector not the same to SelectionKey.selector");
}
if ((key.interestOps() & opt) != 0) {
return;
}
key.interestOps(key.interestOps() | opt);
//非IO线程中
if (!inCurrThread()) {
key.selector().wakeup();
}
}
/**
* 是否IO线程
*

View File

@@ -286,7 +286,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
});
} else {
ioGroup.interestOpsOr(ioReadThread, readKey, SelectionKey.OP_READ);
ioReadThread.interestOpsOr(readKey, SelectionKey.OP_READ);
}
} catch (Exception e) {
handleRead(0, e);
@@ -397,7 +397,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
});
} else {
ioGroup.interestOpsOr(ioWriteThread, writeKey, SelectionKey.OP_WRITE);
ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE);
}
} catch (IOException e) {
handleWrite(0, e);

View File

@@ -159,6 +159,14 @@ public class Context {
}
protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) {
conn.updateReadIOThread(ioReadThread);
}
protected void updateWriteIOThread(AsyncConnection conn, AsyncIOThread ioWriteThread) {
conn.updateWriteIOThread(ioWriteThread);
}
public ResourceFactory getResourceFactory() {
return resourceFactory;
}

View File

@@ -11,7 +11,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.redkale.annotation.ConstructorParameters;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
import org.redkale.net.*;
import org.redkale.net.Context;
import org.redkale.net.Context.ContextConfig;
import org.redkale.util.*;
/**
@@ -67,6 +69,16 @@ public class HttpContext extends Context {
// this.uriCacheNodes = Utility.append(this.uriCacheNodes, node);
// }
// }
@Override
protected void updateReadIOThread(AsyncConnection conn, AsyncIOThread ioReadThread) {
super.updateReadIOThread(conn, ioReadThread);
}
@Override
protected void updateWriteIOThread(AsyncConnection conn, AsyncIOThread ioWriteThread) {
super.updateWriteIOThread(conn, ioWriteThread);
}
protected String createSessionid() {
byte[] bytes = new byte[16];
random.nextBytes(bytes);