This commit is contained in:
Redkale
2021-01-19 15:44:31 +08:00
parent 59a4c85aeb
commit 80c07a5c5b
3 changed files with 10 additions and 7 deletions

View File

@@ -18,7 +18,6 @@ import org.redkale.net.AsyncConnection;
import org.redkale.net.nio.NioCompletionHandler; import org.redkale.net.nio.NioCompletionHandler;
import org.redkale.net.nio.NioThread; import org.redkale.net.nio.NioThread;
import org.redkale.net.nio.NioThreadGroup; import org.redkale.net.nio.NioThreadGroup;
import org.redkale.util.ObjectPool;
/** /**
* *
@@ -81,9 +80,9 @@ public class TcpNioAsyncConnection extends AsyncConnection {
private SelectionKey writeKey; private SelectionKey writeKey;
public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor,
ObjectPool<ByteBuffer> bufferPool, SocketChannel ch, SocketChannel ch,
SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) { SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) {
super(bufferPool, sslContext, livingCounter, closedCounter); super(ioThread.getBufferPool(), sslContext, livingCounter, closedCounter);
this.ioGroup = ioGroup; this.ioGroup = ioGroup;
this.ioThread = ioThread; this.ioThread = ioThread;
this.workExecutor = workExecutor; this.workExecutor = workExecutor;

View File

@@ -85,7 +85,7 @@ public class TcpNioProtocolServer extends ProtocolServer {
@Override @Override
public void accept(Server server) throws IOException { public void accept(Server server) throws IOException {
this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
AtomicLong createBufferCounter = new AtomicLong(); AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong(); AtomicLong cycleBufferCounter = new AtomicLong();
this.bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); this.bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
@@ -93,10 +93,10 @@ public class TcpNioProtocolServer extends ProtocolServer {
AtomicLong cycleResponseCounter = new AtomicLong(); AtomicLong cycleResponseCounter = new AtomicLong();
this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool)); this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), context.executor, bufferPool); this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), context.executor, bufferPool);
this.ioGroup.start(); this.ioGroup.start();
this.acceptThread = new Thread() { this.acceptThread = new Thread() {
@Override @Override
public void run() { public void run() {
@@ -129,7 +129,7 @@ public class TcpNioProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
NioThread ioThread = ioGroup.nextThread(); NioThread ioThread = ioGroup.nextThread();
AsyncConnection conn = new TcpNioAsyncConnection(ioGroup, ioThread, context.executor, bufferPool, channel, context.getSSLContext(), null, livingCounter, closedCounter); AsyncConnection conn = new TcpNioAsyncConnection(ioGroup, ioThread, context.executor, channel, context.getSSLContext(), null, livingCounter, closedCounter);
new PrepareRunner(context, responsePool, conn, null, null).run(); new PrepareRunner(context, responsePool, conn, null, null).run();
} }

View File

@@ -50,6 +50,10 @@ public class NioThread extends Thread {
selector.wakeup(); selector.wakeup();
} }
public ObjectPool<ByteBuffer> getBufferPool() {
return bufferPool;
}
@Override @Override
public void run() { public void run() {
this.localThread = Thread.currentThread(); this.localThread = Thread.currentThread();