This commit is contained in:
Redkale
2020-06-25 20:55:17 +08:00
parent b794872147
commit 0c8b0f5e19
2 changed files with 66 additions and 24 deletions

View File

@@ -64,23 +64,15 @@ class NioThread extends Thread {
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
try {
if (key.isAcceptable()) {
TcpNioProtocolServer sc = (TcpNioProtocolServer) key.attachment();
sc.doAccept();
continue;
}
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
if (key.isWritable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite();
} else if (key.isReadable()) {
conn.doRead();
} else if (key.isConnectable()) {
conn.doConnect();
}
} finally {
it.remove();
it.remove();
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
if (key.isWritable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite();
} else if (key.isReadable()) {
conn.doRead();
} else if (key.isConnectable()) {
conn.doConnect();
}
}
} catch (Exception ex) {

View File

@@ -7,8 +7,8 @@ package org.redkale.net.nio;
import java.io.IOException;
import java.net.*;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;
import java.nio.channels.*;
import java.util.*;
import org.redkale.net.*;
import org.redkale.util.AnyValue;
@@ -25,15 +25,39 @@ public class TcpNioProtocolServer extends ProtocolServer {
private ServerSocketChannel serverChannel;
private Selector selector;
private NioThreadGroup ioGroup;
private Thread acceptThread;
private boolean closed;
public TcpNioProtocolServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
this.selector = Selector.open();
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
@@ -53,15 +77,41 @@ public class TcpNioProtocolServer extends ProtocolServer {
@Override
public void accept(Server server) throws IOException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
this.acceptThread = new Thread() {
@Override
public void run() {
while (!closed) {
try {
int count = selector.select();
if (count == 0) continue;
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
// 获取事件
SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) accept(key);
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
};
this.acceptThread.start();
}
private void accept(SelectionKey key) throws IOException {
SocketChannel channel = this.serverChannel.accept();
channel.configureBlocking(false);
}
@Override
public void close() throws IOException {
if (this.closed) return;
this.closed = true;
this.serverChannel.close();
}
void doAccept() {
}
}