This commit is contained in:
Redkale
2021-01-19 15:39:53 +08:00
parent 7fc01d38e8
commit 59a4c85aeb
3 changed files with 71 additions and 78 deletions

View File

@@ -35,7 +35,7 @@ public class TcpNioAsyncConnection extends AsyncConnection {
private int writeTimeoutSeconds; private int writeTimeoutSeconds;
private final SocketAddress remoteAddress; private SocketAddress remoteAddress;
final SocketChannel channel; final SocketChannel channel;
@@ -203,6 +203,20 @@ public class TcpNioAsyncConnection extends AsyncConnection {
return this.channel; return this.channel;
} }
public <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void, ? super A> handler) {
if (channel.isConnected()) {
throw new AlreadyConnectedException();
}
if (connectPending) {
throw new ConnectionPendingException();
}
connectPending = true;
this.connectAttachment = attachment;
this.connectCompletionHandler = (CompletionHandler<Void, Object>) handler;
this.remoteAddress = remote;
doConnect();
}
@Override @Override
public void read(CompletionHandler<Integer, ByteBuffer> handler) { public void read(CompletionHandler<Integer, ByteBuffer> handler) {
Objects.requireNonNull(handler); Objects.requireNonNull(handler);
@@ -313,56 +327,40 @@ public class TcpNioAsyncConnection extends AsyncConnection {
connected = channel.finishConnect(); connected = channel.finishConnect();
} }
if (connected) { if (connected) {
CompletionHandler handler = this.connectCompletionHandler; handleConnect(null);
Object attach = this.connectAttachment;
clearConnect();
if (handler != null) {
if (this.workExecutor == null) {
handler.completed(null, attach);
} else {
this.workExecutor.execute(() -> handler.completed(null, attach));
}
}
} else if (connectKey == null) { } else if (connectKey == null) {
ioThread.register(selector -> { ioThread.register(selector -> {
try { try {
connectKey = channel.register(selector, SelectionKey.OP_CONNECT); connectKey = channel.register(selector, SelectionKey.OP_CONNECT);
connectKey.attach(this); connectKey.attach(this);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
CompletionHandler handler = this.connectCompletionHandler; handleConnect(e);
Object attach = this.connectAttachment;
clearConnect();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
}
} }
}); });
} else { } else {
CompletionHandler handler = this.connectCompletionHandler; handleConnect(new IOException());
Object attach = this.connectAttachment;
clearConnect();
if (handler != null) {
IOException e = new IOException();
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
}
} }
} catch (IOException e) { } catch (IOException e) {
handleConnect(e);
}
}
private void handleConnect(Throwable t) {
CompletionHandler handler = this.connectCompletionHandler; CompletionHandler handler = this.connectCompletionHandler;
Object attach = this.connectAttachment; Object attach = this.connectAttachment;
clearConnect(); clearConnect();
if (handler != null) { if (handler != null) {
if (t == null) {
if (this.workExecutor == null) { if (this.workExecutor == null) {
handler.failed(e, attach); handler.completed(null, attach);
} else { } else {
this.workExecutor.execute(() -> handler.failed(e, attach)); this.workExecutor.execute(() -> handler.completed(null, attach));
}
} else {
if (this.workExecutor == null) {
handler.failed(t, attach);
} else {
this.workExecutor.execute(() -> handler.failed(t, attach));
} }
} }
} }
@@ -395,48 +393,41 @@ public class TcpNioAsyncConnection extends AsyncConnection {
totalCount += readCount; totalCount += readCount;
} }
if (totalCount != 0 || !hasRemain) { if (totalCount != 0 || !hasRemain) {
if (readKey != null) readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ); if (!readPending && readKey != null) readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ);
CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler; handleRead(totalCount, null);
ByteBuffer attach = this.readByteBuffer;
clearRead();
if (handler != null) {
if (this.workExecutor == null) {
handler.completed(totalCount, attach);
} else {
final int totalCount0 = totalCount;
this.workExecutor.execute(() -> handler.completed(totalCount0, attach));
}
}
} else if (readKey == null) { } else if (readKey == null) {
ioThread.register(selector -> { ioThread.register(selector -> {
try { try {
readKey = channel.register(selector, SelectionKey.OP_READ); readKey = channel.register(selector, SelectionKey.OP_READ);
readKey.attach(this); readKey.attach(this);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler; handleRead(0, e);
ByteBuffer attach = this.readByteBuffer;
clearRead();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
}
} }
}); });
} else { } else {
ioGroup.interestOpsOr(ioThread, readKey, SelectionKey.OP_READ); ioGroup.interestOpsOr(ioThread, readKey, SelectionKey.OP_READ);
} }
} catch (Exception e) { } catch (Exception e) {
handleRead(0, e);
}
}
private void handleRead(final int totalCount, Throwable t) {
CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler; CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler;
ByteBuffer attach = this.readByteBuffer; ByteBuffer attach = this.readByteBuffer;
clearRead(); clearRead();
if (handler != null) { if (handler != null) {
if (t == null) {
if (this.workExecutor == null) { if (this.workExecutor == null) {
handler.failed(e, attach); handler.completed(totalCount, attach);
} else { } else {
this.workExecutor.execute(() -> handler.failed(e, attach)); this.workExecutor.execute(() -> handler.completed(totalCount, attach));
}
} else {
if (this.workExecutor == null) {
handler.failed(t, attach);
} else {
this.workExecutor.execute(() -> handler.failed(t, attach));
} }
} }
} }

