This commit is contained in:
Redkale
2020-06-24 16:34:58 +08:00
parent d211692306
commit 1d030183bb
11 changed files with 400 additions and 238 deletions

View File

@@ -39,7 +39,7 @@ public abstract class AsyncConnection implements AutoCloseable {
protected final Consumer<ByteBuffer> bufferConsumer;
protected ByteBuffer readBuffer;
private ByteBuffer readBuffer;
//在线数
protected AtomicLong livingCounter;
@@ -116,7 +116,7 @@ public abstract class AsyncConnection implements AutoCloseable {
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
public abstract WritableByteChannel rritableByteChannel();
public abstract WritableByteChannel writableByteChannel();
public abstract <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler);

View File

@@ -256,7 +256,7 @@ class TcpAioAsyncConnection extends AsyncConnection {
}
@Override
public final WritableByteChannel rritableByteChannel() {
public final WritableByteChannel writableByteChannel() {
return new WritableByteChannel() {
@Override
public int write(ByteBuffer src) throws IOException {

View File

@@ -147,7 +147,7 @@ class UdpBioAsyncConnection extends AsyncConnection {
}
@Override
public final WritableByteChannel rritableByteChannel() {
public final WritableByteChannel writableByteChannel() {
return this.channel;
}

View File

@@ -1,59 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.nio;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public abstract class AbstractLoop extends Thread {
private volatile Thread localThread;
protected volatile boolean closed;
protected String name;
protected AbstractLoop(String name) {
this.name = name;
}
@Override
public final void run() {
this.localThread = Thread.currentThread();
beforeLoop();
while (!closed) {
if (Thread.currentThread().isInterrupted()) break;
try {
doLoop();
} catch (Throwable e) {
e.printStackTrace();
}
}
afterLoop();
}
protected void beforeLoop() {
}
protected abstract void doLoop();
protected void afterLoop() {
}
public boolean isSameThread() {
return localThread == Thread.currentThread();
}
public void shutdown() {
this.closed = true;
}
}

View File

@@ -32,11 +32,21 @@ class NioCompletionHandler<A> implements CompletionHandler<Integer, A>, Runnable
@Override
public void completed(Integer result, A attach) {
ScheduledFuture future = this.timeoutFuture;
if (future != null) {
this.timeoutFuture = null;
future.cancel(true);
}
handler.completed(result, attachment);
}
@Override
public void failed(Throwable exc, A attach) {
ScheduledFuture future = this.timeoutFuture;
if (future != null) {
this.timeoutFuture = null;
future.cancel(true);
}
handler.failed(exc, attachment);
}

View File

@@ -1,119 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.nio;
import java.io.IOException;
import java.nio.channels.*;
import java.util.*;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public abstract class NioEventLoop extends AbstractLoop {
protected final Selector selector;
public NioEventLoop(String name) {
super(name);
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void processKey(SelectionKey key) {
if (key == null || !key.isValid()) return;
if (key.isAcceptable()) {
try {
acceptOP(key);
} catch (Throwable e) {
failedOP(key, e);
}
} else if (key.isConnectable()) {
try {
connectOP(key);
} catch (Throwable e) {
failedOP(key, e);
}
} else if (key.isReadable()) {
try {
readOP(key);
} catch (Throwable e) {
failedOP(key, e);
}
} else if (key.isWritable()) {
try {
writeOP(key);
} catch (Throwable e) {
failedOP(key, e);
}
}
}
@Override
protected final void doLoop() {
try {
doLoopProcessing();
} catch (Throwable e) {
e.printStackTrace();
}
try {
selector.select(getSelectorTimeout());
} catch (IOException e) {
e.printStackTrace();
}
try {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
processKey(key);
}
}
} catch (ClosedSelectorException e) {
// do nothing
}
}
protected long getSelectorTimeout() {
return 10;
}
protected abstract void doLoopProcessing();
protected void acceptOP(SelectionKey key) {
throw new RuntimeException("Accept operation is not implemented!");
}
protected void connectOP(SelectionKey key) throws IOException {
throw new RuntimeException("Connect operation is not implemented!");
}
protected void readOP(SelectionKey key) throws IOException {
throw new RuntimeException("Accept operation is not implemented!");
}
protected void writeOP(SelectionKey key) throws IOException {
throw new RuntimeException("Accept operation is not implemented!");
}
protected void failedOP(SelectionKey key, Throwable e) {
// ignore the errors by default
}
}

View File

@@ -6,7 +6,10 @@
package org.redkale.net.nio;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import org.redkale.util.*;
/**
@@ -19,37 +22,71 @@ import org.redkale.util.*;
*
* @since 2.1.0
*/
public class NioThread extends Thread {
class NioThread extends Thread {
protected Thread localThread;
final Selector selector;
protected final ExecutorService executor;
private final ExecutorService executor;
protected ObjectPool<ByteBuffer> bufferPool;
private final ObjectPool<ByteBuffer> bufferPool;
public NioThread(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, Runnable runner) {
super(runner);
private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>();
private Thread localThread;
private boolean closed;
public NioThread(Selector selector, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool) {
super();
this.selector = selector;
this.executor = executor;
this.bufferPool = bufferPool;
this.setDaemon(true);
}
public void runAsync(Runnable runner) {
executor.execute(runner);
}
public ExecutorService getExecutor() {
return executor;
}
public ObjectPool<ByteBuffer> getBufferPool() {
return bufferPool;
void register(Consumer<Selector> consumer) {
registers.offer(consumer);
selector.wakeup();
}
@Override
public void run() {
this.localThread = Thread.currentThread();
super.run();
while (!this.closed) {
try {
Consumer<Selector> register;
while ((register = registers.poll()) != null) {
register.accept(selector);
}
int count = selector.select();
if (count == 0) continue;
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
try {
if (key.isAcceptable()) {
TcpNioProtocolServer sc = (TcpNioProtocolServer) key.attachment();
sc.doAccept();
continue;
}
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
if (key.isWritable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite();
} else if (key.isReadable()) {
conn.doRead();
} else if (key.isConnectable()) {
conn.doConnect();
}
} finally {
it.remove();
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
public boolean inSameThread() {

View File

@@ -0,0 +1,38 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.nio;
import java.nio.channels.SelectionKey;
import java.util.concurrent.*;
/**
* 协议处理的IO线程组
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
class NioThreadGroup {
private NioThread[] ioThreads;
private ScheduledThreadPoolExecutor timeoutExecutor;
public ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit) {
return timeoutExecutor.schedule(callable, delay, unit);
}
public void interestOps(NioThread ioThread, SelectionKey key, int opt) {
if ((key.interestOps() & opt) != 0) return;
key.interestOps(key.interestOps() | opt);
if (ioThread.inSameThread()) return;
//非IO线程中
key.selector().wakeup();
}
}

View File

@@ -1,28 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.nio;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class NioWorkerThread extends NioEventLoop {
public NioWorkerThread(String name) {
super(name);
}
@Override
protected void doLoopProcessing() {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
}

View File

@@ -9,7 +9,8 @@ import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import javax.net.ssl.SSLContext;
import org.redkale.net.AsyncConnection;
@@ -30,13 +31,48 @@ class TcpNioAsyncConnection extends AsyncConnection {
private int writeTimeoutSeconds;
private final SocketChannel channel;
private final SocketAddress remoteAddress;
public TcpNioAsyncConnection(ObjectPool<ByteBuffer> bufferPool, SocketChannel ch,
final SocketChannel channel;
final NioThread ioThread;
final NioThreadGroup ioGroup;
final ExecutorService workExecutor;
//读操作
private ByteBuffer readByteBuffer;
private CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
private boolean readPending;
private SelectionKey readKey;
//写操作, 二选一要么writeByteBuffer有值要么writeByteBuffers、writeOffset、writeLength有值
private ByteBuffer writeByteBuffer;
private ByteBuffer[] writeByteBuffers;
private int writeOffset;
private int writeLength;
private Object writeAttachment;
private CompletionHandler<Integer, Object> writeCompletionHandler;
private boolean writePending;
private SelectionKey writeKey;
public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, ObjectPool<ByteBuffer> bufferPool, SocketChannel ch,
SSLContext sslContext, final SocketAddress addr0) {
super(bufferPool, sslContext);
this.ioGroup = ioGroup;
this.ioThread = ioThread;
this.workExecutor = workExecutor;
this.channel = ch;
SocketAddress addr = addr0;
if (addr == null) {
@@ -49,9 +85,12 @@ class TcpNioAsyncConnection extends AsyncConnection {
this.remoteAddress = addr;
}
public TcpNioAsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
SocketChannel ch, SSLContext sslContext, final SocketAddress addr0) {
super(bufferSupplier, bufferConsumer, sslContext);
this.ioGroup = ioGroup;
this.ioThread = ioThread;
this.workExecutor = workExecutor;
this.channel = ch;
SocketAddress addr = addr0;
if (addr == null) {
@@ -150,22 +189,264 @@ class TcpNioAsyncConnection extends AsyncConnection {
@Override
public void read(CompletionHandler<Integer, ByteBuffer> handler) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
Objects.requireNonNull(handler);
if (!this.channel.isConnected()) {
if (this.workExecutor == null) {
handler.failed(new NotYetConnectedException(), pollReadBuffer());
} else {
this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), pollReadBuffer()));
}
return;
}
if (this.readPending) {
if (this.workExecutor == null) {
handler.failed(new ReadPendingException(), pollReadBuffer());
} else {
this.workExecutor.execute(() -> handler.failed(new ReadPendingException(), pollReadBuffer()));
}
return;
}
this.readPending = true;
this.readByteBuffer = pollReadBuffer();
if (this.readTimeoutSeconds > 0) {
NioCompletionHandler newhandler = new NioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newhandler;
newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.readCompletionHandler = handler;
}
doRead();
}
@Override
public WritableByteChannel rritableByteChannel() {
public WritableByteChannel writableByteChannel() {
return this.channel;
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
Objects.requireNonNull(src);
Objects.requireNonNull(handler);
if (!this.channel.isConnected()) {
if (this.workExecutor == null) {
handler.failed(new NotYetConnectedException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment));
}
return;
}
if (this.writePending) {
if (this.workExecutor == null) {
handler.failed(new WritePendingException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment));
}
return;
}
this.writePending = true;
this.writeByteBuffer = src;
this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) {
NioCompletionHandler newhandler = new NioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newhandler;
newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite();
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
Objects.requireNonNull(srcs);
Objects.requireNonNull(handler);
if (!this.channel.isConnected()) {
if (this.workExecutor == null) {
handler.failed(new NotYetConnectedException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment));
}
return;
}
if (this.writePending) {
if (this.workExecutor == null) {
handler.failed(new WritePendingException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment));
}
return;
}
this.writePending = true;
this.writeByteBuffers = srcs;
this.writeOffset = offset;
this.writeLength = length;
this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) {
NioCompletionHandler newhandler = new NioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newhandler;
newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite();
}
void doConnect() {
}
void doRead() {
try {
final boolean invokeDirect = this.ioThread.inSameThread();
int totalCount = 0;
boolean hasRemain = true;
while (invokeDirect && hasRemain) {
int readCount = this.channel.read(readByteBuffer);
hasRemain = readByteBuffer.hasRemaining();
if (readCount <= 0) {
if (totalCount == 0) totalCount = readCount;
break;
}
totalCount += readCount;
}
if (totalCount != 0 || !hasRemain) {
CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler;
ByteBuffer attach = this.readByteBuffer;
clearRead();
if (handler != null) {
if (this.workExecutor == null) {
handler.completed(totalCount, attach);
} else {
final int totalCount0 = totalCount;
this.workExecutor.execute(() -> handler.completed(totalCount0, attach));
}
}
if (readKey != null) {
readKey.interestOps(readKey.interestOps() & ~SelectionKey.OP_READ);
}
} else if (readKey == null) {
ioThread.register(selector -> {
try {
readKey = channel.register(selector, SelectionKey.OP_READ);
readKey.attach(this);
} catch (ClosedChannelException e) {
CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler;
ByteBuffer attach = this.readByteBuffer;
clearRead();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
}
}
});
} else {
ioGroup.interestOps(ioThread, readKey, SelectionKey.OP_READ);
}
} catch (Exception e) {
CompletionHandler<Integer, ByteBuffer> handler = this.readCompletionHandler;
ByteBuffer attach = this.readByteBuffer;
clearRead();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
}
}
}
private void clearRead() {
this.readCompletionHandler = null;
this.readByteBuffer = null;
this.readPending = false; //必须放最后
}
void doWrite() {
try {
final boolean invokeDirect = this.ioThread.inSameThread();
int totalCount = 0;
boolean hasRemain = true;
while (invokeDirect && hasRemain) {
int writeCount;
if (writeByteBuffer != null) {
writeCount = channel.write(writeByteBuffer);
hasRemain = writeByteBuffer.hasRemaining();
} else {
writeCount = (int) channel.write(writeByteBuffers, writeOffset, writeLength);
boolean remain = false;
for (int i = writeByteBuffers.length - 1; i >= writeOffset; i--) {
if (writeByteBuffers[i].hasRemaining()) {
remain = true;
break;
}
}
hasRemain = remain;
}
if (writeCount <= 0) {
if (totalCount == 0) totalCount = writeCount;
break;
}
totalCount += writeCount;
}
if (totalCount > 0 || !hasRemain) {
CompletionHandler<Integer, Object> handler = this.writeCompletionHandler;
Object attach = this.writeAttachment;
clearWrite();
if (handler != null) {
if (this.workExecutor == null) {
handler.completed(totalCount, attach);
} else {
final int totalCount0 = totalCount;
this.workExecutor.execute(() -> handler.completed(totalCount0, attach));
}
}
} else if (writeKey == null) {
ioThread.register(selector -> {
try {
writeKey = channel.register(selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
} catch (ClosedChannelException e) {
CompletionHandler<Integer, Object> handler = this.writeCompletionHandler;
Object attach = this.writeAttachment;
clearWrite();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
}
}
});
} else {
ioGroup.interestOps(ioThread, writeKey, SelectionKey.OP_WRITE);
}
} catch (IOException e) {
CompletionHandler<Integer, Object> handler = this.writeCompletionHandler;
Object attach = this.writeAttachment;
clearWrite();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
}
}
}
private void clearWrite() {
this.writeCompletionHandler = null;
this.writeAttachment = null;
this.writeByteBuffer = null;
this.writeByteBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writePending = false; //必须放最后
}
}

View File

@@ -17,7 +17,7 @@ import org.redkale.util.AnyValue;
* 详情见: https://redkale.org
*
* @author zhangjx
*
*
* @since 2.1.0
*/
public class TcpNioProtocolServer extends ProtocolServer {
@@ -56,4 +56,6 @@ public class TcpNioProtocolServer extends ProtocolServer {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
void doAccept() {
}
}