This commit is contained in:
Redkale
2021-01-19 17:00:28 +08:00
parent fa7db42f50
commit b83f6867f3
4 changed files with 24 additions and 8 deletions

View File

@@ -94,7 +94,7 @@ public class TcpNioProtocolServer extends ProtocolServer {
this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool)); this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), context.executor, bufferPool); this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), bufferPool);
this.ioGroup.start(); this.ioGroup.start();
this.acceptThread = new Thread() { this.acceptThread = new Thread() {

View File

@@ -27,8 +27,6 @@ public class NioThread extends Thread {
final Selector selector; final Selector selector;
private final ExecutorService executor;
private final ObjectPool<ByteBuffer> bufferPool; private final ObjectPool<ByteBuffer> bufferPool;
private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>();
@@ -37,10 +35,9 @@ public class NioThread extends Thread {
private boolean closed; private boolean closed;
public NioThread(Selector selector, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool) { public NioThread(Selector selector, ObjectPool<ByteBuffer> bufferPool) {
super(); super();
this.selector = selector; this.selector = selector;
this.executor = executor;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.setDaemon(true); this.setDaemon(true);
} }

View File

@@ -30,10 +30,13 @@ public class NioThreadGroup {
private ScheduledThreadPoolExecutor timeoutExecutor; private ScheduledThreadPoolExecutor timeoutExecutor;
public NioThreadGroup(int threads, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool) throws IOException { public NioThreadGroup(int threads, ObjectPool<ByteBuffer> bufferPool) throws IOException {
this.threads = new NioThread[Math.max(threads, 1)]; this.threads = new NioThread[Math.max(threads, 1)];
for (int i = 0; i < this.threads.length; i++) { for (int i = 0; i < this.threads.length; i++) {
this.threads[i] = new NioThread(Selector.open(), executor, bufferPool); ObjectPool<ByteBuffer> threadBufferPool = ObjectPool.createUnsafePool(bufferPool.getCreatCounter(),
bufferPool.getCycleCounter(), 8,
bufferPool.getCreator(), bufferPool.getPrepare(), bufferPool.getRecycler());
this.threads[i] = new NioThread(Selector.open(), threadBufferPool);
} }
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r);

View File

@@ -130,10 +130,26 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
return this.creator; return this.creator;
} }
public Predicate<T> getRecyclerPredicate() { public int getMax() {
return max;
}
public Consumer<T> getPrepare() {
return prepare;
}
public Predicate<T> getRecycler() {
return recycler; return recycler;
} }
public AtomicLong getCreatCounter() {
return creatCounter;
}
public AtomicLong getCycleCounter() {
return cycleCounter;
}
@Override @Override
public T get() { public T get() {
T result = queue.poll(); T result = queue.poll();