This commit is contained in:
Redkale
2021-01-20 10:14:52 +08:00
parent 7d1770da8a
commit 96fd660d46
16 changed files with 257 additions and 153 deletions

View File

@@ -22,7 +22,7 @@ import javax.net.ssl.SSLContext;
*
* @author zhangjx
*/
class TcpAioAsyncConnection extends AsyncConnection {
class AioTcpAsyncConnection extends AsyncConnection {
//private final Semaphore semaphore = new Semaphore(1);
private int readTimeoutSeconds;
@@ -35,7 +35,7 @@ class TcpAioAsyncConnection extends AsyncConnection {
private BlockingQueue<WriteEntry> writeQueue;
public TcpAioAsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
public AioTcpAsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
final AsynchronousSocketChannel ch, final SSLContext sslContext, final SocketAddress addr0,
final int readTimeoutSeconds, final int writeTimeoutSeconds,
final AtomicLong livingCounter, final AtomicLong closedCounter) {

View File

@@ -22,13 +22,13 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
public class TcpAioProtocolServer extends ProtocolServer {
public class AioTcpProtocolServer extends ProtocolServer {
private AsynchronousChannelGroup group;
private AsynchronousServerSocketChannel serverChannel;
public TcpAioProtocolServer(Context context) {
public AioTcpProtocolServer(Context context) {
super(context);
}
@@ -102,7 +102,7 @@ public class TcpAioProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel,
AsyncConnection conn = new AioTcpAsyncConnection(bufferPool, bufferPool, channel,
context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
//context.runAsync(new PrepareRunner(context, responsePool, conn, null, null));
new PrepareRunner(context, responsePool, conn, null, null).run();

View File

@@ -298,7 +298,7 @@ public abstract class AsyncConnection implements AutoCloseable {
channel.connect(address, null, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
future.complete(new TcpAioAsyncConnection(bufferSupplier, bufferConsumer, channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds, null, null));
future.complete(new AioTcpAsyncConnection(bufferSupplier, bufferConsumer, channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds, null, null));
}
@Override
@@ -341,27 +341,27 @@ public abstract class AsyncConnection implements AutoCloseable {
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch, SSLContext sslContext,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final DatagramChannel ch, SSLContext sslContext,
SocketAddress addr, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch) {
@@ -370,22 +370,22 @@ public abstract class AsyncConnection implements AutoCloseable {
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
}

View File

@@ -21,7 +21,7 @@ import javax.net.ssl.SSLContext;
*
* @author zhangjx
*/
class UdpBioAsyncConnection extends AsyncConnection {
class BioUdpAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -33,7 +33,7 @@ class UdpBioAsyncConnection extends AsyncConnection {
private final boolean client;
public UdpBioAsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
public BioUdpAsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
final DatagramChannel ch, final SSLContext sslContext, SocketAddress addr0, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {

View File

@@ -22,13 +22,13 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
public class UdpBioProtocolServer extends ProtocolServer {
public class BioUdpProtocolServer extends ProtocolServer {
private boolean running;
private DatagramChannel serverChannel;
public UdpBioProtocolServer(Context context) {
public BioUdpProtocolServer(Context context) {
super(context);
}
@@ -93,7 +93,7 @@ public class UdpBioProtocolServer extends ProtocolServer {
try {
SocketAddress address = serchannel.receive(buffer);
buffer.flip();
AsyncConnection conn = new UdpBioAsyncConnection(bufferPool, bufferPool, serchannel,
AsyncConnection conn = new BioUdpAsyncConnection(bufferPool, bufferPool, serchannel,
context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, responsePool, conn, buffer, null));
} catch (Exception e) {

View File

@@ -27,7 +27,7 @@ import org.redkale.net.nio.NioThreadGroup;
*
* @since 2.1.0
*/
public class TcpNioAsyncConnection extends AsyncConnection {
public class NioTcpAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -41,8 +41,6 @@ public class TcpNioAsyncConnection extends AsyncConnection {
final NioThreadGroup ioGroup;
final ExecutorService workExecutor;
//
private Object connectAttachment;
@@ -78,13 +76,11 @@ public class TcpNioAsyncConnection extends AsyncConnection {
private SelectionKey writeKey;
public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor,
SocketChannel ch,
public NioTcpAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, SocketChannel ch,
SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) {
super(ioThread.getBufferPool(), sslContext, livingCounter, closedCounter);
this.ioGroup = ioGroup;
this.ioThread = ioThread;
this.workExecutor = workExecutor;
this.channel = ch;
SocketAddress addr = addr0;
if (addr == null) {
@@ -97,14 +93,13 @@ public class TcpNioAsyncConnection extends AsyncConnection {
this.remoteAddress = addr;
}
public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor,
public NioTcpAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread,
Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
SocketChannel ch, SSLContext sslContext, final SocketAddress addr0,
AtomicLong livingCounter, AtomicLong closedCounter) {
super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter);
this.ioGroup = ioGroup;
this.ioThread = ioThread;
this.workExecutor = workExecutor;
this.channel = ch;
SocketAddress addr = addr0;
if (addr == null) {
@@ -219,19 +214,11 @@ public class TcpNioAsyncConnection extends AsyncConnection {
public void read(CompletionHandler<Integer, ByteBuffer> handler) {
Objects.requireNonNull(handler);
if (!this.channel.isConnected()) {
if (this.workExecutor == null) {
handler.failed(new NotYetConnectedException(), pollReadBuffer());
} else {
this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), pollReadBuffer()));
}
handler.failed(new NotYetConnectedException(), pollReadBuffer());
return;
}
if (this.readPending) {
if (this.workExecutor == null) {
handler.failed(new ReadPendingException(), pollReadBuffer());
} else {
this.workExecutor.execute(() -> handler.failed(new ReadPendingException(), pollReadBuffer()));
}
handler.failed(new ReadPendingException(), pollReadBuffer());
return;
}
this.readPending = true;
@@ -255,19 +242,11 @@ public class TcpNioAsyncConnection extends AsyncConnection {
Objects.requireNonNull(src);
Objects.requireNonNull(handler);
if (!this.channel.isConnected()) {
if (this.workExecutor == null) {
handler.failed(new NotYetConnectedException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment));
}
handler.failed(new NotYetConnectedException(), attachment);
return;
}
if (this.writePending) {
if (this.workExecutor == null) {
handler.failed(new WritePendingException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment));
}
handler.failed(new WritePendingException(), attachment);
return;
}
this.writePending = true;
@@ -288,19 +267,11 @@ public class TcpNioAsyncConnection extends AsyncConnection {
Objects.requireNonNull(srcs);
Objects.requireNonNull(handler);
if (!this.channel.isConnected()) {
if (this.workExecutor == null) {
handler.failed(new NotYetConnectedException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment));
}
handler.failed(new NotYetConnectedException(), attachment);
return;
}
if (this.writePending) {
if (this.workExecutor == null) {
handler.failed(new WritePendingException(), attachment);
} else {
this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment));
}
handler.failed(new WritePendingException(), attachment);
return;
}
this.writePending = true;
@@ -349,17 +320,9 @@ public class TcpNioAsyncConnection extends AsyncConnection {
clearConnect();
if (handler != null) {
if (t == null) {
if (this.workExecutor == null) {
handler.completed(null, attach);
} else {
this.workExecutor.execute(() -> handler.completed(null, attach));
}
handler.completed(null, attach);
} else {
if (this.workExecutor == null) {
handler.failed(t, attach);
} else {
this.workExecutor.execute(() -> handler.failed(t, attach));
}
handler.failed(t, attach);
}
}
}
@@ -416,17 +379,9 @@ public class TcpNioAsyncConnection extends AsyncConnection {
clearRead();
if (handler != null) {
if (t == null) {
if (this.workExecutor == null) {
handler.completed(totalCount, attach);
} else {
this.workExecutor.execute(() -> handler.completed(totalCount, attach));
}
handler.completed(totalCount, attach);
} else {
if (this.workExecutor == null) {
handler.failed(t, attach);
} else {
this.workExecutor.execute(() -> handler.failed(t, attach));
}
handler.failed(t, attach);
}
}
}
@@ -471,12 +426,7 @@ public class TcpNioAsyncConnection extends AsyncConnection {
Object attach = this.writeAttachment;
clearWrite();
if (handler != null) {
if (this.workExecutor == null) {
handler.completed(totalCount, attach);
} else {
final int totalCount0 = totalCount;
this.workExecutor.execute(() -> handler.completed(totalCount0, attach));
}
handler.completed(totalCount, attach);
}
} else if (writeKey == null) {
ioThread.register(selector -> {
@@ -488,11 +438,7 @@ public class TcpNioAsyncConnection extends AsyncConnection {
Object attach = this.writeAttachment;
clearWrite();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
handler.failed(e, attach);
}
}
});
@@ -504,11 +450,7 @@ public class TcpNioAsyncConnection extends AsyncConnection {
Object attach = this.writeAttachment;
clearWrite();
if (handler != null) {
if (this.workExecutor == null) {
handler.failed(e, attach);
} else {
this.workExecutor.execute(() -> handler.failed(e, attach));
}
handler.failed(e, attach);
}
}
}

View File

@@ -0,0 +1,168 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.redkale.net.nio.NioThread;
import org.redkale.util.ObjectPool;
/**
*
* @author zhangjx
*/
public class NioTcpPrepareRunner implements Runnable {
private final AsyncConnection channel;
private final Context context;
private final ObjectPool<Response> responsePool;
private ByteBuffer data;
private Response response;
public NioTcpPrepareRunner(Context context, ObjectPool<Response> responsePool, AsyncConnection channel, ByteBuffer data, Response response) {
this.context = context;
this.responsePool = responsePool;
this.channel = channel;
this.data = data;
this.response = response;
}
@Override
public void run() {
try {
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer count, ByteBuffer buffer) {
if (response == null) response = ((NioThread) Thread.currentThread()).getResponsePool().get();
if (count < 1) {
buffer.clear();
channel.setReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
return;
}
// { //测试
// buffer.flip();
// byte[] bs = new byte[buffer.remaining()];
// buffer.get(bs);
// System.println(new String(bs));
// }
buffer.flip();
try {
response.init(channel);
codec(buffer, response);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true);
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
buffer.clear();
channel.setReadBuffer(buffer);
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
if (exc != null && context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, force to close channel ", exc);
}
}
});
} catch (Exception te) {
channel.dispose();// response.init(channel); 在调用之前异常
response.removeChannel();
response.finish(true);
if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te);
}
}
}
protected void codec(final ByteBuffer buffer, final Response response) throws IOException {
final Request request = response.request;
final PrepareServlet preparer = context.prepare;
preparer.executeCounter.incrementAndGet();
final int rs = request.readHeader(buffer);
if (rs < 0) { //表示数据格式不正确
channel.offerBuffer(buffer);
if (rs != Integer.MIN_VALUE) preparer.illRequestCounter.incrementAndGet();
response.finish(true);
} else if (rs == 0) {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
} else {
buffer.clear();
channel.setReadBuffer(buffer);
}
preparer.prepare(request, response);
} else {
buffer.clear();
channel.setReadBuffer(buffer);
final AtomicInteger ai = new AtomicInteger(rs);
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
ai.addAndGet(-request.readBody(attachment));
if (ai.get() > 0) {
attachment.clear();
channel.setReadBuffer(attachment);
channel.read(this);
} else {
if (attachment.hasRemaining()) {
request.setMoredata(attachment);
} else {
attachment.clear();
channel.setReadBuffer(attachment);
}
try {
preparer.prepare(request, response);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免preparer.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true);
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
preparer.illRequestCounter.incrementAndGet();
attachment.clear();
channel.setReadBuffer(attachment);
response.finish(true);
if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc);
}
});
}
}
protected void initResponse(Response response, AsyncConnection channel) {
response.init(channel);
}
protected Response pollResponse() {
return responsePool.get();
}
protected Request pollRequest(Response response) {
return response.request;
}
protected AsyncConnection removeChannel(Response response) {
return response.removeChannel();
}
}

View File

@@ -11,7 +11,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.net.*;
import org.redkale.net.nio.*;
import org.redkale.util.*;
@@ -24,7 +23,7 @@ import org.redkale.util.*;
*
* @since 2.1.0
*/
public class TcpNioProtocolServer extends ProtocolServer {
public class NioTcpProtocolServer extends ProtocolServer {
private ObjectPool<ByteBuffer> bufferPool;
@@ -40,7 +39,7 @@ public class TcpNioProtocolServer extends ProtocolServer {
private boolean closed;
public TcpNioProtocolServer(Context context) {
public NioTcpProtocolServer(Context context) {
super(context);
}
@@ -91,10 +90,10 @@ public class TcpNioProtocolServer extends ProtocolServer {
this.bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), bufferPool);
this.ioGroup = new NioThreadGroup(server.name, null, Runtime.getRuntime().availableProcessors(), bufferPool, responsePool);
this.ioGroup.start();
this.acceptThread = new Thread() {
@@ -129,8 +128,8 @@ public class TcpNioProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
NioThread ioThread = ioGroup.nextThread();
AsyncConnection conn = new TcpNioAsyncConnection(ioGroup, ioThread, context.executor, channel, context.getSSLContext(), null, livingCounter, closedCounter);
new PrepareRunner(context, responsePool, conn, null, null).run();
AsyncConnection conn = new NioTcpAsyncConnection(ioGroup, ioThread, channel, context.getSSLContext(), null, livingCounter, closedCounter);
new NioTcpPrepareRunner(context, responsePool, conn, null, null).run();
}
@Override

View File

@@ -69,11 +69,11 @@ public abstract class ProtocolServer {
if (netimpl != null) netimpl = netimpl.trim();
if ("TCP".equalsIgnoreCase(protocol)) {
if (netimpl == null || netimpl.isEmpty()) {
return new TcpAioProtocolServer(context);
return new AioTcpProtocolServer(context);
} else if ("aio".equalsIgnoreCase(netimpl)) {
return new TcpAioProtocolServer(context);
return new AioTcpProtocolServer(context);
} else if ("nio".equalsIgnoreCase(netimpl)) {
return new TcpNioProtocolServer(context);
return new NioTcpProtocolServer(context);
}
} else if ("UDP".equalsIgnoreCase(protocol)) {
if (netimpl == null || netimpl.isEmpty()) {

View File

@@ -152,8 +152,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
final Format f = createFormat();
final String n = name;
this.workExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new WorkThread(workExecutor, r);
t.setName("Redkale-" + n + "-ServletThread-" + f.format(counter.incrementAndGet()));
String threadname = "Redkale-" + n + "-WorkThread-" + f.format(counter.incrementAndGet());
Thread t = new WorkThread(threadname, workExecutor, r);
return t;
});
}

View File

@@ -19,20 +19,21 @@ public class WorkThread extends Thread {
protected Thread localThread;
private final ExecutorService executor;
protected final ExecutorService workExecutor;
public WorkThread(ExecutorService executor, Runnable runner) {
super(runner);
this.executor = executor;
public WorkThread(String name, ExecutorService workExecutor, Runnable target) {
super(target);
if (name != null) setName(name);
this.workExecutor = workExecutor;
this.setDaemon(true);
}
public void runAsync(Runnable runner) {
executor.execute(runner);
workExecutor.execute(runner);
}
public ExecutorService getExecutor() {
return executor;
public ExecutorService getWorkExecutor() {
return workExecutor;
}
@Override
@@ -41,11 +42,11 @@ public class WorkThread extends Thread {
super.run();
}
public boolean inSameThread() {
public boolean inCurrThread() {
return this.localThread == Thread.currentThread();
}
public boolean inSameThread(Thread thread) {
public boolean inCurrThread(Thread thread) {
return this.localThread == thread;
}

View File

@@ -10,7 +10,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import org.redkale.net.TcpNioAsyncConnection;
import org.redkale.net.*;
import org.redkale.util.*;
/**
@@ -23,22 +23,24 @@ import org.redkale.util.*;
*
* @since 2.1.0
*/
public class NioThread extends Thread {
public class NioThread extends WorkThread {
final Selector selector;
private final ObjectPool<ByteBuffer> bufferPool;
private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>();
private final ObjectPool<Response> responsePool;
private Thread localThread;
private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>();
private boolean closed;
public NioThread(Selector selector, ObjectPool<ByteBuffer> bufferPool) {
super();
public NioThread(String name, ExecutorService workExecutor, Selector selector,
ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
super(name, workExecutor, null);
this.selector = selector;
this.bufferPool = bufferPool;
this.responsePool = responsePool;
this.setDaemon(true);
}
@@ -51,6 +53,10 @@ public class NioThread extends Thread {
return bufferPool;
}
public ObjectPool<Response> getResponsePool() {
return responsePool;
}
@Override
public void run() {
this.localThread = Thread.currentThread();
@@ -68,7 +74,7 @@ public class NioThread extends Thread {
SelectionKey key = it.next();
it.remove();
if (!key.isValid()) continue;
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
NioTcpAsyncConnection conn = (NioTcpAsyncConnection) key.attachment();
if (key.isWritable()) {
//key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite();
@@ -84,14 +90,6 @@ public class NioThread extends Thread {
}
}
public boolean inCurrThread() {
return this.localThread == Thread.currentThread();
}
public boolean inSameThread(Thread thread) {
return this.localThread == thread;
}
public void close() {
this.closed = true;
this.interrupt();

View File

@@ -10,6 +10,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.net.Response;
import org.redkale.util.ObjectPool;
/**
@@ -30,17 +31,23 @@ public class NioThreadGroup {
private ScheduledThreadPoolExecutor timeoutExecutor;
public NioThreadGroup(int threads, ObjectPool<ByteBuffer> bufferPool) throws IOException {
this.threads = new NioThread[Math.max(threads, 1)];
public NioThreadGroup(final String serverName, ExecutorService workExecutor, int iothreads,
ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) throws IOException {
this.threads = new NioThread[Math.max(iothreads, 1)];
for (int i = 0; i < this.threads.length; i++) {
ObjectPool<ByteBuffer> threadBufferPool = ObjectPool.createUnsafePool(bufferPool.getCreatCounter(),
bufferPool.getCycleCounter(), 8,
bufferPool.getCreator(), bufferPool.getPrepare(), bufferPool.getRecycler());
this.threads[i] = new NioThread(Selector.open(), threadBufferPool);
ObjectPool<Response> threadResponsePool = ObjectPool.createUnsafePool(responsePool.getCreatCounter(),
responsePool.getCycleCounter(), 8,
responsePool.getCreator(), responsePool.getPrepare(), responsePool.getRecycler());
String name = "Redkale-" + serverName + "-ServletThread" + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1)));
this.threads[i] = new NioThread(name, workExecutor, Selector.open(), threadBufferPool, threadResponsePool);
}
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r);
t.setName(this.getClass().getSimpleName() + "-Timeout-Thread");
t.setName("Redkale-" + serverName + "-IOTimeoutThread");
t.setDaemon(true);
return t;
});

View File

@@ -49,7 +49,7 @@ public abstract class SncpServlet extends Servlet<SncpContext, SncpRequest, Sncp
protected ExecutorService getExecutor() {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
return ((WorkThread) thread).getExecutor();
return ((WorkThread) thread).getWorkExecutor();
}
return ForkJoinPool.commonPool();
}

View File

@@ -17,11 +17,11 @@ public abstract class AbstractService implements Service {
//如果开启了SNCP此处线程池为SncpServer的线程池
@Resource(name = Server.RESNAME_SERVER_EXECUTOR)
private ExecutorService serverExecutor;
private ExecutorService serverWorkExecutor;
protected void runAsync(Runnable runner) {
if (serverExecutor != null) {
serverExecutor.execute(runner);
if (serverWorkExecutor != null) {
serverWorkExecutor.execute(runner);
} else {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
@@ -33,10 +33,10 @@ public abstract class AbstractService implements Service {
}
protected ExecutorService getExecutor() {
if (serverExecutor != null) return serverExecutor;
if (serverWorkExecutor != null) return serverWorkExecutor;
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
return ((WorkThread) thread).getExecutor();
return ((WorkThread) thread).getWorkExecutor();
}
return ForkJoinPool.commonPool();
}

View File

@@ -11,7 +11,6 @@ import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import org.redkale.net.WorkThread;
import org.redkale.net.http.*;
import org.redkale.util.*;
@@ -44,19 +43,9 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
public CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if ((topic == null || !topic.equals(wsaddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid);
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
ExecutorService executor = null;
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
executor = ((WorkThread) thread).getExecutor();
}
if (executor == null) executor = ForkJoinPool.commonPool();
return CompletableFuture.supplyAsync(() -> {
final List<String> rs = new ArrayList<>();
this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
return rs;
}, executor);
final List<String> rs = new ArrayList<>();
this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
return CompletableFuture.completedFuture(rs);
}
@Override