This commit is contained in:
Redkale
2021-01-20 19:13:38 +08:00
parent 34156fc092
commit 58e23c1b8c
4 changed files with 47 additions and 20 deletions

View File

@@ -75,7 +75,7 @@ public class NioTcpAsyncConnection extends AsyncConnection {
public NioTcpAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, SocketChannel ch, public NioTcpAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, SocketChannel ch,
SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) { SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) {
super(ioThread.getBufferPool(), sslContext, livingCounter, closedCounter); super(ioThread.getBufferSupplier(), ioThread.getBufferConsumer(), sslContext, livingCounter, closedCounter);
this.ioGroup = ioGroup; this.ioGroup = ioGroup;
this.ioThread = ioThread; this.ioThread = ioThread;
this.channel = ch; this.channel = ch;

View File

@@ -42,7 +42,7 @@ public class NioTcpPrepareRunner implements Runnable {
channel.read(new CompletionHandler<Integer, ByteBuffer>() { channel.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer count, ByteBuffer buffer) { public void completed(Integer count, ByteBuffer buffer) {
if (response == null) response = ((NioThread) Thread.currentThread()).getResponsePool().get(); if (response == null) response = ((NioThread) Thread.currentThread()).getResponseSupplier().get();
if (count < 1) { if (count < 1) {
buffer.clear(); buffer.clear();
channel.setReadBuffer(buffer); channel.setReadBuffer(buffer);

View File

@@ -9,7 +9,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Consumer; import java.util.function.*;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
@@ -26,21 +26,40 @@ public class NioThread extends WorkThread {
final Selector selector; final Selector selector;
private final ObjectPool<ByteBuffer> bufferPool; private final Supplier<ByteBuffer> bufferSupplier;
private final ObjectPool<Response> responsePool; private final Consumer<ByteBuffer> bufferConsumer;
private final Supplier<Response> responseSupplier;
private final Consumer<Response> responseConsumer;
private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>();
private boolean closed; private boolean closed;
public NioThread(String name, ExecutorService workExecutor, Selector selector, public NioThread(String name, ExecutorService workExecutor, Selector selector,
ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) { ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool,
ObjectPool<Response> unsafeResponsePool, ObjectPool<Response> safeResponsePool) {
super(name, workExecutor, null); super(name, workExecutor, null);
this.selector = selector; this.selector = selector;
this.bufferPool = bufferPool;
this.responsePool = responsePool;
this.setDaemon(true); this.setDaemon(true);
this.bufferSupplier = () -> inCurrThread() ? unsafeBufferPool.get() : safeBufferPool.get();
this.bufferConsumer = (v) -> {
if (inCurrThread()) {
unsafeBufferPool.accept(v);
} else {
safeBufferPool.accept(v);
}
};
this.responseSupplier = () -> inCurrThread() ? unsafeResponsePool.get() : safeResponsePool.get();
this.responseConsumer = (v) -> {
if (inCurrThread()) {
unsafeResponsePool.accept(v);
} else {
safeResponsePool.accept(v);
}
};
} }
public void register(Consumer<Selector> consumer) { public void register(Consumer<Selector> consumer) {
@@ -48,12 +67,20 @@ public class NioThread extends WorkThread {
selector.wakeup(); selector.wakeup();
} }
public ObjectPool<ByteBuffer> getBufferPool() { public Supplier<ByteBuffer> getBufferSupplier() {
return bufferPool; return bufferSupplier;
} }
public ObjectPool<Response> getResponsePool() { public Consumer<ByteBuffer> getBufferConsumer() {
return responsePool; return bufferConsumer;
}
public Supplier<Response> getResponseSupplier() {
return responseSupplier;
}
public Consumer<Response> getResponseConsumer() {
return responseConsumer;
} }
@Override @Override

View File

@@ -32,18 +32,18 @@ public class NioThreadGroup {
private ScheduledThreadPoolExecutor timeoutExecutor; private ScheduledThreadPoolExecutor timeoutExecutor;
public NioThreadGroup(final String serverName, ExecutorService workExecutor, int iothreads, public NioThreadGroup(final String serverName, ExecutorService workExecutor, int iothreads,
ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) throws IOException { ObjectPool<ByteBuffer> safeBufferPool, ObjectPool<Response> safeResponsePool) throws IOException {
this.threads = new NioThread[Math.max(iothreads, 1)]; this.threads = new NioThread[Math.max(iothreads, 1)];
for (int i = 0; i < this.threads.length; i++) { for (int i = 0; i < this.threads.length; i++) {
ObjectPool<ByteBuffer> threadBufferPool = ObjectPool.createUnsafePool(bufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool.getCreatCounter(),
bufferPool.getCycleCounter(), 8, safeBufferPool.getCycleCounter(), 8,
bufferPool.getCreator(), bufferPool.getPrepare(), bufferPool.getRecycler()); safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
ObjectPool<Response> threadResponsePool = ObjectPool.createUnsafePool(responsePool.getCreatCounter(), ObjectPool<Response> unsafeResponsePool = ObjectPool.createUnsafePool(safeResponsePool.getCreatCounter(),
responsePool.getCycleCounter(), 8, safeResponsePool.getCycleCounter(), 8,
responsePool.getCreator(), responsePool.getPrepare(), responsePool.getRecycler()); safeResponsePool.getCreator(), safeResponsePool.getPrepare(), safeResponsePool.getRecycler());
String name = "Redkale-" + serverName + "-ServletThread" + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); String name = "Redkale-" + serverName + "-ServletThread" + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1)));
this.threads[i] = new NioThread(name, workExecutor, Selector.open(), threadBufferPool, threadResponsePool); this.threads[i] = new NioThread(name, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool, unsafeResponsePool, safeResponsePool);
} }
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r);