This commit is contained in:
Redkale
2020-06-25 23:33:57 +08:00
parent 0c8b0f5e19
commit f88db8abf9
6 changed files with 81 additions and 15 deletions

View File

@@ -73,7 +73,7 @@ public abstract class ProtocolServer {
} else if ("aio".equalsIgnoreCase(netimpl)) {
return new TcpAioProtocolServer(context);
} else if ("nio".equalsIgnoreCase(netimpl)) {
return null;// return new TcpNioProtocolServer(context);
return new TcpNioProtocolServer(context);
}
} else if ("UDP".equalsIgnoreCase(protocol)) {
if (netimpl == null || netimpl.isEmpty()) {

View File

@@ -3,7 +3,7 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.nio;
package org.redkale.net;
import java.io.IOException;
import java.net.*;
@@ -15,6 +15,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import javax.net.ssl.SSLContext;
import org.redkale.net.AsyncConnection;
import org.redkale.net.nio.NioCompletionHandler;
import org.redkale.net.nio.NioThread;
import org.redkale.net.nio.NioThreadGroup;
import org.redkale.util.ObjectPool;
/**
@@ -26,7 +29,7 @@ import org.redkale.util.ObjectPool;
*
* @since 2.1.0
*/
class TcpNioAsyncConnection extends AsyncConnection {
public class TcpNioAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -304,7 +307,7 @@ class TcpNioAsyncConnection extends AsyncConnection {
doWrite();
}
void doConnect() {
public void doConnect() {
try {
boolean connected = channel.isConnectionPending();
if (connected || channel.connect(remoteAddress)) {
@@ -372,7 +375,7 @@ class TcpNioAsyncConnection extends AsyncConnection {
this.connectPending = false;//必须放最后
}
void doRead() {
public void doRead() {
try {
final boolean invokeDirect = this.ioThread.inSameThread();
int totalCount = 0;
@@ -440,7 +443,7 @@ class TcpNioAsyncConnection extends AsyncConnection {
this.readPending = false; //必须放最后
}
void doWrite() {
public void doWrite() {
try {
final boolean invokeDirect = this.ioThread.inSameThread();
int totalCount = 0;

View File

@@ -3,14 +3,17 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.nio;
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.net.*;
import org.redkale.util.AnyValue;
import org.redkale.net.nio.*;
import org.redkale.util.*;
/**
*
@@ -23,6 +26,10 @@ import org.redkale.util.AnyValue;
*/
public class TcpNioProtocolServer extends ProtocolServer {
private ObjectPool<ByteBuffer> bufferPool;
private ObjectPool<Response> responsePool;
private ServerSocketChannel serverChannel;
private Selector selector;
@@ -78,6 +85,18 @@ public class TcpNioProtocolServer extends ProtocolServer {
@Override
public void accept(Server server) throws IOException {
this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
this.bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), context.executor, bufferPool);
this.ioGroup.start();
this.acceptThread = new Thread() {
@Override
public void run() {
@@ -88,7 +107,6 @@ public class TcpNioProtocolServer extends ProtocolServer {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
// 获取事件
SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) accept(key);
@@ -105,13 +123,24 @@ public class TcpNioProtocolServer extends ProtocolServer {
private void accept(SelectionKey key) throws IOException {
SocketChannel channel = this.serverChannel.accept();
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
NioThread ioThread = ioGroup.nextThread();
AsyncConnection conn = new TcpNioAsyncConnection(ioGroup, ioThread, context.executor, bufferPool, channel, context.getSSLContext(), null, livingCounter, closedCounter);
new PrepareRunner(context, responsePool, conn, null, null).run();
}
@Override
public void close() throws IOException {
if (this.closed) return;
this.closed = true;
this.selector.wakeup();
this.ioGroup.close();
this.serverChannel.close();
this.selector.close();
}
}

View File

@@ -17,13 +17,13 @@ import java.util.concurrent.*;
*
* @since 2.1.0
*/
class NioCompletionHandler<A> implements CompletionHandler<Integer, A>, Runnable {
public class NioCompletionHandler<A> implements CompletionHandler<Integer, A>, Runnable {
private final CompletionHandler<Integer, A> handler;
private final A attachment;
ScheduledFuture timeoutFuture;
public ScheduledFuture timeoutFuture;
public NioCompletionHandler(CompletionHandler<Integer, A> handler, A attachment) {
this.handler = handler;

View File

@@ -10,6 +10,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import org.redkale.net.TcpNioAsyncConnection;
import org.redkale.util.*;
/**
@@ -22,7 +23,7 @@ import org.redkale.util.*;
*
* @since 2.1.0
*/
class NioThread extends Thread {
public class NioThread extends Thread {
final Selector selector;
@@ -44,7 +45,7 @@ class NioThread extends Thread {
this.setDaemon(true);
}
void register(Consumer<Selector> consumer) {
public void register(Consumer<Selector> consumer) {
registers.offer(consumer);
selector.wakeup();
}
@@ -89,4 +90,8 @@ class NioThread extends Thread {
return this.localThread == thread;
}
public void close() {
this.closed = true;
this.interrupt();
}
}

View File

@@ -5,8 +5,12 @@
*/
package org.redkale.net.nio;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.util.ObjectPool;
/**
* 协议处理的IO线程组
@@ -18,12 +22,37 @@ import java.util.concurrent.*;
*
* @since 2.1.0
*/
class NioThreadGroup {
public class NioThreadGroup {
private NioThread[] ioThreads;
private final AtomicInteger index = new AtomicInteger();
private ScheduledThreadPoolExecutor timeoutExecutor;
public NioThreadGroup(int threads, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool) throws IOException {
this.ioThreads = new NioThread[Math.max(threads, 1)];
for (int i = 0; i < ioThreads.length; i++) {
this.ioThreads[i] = new NioThread(Selector.open(), executor, bufferPool);
}
}
public void start() {
for (int i = 0; i < ioThreads.length; i++) {
this.ioThreads[i].start();
}
}
public void close() {
for (int i = 0; i < ioThreads.length; i++) {
this.ioThreads[i].close();
}
}
public NioThread nextThread() {
return ioThreads[Math.abs(index.getAndIncrement()) % ioThreads.length];
}
public ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit) {
return timeoutExecutor.schedule(callable, delay, unit);
}