View File

@@ -66,6 +66,7 @@ public class NioThread extends Thread {
while (it.hasNext()) { while (it.hasNext()) {
SelectionKey key = it.next(); SelectionKey key = it.next();
it.remove(); it.remove();
if (!key.isValid()) continue;
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
if (key.isWritable()) { if (key.isWritable()) {
//key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); //key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);

View File

@@ -24,16 +24,16 @@ import org.redkale.util.ObjectPool;
*/ */
public class NioThreadGroup { public class NioThreadGroup {
private NioThread[] ioThreads; private NioThread[] threads;
private final AtomicInteger index = new AtomicInteger(); private final AtomicInteger index = new AtomicInteger();
private ScheduledThreadPoolExecutor timeoutExecutor; private ScheduledThreadPoolExecutor timeoutExecutor;
public NioThreadGroup(int threads, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool) throws IOException { public NioThreadGroup(int threads, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool) throws IOException {
this.ioThreads = new NioThread[Math.max(threads, 1)]; this.threads = new NioThread[Math.max(threads, 1)];
for (int i = 0; i < ioThreads.length; i++) { for (int i = 0; i < this.threads.length; i++) {
this.ioThreads[i] = new NioThread(Selector.open(), executor, bufferPool); this.threads[i] = new NioThread(Selector.open(), executor, bufferPool);
} }
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r);
@@ -44,31 +44,32 @@ public class NioThreadGroup {
} }
public void start() { public void start() {
for (int i = 0; i < ioThreads.length; i++) { for (NioThread thread : threads) {
this.ioThreads[i].start(); thread.start();
} }
} }
public void close() { public void close() {
for (int i = 0; i < ioThreads.length; i++) { for (NioThread thread : threads) {
this.ioThreads[i].close(); thread.close();
} }
this.timeoutExecutor.shutdownNow(); this.timeoutExecutor.shutdownNow();
} }
public NioThread nextThread() { public NioThread nextThread() {
return ioThreads[Math.abs(index.getAndIncrement()) % ioThreads.length]; return threads[Math.abs(index.getAndIncrement()) % threads.length];
} }
public ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit) { public ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit) {
return timeoutExecutor.schedule(callable, delay, unit); return timeoutExecutor.schedule(callable, delay, unit);
} }
public void interestOpsOr(NioThread ioThread, SelectionKey key, int opt) { public void interestOpsOr(NioThread thread, SelectionKey key, int opt) {
if (key == null) return; if (key == null) return;
if (key.selector() != thread.selector) throw new RuntimeException("NioThread.selector not the same to SelectionKey.selector");
if ((key.interestOps() & opt) != 0) return; if ((key.interestOps() & opt) != 0) return;
key.interestOps(key.interestOps() | opt); key.interestOps(key.interestOps() | opt);
if (ioThread.inSameThread()) return; if (thread.inSameThread()) return;
//非IO线程中 //非IO线程中
key.selector().wakeup(); key.selector().wakeup();
} }