diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index c174b1f37..ab89fc9b8 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -39,7 +39,7 @@ public abstract class AsyncConnection implements AutoCloseable { protected final Consumer bufferConsumer; - protected ByteBuffer readBuffer; + private ByteBuffer readBuffer; //在线数 protected AtomicLong livingCounter; @@ -116,7 +116,7 @@ public abstract class AsyncConnection implements AutoCloseable { public abstract void read(CompletionHandler handler); - public abstract WritableByteChannel rritableByteChannel(); + public abstract WritableByteChannel writableByteChannel(); public abstract void write(ByteBuffer src, A attachment, CompletionHandler handler); diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index d39597423..d2399b20c 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -256,7 +256,7 @@ class TcpAioAsyncConnection extends AsyncConnection { } @Override - public final WritableByteChannel rritableByteChannel() { + public final WritableByteChannel writableByteChannel() { return new WritableByteChannel() { @Override public int write(ByteBuffer src) throws IOException { diff --git a/src/org/redkale/net/UdpBioAsyncConnection.java b/src/org/redkale/net/UdpBioAsyncConnection.java index b8b7736bd..f3f18591f 100644 --- a/src/org/redkale/net/UdpBioAsyncConnection.java +++ b/src/org/redkale/net/UdpBioAsyncConnection.java @@ -147,7 +147,7 @@ class UdpBioAsyncConnection extends AsyncConnection { } @Override - public final WritableByteChannel rritableByteChannel() { + public final WritableByteChannel writableByteChannel() { return this.channel; } diff --git a/src/org/redkale/net/nio/AbstractLoop.java b/src/org/redkale/net/nio/AbstractLoop.java deleted file mode 100644 index 849930e40..000000000 --- a/src/org/redkale/net/nio/AbstractLoop.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.nio; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public abstract class AbstractLoop extends Thread { - - private volatile Thread localThread; - - protected volatile boolean closed; - - protected String name; - - protected AbstractLoop(String name) { - this.name = name; - } - - @Override - public final void run() { - this.localThread = Thread.currentThread(); - beforeLoop(); - while (!closed) { - if (Thread.currentThread().isInterrupted()) break; - try { - doLoop(); - } catch (Throwable e) { - e.printStackTrace(); - } - } - afterLoop(); - } - - protected void beforeLoop() { - } - - protected abstract void doLoop(); - - protected void afterLoop() { - } - - public boolean isSameThread() { - return localThread == Thread.currentThread(); - } - - public void shutdown() { - this.closed = true; - } -} diff --git a/src/org/redkale/net/nio/NioCompletionHandler.java b/src/org/redkale/net/nio/NioCompletionHandler.java index cb3808d92..b51859aa1 100644 --- a/src/org/redkale/net/nio/NioCompletionHandler.java +++ b/src/org/redkale/net/nio/NioCompletionHandler.java @@ -32,11 +32,21 @@ class NioCompletionHandler implements CompletionHandler, Runnable @Override public void completed(Integer result, A attach) { + ScheduledFuture future = this.timeoutFuture; + if (future != null) { + this.timeoutFuture = null; + future.cancel(true); + } handler.completed(result, attachment); } @Override public void failed(Throwable exc, A attach) { + ScheduledFuture future = this.timeoutFuture; + if (future != null) { + this.timeoutFuture = null; + future.cancel(true); + } handler.failed(exc, attachment); } diff --git a/src/org/redkale/net/nio/NioEventLoop.java b/src/org/redkale/net/nio/NioEventLoop.java deleted file mode 100644 index c283b0e69..000000000 --- a/src/org/redkale/net/nio/NioEventLoop.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.nio; - -import java.io.IOException; -import java.nio.channels.*; -import java.util.*; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public abstract class NioEventLoop extends AbstractLoop { - - protected final Selector selector; - - public NioEventLoop(String name) { - super(name); - try { - this.selector = Selector.open(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void processKey(SelectionKey key) { - if (key == null || !key.isValid()) return; - if (key.isAcceptable()) { - try { - acceptOP(key); - } catch (Throwable e) { - failedOP(key, e); - } - } else if (key.isConnectable()) { - try { - connectOP(key); - } catch (Throwable e) { - failedOP(key, e); - } - } else if (key.isReadable()) { - try { - readOP(key); - } catch (Throwable e) { - failedOP(key, e); - } - - } else if (key.isWritable()) { - try { - writeOP(key); - } catch (Throwable e) { - failedOP(key, e); - } - } - } - - @Override - protected final void doLoop() { - try { - doLoopProcessing(); - } catch (Throwable e) { - e.printStackTrace(); - } - - try { - selector.select(getSelectorTimeout()); - } catch (IOException e) { - e.printStackTrace(); - } - - try { - Set selectedKeys = selector.selectedKeys(); - synchronized (selectedKeys) { - Iterator iter = selectedKeys.iterator(); - while (iter.hasNext()) { - SelectionKey key = (SelectionKey) iter.next(); - iter.remove(); - processKey(key); - } - } - } catch (ClosedSelectorException e) { - // do nothing - } - } - - protected long getSelectorTimeout() { - return 10; - } - - protected abstract void doLoopProcessing(); - - protected void acceptOP(SelectionKey key) { - throw new RuntimeException("Accept operation is not implemented!"); - } - - protected void connectOP(SelectionKey key) throws IOException { - throw new RuntimeException("Connect operation is not implemented!"); - } - - protected void readOP(SelectionKey key) throws IOException { - throw new RuntimeException("Accept operation is not implemented!"); - } - - protected void writeOP(SelectionKey key) throws IOException { - throw new RuntimeException("Accept operation is not implemented!"); - } - - protected void failedOP(SelectionKey key, Throwable e) { - // ignore the errors by default - } - -} diff --git a/src/org/redkale/net/nio/NioThread.java b/src/org/redkale/net/nio/NioThread.java index 0754758fb..8ec770cbe 100644 --- a/src/org/redkale/net/nio/NioThread.java +++ b/src/org/redkale/net/nio/NioThread.java @@ -6,7 +6,10 @@ package org.redkale.net.nio; import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Consumer; import org.redkale.util.*; /** @@ -19,37 +22,71 @@ import org.redkale.util.*; * * @since 2.1.0 */ -public class NioThread extends Thread { +class NioThread extends Thread { - protected Thread localThread; + final Selector selector; - protected final ExecutorService executor; + private final ExecutorService executor; - protected ObjectPool bufferPool; + private final ObjectPool bufferPool; - public NioThread(ExecutorService executor, ObjectPool bufferPool, Runnable runner) { - super(runner); + private final ConcurrentLinkedQueue> registers = new ConcurrentLinkedQueue<>(); + + private Thread localThread; + + private boolean closed; + + public NioThread(Selector selector, ExecutorService executor, ObjectPool bufferPool) { + super(); + this.selector = selector; this.executor = executor; this.bufferPool = bufferPool; this.setDaemon(true); } - public void runAsync(Runnable runner) { - executor.execute(runner); - } - - public ExecutorService getExecutor() { - return executor; - } - - public ObjectPool getBufferPool() { - return bufferPool; + void register(Consumer consumer) { + registers.offer(consumer); + selector.wakeup(); } @Override public void run() { this.localThread = Thread.currentThread(); - super.run(); + while (!this.closed) { + try { + Consumer register; + while ((register = registers.poll()) != null) { + register.accept(selector); + } + int count = selector.select(); + if (count == 0) continue; + Set keys = selector.selectedKeys(); + Iterator it = keys.iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + try { + if (key.isAcceptable()) { + TcpNioProtocolServer sc = (TcpNioProtocolServer) key.attachment(); + sc.doAccept(); + continue; + } + TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); + if (key.isWritable()) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + conn.doWrite(); + } else if (key.isReadable()) { + conn.doRead(); + } else if (key.isConnectable()) { + conn.doConnect(); + } + } finally { + it.remove(); + } + } + } catch (Exception ex) { + ex.printStackTrace(); + } + } } public boolean inSameThread() { diff --git a/src/org/redkale/net/nio/NioThreadGroup.java b/src/org/redkale/net/nio/NioThreadGroup.java new file mode 100644 index 000000000..22859ebe0 --- /dev/null +++ b/src/org/redkale/net/nio/NioThreadGroup.java @@ -0,0 +1,38 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net.nio; + +import java.nio.channels.SelectionKey; +import java.util.concurrent.*; + +/** + * 协议处理的IO线程组 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +class NioThreadGroup { + + private NioThread[] ioThreads; + + private ScheduledThreadPoolExecutor timeoutExecutor; + + public ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit) { + return timeoutExecutor.schedule(callable, delay, unit); + } + + public void interestOps(NioThread ioThread, SelectionKey key, int opt) { + if ((key.interestOps() & opt) != 0) return; + key.interestOps(key.interestOps() | opt); + if (ioThread.inSameThread()) return; + //非IO线程中 + key.selector().wakeup(); + } +} diff --git a/src/org/redkale/net/nio/NioWorkerThread.java b/src/org/redkale/net/nio/NioWorkerThread.java deleted file mode 100644 index a4b84aca4..000000000 --- a/src/org/redkale/net/nio/NioWorkerThread.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.nio; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class NioWorkerThread extends NioEventLoop { - - public NioWorkerThread(String name) { - super(name); - } - - @Override - protected void doLoopProcessing() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - -} diff --git a/src/org/redkale/net/nio/TcpNioAsyncConnection.java b/src/org/redkale/net/nio/TcpNioAsyncConnection.java index 7a1fe331d..bf99cd098 100644 --- a/src/org/redkale/net/nio/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/nio/TcpNioAsyncConnection.java @@ -9,7 +9,8 @@ import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; -import java.util.Set; +import java.util.*; +import java.util.concurrent.*; import java.util.function.*; import javax.net.ssl.SSLContext; import org.redkale.net.AsyncConnection; @@ -30,13 +31,48 @@ class TcpNioAsyncConnection extends AsyncConnection { private int writeTimeoutSeconds; - private final SocketChannel channel; - private final SocketAddress remoteAddress; - public TcpNioAsyncConnection(ObjectPool bufferPool, SocketChannel ch, + final SocketChannel channel; + + final NioThread ioThread; + + final NioThreadGroup ioGroup; + + final ExecutorService workExecutor; + + //读操作 + private ByteBuffer readByteBuffer; + + private CompletionHandler readCompletionHandler; + + private boolean readPending; + + private SelectionKey readKey; + + //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeOffset、writeLength有值 + private ByteBuffer writeByteBuffer; + + private ByteBuffer[] writeByteBuffers; + + private int writeOffset; + + private int writeLength; + + private Object writeAttachment; + + private CompletionHandler writeCompletionHandler; + + private boolean writePending; + + private SelectionKey writeKey; + + public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, ObjectPool bufferPool, SocketChannel ch, SSLContext sslContext, final SocketAddress addr0) { super(bufferPool, sslContext); + this.ioGroup = ioGroup; + this.ioThread = ioThread; + this.workExecutor = workExecutor; this.channel = ch; SocketAddress addr = addr0; if (addr == null) { @@ -49,9 +85,12 @@ class TcpNioAsyncConnection extends AsyncConnection { this.remoteAddress = addr; } - public TcpNioAsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, + public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, Supplier bufferSupplier, Consumer bufferConsumer, SocketChannel ch, SSLContext sslContext, final SocketAddress addr0) { super(bufferSupplier, bufferConsumer, sslContext); + this.ioGroup = ioGroup; + this.ioThread = ioThread; + this.workExecutor = workExecutor; this.channel = ch; SocketAddress addr = addr0; if (addr == null) { @@ -150,22 +189,264 @@ class TcpNioAsyncConnection extends AsyncConnection { @Override public void read(CompletionHandler handler) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + Objects.requireNonNull(handler); + if (!this.channel.isConnected()) { + if (this.workExecutor == null) { + handler.failed(new NotYetConnectedException(), pollReadBuffer()); + } else { + this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), pollReadBuffer())); + } + return; + } + if (this.readPending) { + if (this.workExecutor == null) { + handler.failed(new ReadPendingException(), pollReadBuffer()); + } else { + this.workExecutor.execute(() -> handler.failed(new ReadPendingException(), pollReadBuffer())); + } + return; + } + this.readPending = true; + this.readByteBuffer = pollReadBuffer(); + if (this.readTimeoutSeconds > 0) { + NioCompletionHandler newhandler = new NioCompletionHandler(handler, this.readByteBuffer); + this.readCompletionHandler = newhandler; + newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.readTimeoutSeconds, TimeUnit.SECONDS); + } else { + this.readCompletionHandler = handler; + } + doRead(); } @Override - public WritableByteChannel rritableByteChannel() { + public WritableByteChannel writableByteChannel() { return this.channel; } @Override public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + Objects.requireNonNull(src); + Objects.requireNonNull(handler); + if (!this.channel.isConnected()) { + if (this.workExecutor == null) { + handler.failed(new NotYetConnectedException(), attachment); + } else { + this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment)); + } + return; + } + if (this.writePending) { + if (this.workExecutor == null) { + handler.failed(new WritePendingException(), attachment); + } else { + this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment)); + } + return; + } + this.writePending = true; + this.writeByteBuffer = src; + this.writeAttachment = attachment; + if (this.writeTimeoutSeconds > 0) { + NioCompletionHandler newhandler = new NioCompletionHandler(handler, attachment); + this.writeCompletionHandler = newhandler; + newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); + } else { + this.writeCompletionHandler = (CompletionHandler) handler; + } + doWrite(); } @Override public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + Objects.requireNonNull(srcs); + Objects.requireNonNull(handler); + if (!this.channel.isConnected()) { + if (this.workExecutor == null) { + handler.failed(new NotYetConnectedException(), attachment); + } else { + this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment)); + } + return; + } + if (this.writePending) { + if (this.workExecutor == null) { + handler.failed(new WritePendingException(), attachment); + } else { + this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment)); + } + return; + } + this.writePending = true; + this.writeByteBuffers = srcs; + this.writeOffset = offset; + this.writeLength = length; + this.writeAttachment = attachment; + if (this.writeTimeoutSeconds > 0) { + NioCompletionHandler newhandler = new NioCompletionHandler(handler, attachment); + this.writeCompletionHandler = newhandler; + newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); + } else { + this.writeCompletionHandler = (CompletionHandler) handler; + } + doWrite(); } + void doConnect() { + + } + + void doRead() { + try { + final boolean invokeDirect = this.ioThread.inSameThread(); + int totalCount = 0; + boolean hasRemain = true; + while (invokeDirect && hasRemain) { + int readCount = this.channel.read(readByteBuffer); + hasRemain = readByteBuffer.hasRemaining(); + if (readCount <= 0) { + if (totalCount == 0) totalCount = readCount; + break; + } + totalCount += readCount; + } + if (totalCount != 0 || !hasRemain) { + CompletionHandler handler = this.readCompletionHandler; + ByteBuffer attach = this.readByteBuffer; + clearRead(); + if (handler != null) { + if (this.workExecutor == null) { + handler.completed(totalCount, attach); + } else { + final int totalCount0 = totalCount; + this.workExecutor.execute(() -> handler.completed(totalCount0, attach)); + } + } + if (readKey != null) { + readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ); + } + } else if (readKey == null) { + ioThread.register(selector -> { + try { + readKey = channel.register(selector, SelectionKey.OP_READ); + readKey.attach(this); + } catch (ClosedChannelException e) { + CompletionHandler handler = this.readCompletionHandler; + ByteBuffer attach = this.readByteBuffer; + clearRead(); + if (handler != null) { + if (this.workExecutor == null) { + handler.failed(e, attach); + } else { + this.workExecutor.execute(() -> handler.failed(e, attach)); + } + } + } + }); + } else { + ioGroup.interestOps(ioThread, readKey, SelectionKey.OP_READ); + } + } catch (Exception e) { + CompletionHandler handler = this.readCompletionHandler; + ByteBuffer attach = this.readByteBuffer; + clearRead(); + if (handler != null) { + if (this.workExecutor == null) { + handler.failed(e, attach); + } else { + this.workExecutor.execute(() -> handler.failed(e, attach)); + } + } + } + } + + private void clearRead() { + this.readCompletionHandler = null; + this.readByteBuffer = null; + this.readPending = false; //必须放最后 + } + + void doWrite() { + try { + final boolean invokeDirect = this.ioThread.inSameThread(); + int totalCount = 0; + boolean hasRemain = true; + while (invokeDirect && hasRemain) { + int writeCount; + if (writeByteBuffer != null) { + writeCount = channel.write(writeByteBuffer); + hasRemain = writeByteBuffer.hasRemaining(); + } else { + writeCount = (int) channel.write(writeByteBuffers, writeOffset, writeLength); + boolean remain = false; + for (int i = writeByteBuffers.length - 1; i >= writeOffset; i--) { + if (writeByteBuffers[i].hasRemaining()) { + remain = true; + break; + } + } + hasRemain = remain; + } + if (writeCount <= 0) { + if (totalCount == 0) totalCount = writeCount; + break; + } + totalCount += writeCount; + } + + if (totalCount > 0 || !hasRemain) { + CompletionHandler handler = this.writeCompletionHandler; + Object attach = this.writeAttachment; + clearWrite(); + if (handler != null) { + if (this.workExecutor == null) { + handler.completed(totalCount, attach); + } else { + final int totalCount0 = totalCount; + this.workExecutor.execute(() -> handler.completed(totalCount0, attach)); + } + } + } else if (writeKey == null) { + ioThread.register(selector -> { + try { + writeKey = channel.register(selector, SelectionKey.OP_WRITE); + writeKey.attach(this); + } catch (ClosedChannelException e) { + CompletionHandler handler = this.writeCompletionHandler; + Object attach = this.writeAttachment; + clearWrite(); + if (handler != null) { + if (this.workExecutor == null) { + handler.failed(e, attach); + } else { + this.workExecutor.execute(() -> handler.failed(e, attach)); + } + } + } + }); + } else { + ioGroup.interestOps(ioThread, writeKey, SelectionKey.OP_WRITE); + } + } catch (IOException e) { + CompletionHandler handler = this.writeCompletionHandler; + Object attach = this.writeAttachment; + clearWrite(); + if (handler != null) { + if (this.workExecutor == null) { + handler.failed(e, attach); + } else { + this.workExecutor.execute(() -> handler.failed(e, attach)); + } + } + } + } + + private void clearWrite() { + this.writeCompletionHandler = null; + this.writeAttachment = null; + this.writeByteBuffer = null; + this.writeByteBuffers = null; + this.writeOffset = 0; + this.writeLength = 0; + this.writePending = false; //必须放最后 + } } diff --git a/src/org/redkale/net/nio/TcpNioProtocolServer.java b/src/org/redkale/net/nio/TcpNioProtocolServer.java index 5daf3a68c..3cf9368c0 100644 --- a/src/org/redkale/net/nio/TcpNioProtocolServer.java +++ b/src/org/redkale/net/nio/TcpNioProtocolServer.java @@ -17,7 +17,7 @@ import org.redkale.util.AnyValue; * 详情见: https://redkale.org * * @author zhangjx - * + * * @since 2.1.0 */ public class TcpNioProtocolServer extends ProtocolServer { @@ -56,4 +56,6 @@ public class TcpNioProtocolServer extends ProtocolServer { throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. } + void doAccept() { + } }