From 96fd660d469b7efd10d9279a6601183ca9cf73d7 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 20 Jan 2021 10:14:52 +0800 Subject: [PATCH] --- ...ection.java => AioTcpAsyncConnection.java} | 4 +- ...lServer.java => AioTcpProtocolServer.java} | 6 +- src/org/redkale/net/AsyncConnection.java | 18 +- ...ection.java => BioUdpAsyncConnection.java} | 4 +- ...lServer.java => BioUdpProtocolServer.java} | 6 +- ...ection.java => NioTcpAsyncConnection.java} | 90 ++-------- src/org/redkale/net/NioTcpPrepareRunner.java | 168 ++++++++++++++++++ ...lServer.java => NioTcpProtocolServer.java} | 13 +- src/org/redkale/net/ProtocolServer.java | 6 +- src/org/redkale/net/Server.java | 4 +- src/org/redkale/net/WorkThread.java | 19 +- src/org/redkale/net/nio/NioThread.java | 28 ++- src/org/redkale/net/nio/NioThreadGroup.java | 15 +- src/org/redkale/net/sncp/SncpServlet.java | 2 +- src/org/redkale/service/AbstractService.java | 10 +- .../redkale/service/WebSocketNodeService.java | 17 +- 16 files changed, 257 insertions(+), 153 deletions(-) rename src/org/redkale/net/{TcpAioAsyncConnection.java => AioTcpAsyncConnection.java} (96%) rename src/org/redkale/net/{TcpAioProtocolServer.java => AioTcpProtocolServer.java} (94%) rename src/org/redkale/net/{UdpBioAsyncConnection.java => BioUdpAsyncConnection.java} (94%) rename src/org/redkale/net/{UdpBioProtocolServer.java => BioUdpProtocolServer.java} (93%) rename src/org/redkale/net/{TcpNioAsyncConnection.java => NioTcpAsyncConnection.java} (77%) create mode 100644 src/org/redkale/net/NioTcpPrepareRunner.java rename src/org/redkale/net/{TcpNioProtocolServer.java => NioTcpProtocolServer.java} (88%) diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/AioTcpAsyncConnection.java similarity index 96% rename from src/org/redkale/net/TcpAioAsyncConnection.java rename to src/org/redkale/net/AioTcpAsyncConnection.java index 608522751..12e73eab8 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/AioTcpAsyncConnection.java @@ -22,7 +22,7 @@ import javax.net.ssl.SSLContext; * * @author zhangjx */ -class TcpAioAsyncConnection extends AsyncConnection { +class AioTcpAsyncConnection extends AsyncConnection { //private final Semaphore semaphore = new Semaphore(1); private int readTimeoutSeconds; @@ -35,7 +35,7 @@ class TcpAioAsyncConnection extends AsyncConnection { private BlockingQueue writeQueue; - public TcpAioAsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, + public AioTcpAsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, final AsynchronousSocketChannel ch, final SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { diff --git a/src/org/redkale/net/TcpAioProtocolServer.java b/src/org/redkale/net/AioTcpProtocolServer.java similarity index 94% rename from src/org/redkale/net/TcpAioProtocolServer.java rename to src/org/redkale/net/AioTcpProtocolServer.java index c32f2c08e..2d5795d1b 100644 --- a/src/org/redkale/net/TcpAioProtocolServer.java +++ b/src/org/redkale/net/AioTcpProtocolServer.java @@ -22,13 +22,13 @@ import org.redkale.util.*; * * @author zhangjx */ -public class TcpAioProtocolServer extends ProtocolServer { +public class AioTcpProtocolServer extends ProtocolServer { private AsynchronousChannelGroup group; private AsynchronousServerSocketChannel serverChannel; - public TcpAioProtocolServer(Context context) { + public AioTcpProtocolServer(Context context) { super(context); } @@ -102,7 +102,7 @@ public class TcpAioProtocolServer extends ProtocolServer { channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel, + AsyncConnection conn = new AioTcpAsyncConnection(bufferPool, bufferPool, channel, context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); //context.runAsync(new PrepareRunner(context, responsePool, conn, null, null)); new PrepareRunner(context, responsePool, conn, null, null).run(); diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 89e16561a..b44696d3b 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -298,7 +298,7 @@ public abstract class AsyncConnection implements AutoCloseable { channel.connect(address, null, new CompletionHandler() { @Override public void completed(Void result, Void attachment) { - future.complete(new TcpAioAsyncConnection(bufferSupplier, bufferConsumer, channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds, null, null)); + future.complete(new AioTcpAsyncConnection(bufferSupplier, bufferConsumer, channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds, null, null)); } @Override @@ -341,27 +341,27 @@ public abstract class AsyncConnection implements AutoCloseable { public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); } public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, SSLContext sslContext, SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); } public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, SSLContext sslContext, SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + return new BioUdpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch) { @@ -370,22 +370,22 @@ public abstract class AsyncConnection implements AutoCloseable { public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + return new AioTcpAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } } diff --git a/src/org/redkale/net/UdpBioAsyncConnection.java b/src/org/redkale/net/BioUdpAsyncConnection.java similarity index 94% rename from src/org/redkale/net/UdpBioAsyncConnection.java rename to src/org/redkale/net/BioUdpAsyncConnection.java index 65ee60bab..a19eba5ee 100644 --- a/src/org/redkale/net/UdpBioAsyncConnection.java +++ b/src/org/redkale/net/BioUdpAsyncConnection.java @@ -21,7 +21,7 @@ import javax.net.ssl.SSLContext; * * @author zhangjx */ -class UdpBioAsyncConnection extends AsyncConnection { +class BioUdpAsyncConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -33,7 +33,7 @@ class UdpBioAsyncConnection extends AsyncConnection { private final boolean client; - public UdpBioAsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, + public BioUdpAsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, final DatagramChannel ch, final SSLContext sslContext, SocketAddress addr0, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { diff --git a/src/org/redkale/net/UdpBioProtocolServer.java b/src/org/redkale/net/BioUdpProtocolServer.java similarity index 93% rename from src/org/redkale/net/UdpBioProtocolServer.java rename to src/org/redkale/net/BioUdpProtocolServer.java index 4eed02d85..0191eedb5 100644 --- a/src/org/redkale/net/UdpBioProtocolServer.java +++ b/src/org/redkale/net/BioUdpProtocolServer.java @@ -22,13 +22,13 @@ import org.redkale.util.*; * * @author zhangjx */ -public class UdpBioProtocolServer extends ProtocolServer { +public class BioUdpProtocolServer extends ProtocolServer { private boolean running; private DatagramChannel serverChannel; - public UdpBioProtocolServer(Context context) { + public BioUdpProtocolServer(Context context) { super(context); } @@ -93,7 +93,7 @@ public class UdpBioProtocolServer extends ProtocolServer { try { SocketAddress address = serchannel.receive(buffer); buffer.flip(); - AsyncConnection conn = new UdpBioAsyncConnection(bufferPool, bufferPool, serchannel, + AsyncConnection conn = new BioUdpAsyncConnection(bufferPool, bufferPool, serchannel, context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null); context.runAsync(new PrepareRunner(context, responsePool, conn, buffer, null)); } catch (Exception e) { diff --git a/src/org/redkale/net/TcpNioAsyncConnection.java b/src/org/redkale/net/NioTcpAsyncConnection.java similarity index 77% rename from src/org/redkale/net/TcpNioAsyncConnection.java rename to src/org/redkale/net/NioTcpAsyncConnection.java index aa48112e0..7130c7bad 100644 --- a/src/org/redkale/net/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/NioTcpAsyncConnection.java @@ -27,7 +27,7 @@ import org.redkale.net.nio.NioThreadGroup; * * @since 2.1.0 */ -public class TcpNioAsyncConnection extends AsyncConnection { +public class NioTcpAsyncConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -41,8 +41,6 @@ public class TcpNioAsyncConnection extends AsyncConnection { final NioThreadGroup ioGroup; - final ExecutorService workExecutor; - //连 private Object connectAttachment; @@ -78,13 +76,11 @@ public class TcpNioAsyncConnection extends AsyncConnection { private SelectionKey writeKey; - public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, - SocketChannel ch, + public NioTcpAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, SocketChannel ch, SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) { super(ioThread.getBufferPool(), sslContext, livingCounter, closedCounter); this.ioGroup = ioGroup; this.ioThread = ioThread; - this.workExecutor = workExecutor; this.channel = ch; SocketAddress addr = addr0; if (addr == null) { @@ -97,14 +93,13 @@ public class TcpNioAsyncConnection extends AsyncConnection { this.remoteAddress = addr; } - public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, + public NioTcpAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, Supplier bufferSupplier, Consumer bufferConsumer, SocketChannel ch, SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) { super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter); this.ioGroup = ioGroup; this.ioThread = ioThread; - this.workExecutor = workExecutor; this.channel = ch; SocketAddress addr = addr0; if (addr == null) { @@ -219,19 +214,11 @@ public class TcpNioAsyncConnection extends AsyncConnection { public void read(CompletionHandler handler) { Objects.requireNonNull(handler); if (!this.channel.isConnected()) { - if (this.workExecutor == null) { - handler.failed(new NotYetConnectedException(), pollReadBuffer()); - } else { - this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), pollReadBuffer())); - } + handler.failed(new NotYetConnectedException(), pollReadBuffer()); return; } if (this.readPending) { - if (this.workExecutor == null) { - handler.failed(new ReadPendingException(), pollReadBuffer()); - } else { - this.workExecutor.execute(() -> handler.failed(new ReadPendingException(), pollReadBuffer())); - } + handler.failed(new ReadPendingException(), pollReadBuffer()); return; } this.readPending = true; @@ -255,19 +242,11 @@ public class TcpNioAsyncConnection extends AsyncConnection { Objects.requireNonNull(src); Objects.requireNonNull(handler); if (!this.channel.isConnected()) { - if (this.workExecutor == null) { - handler.failed(new NotYetConnectedException(), attachment); - } else { - this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment)); - } + handler.failed(new NotYetConnectedException(), attachment); return; } if (this.writePending) { - if (this.workExecutor == null) { - handler.failed(new WritePendingException(), attachment); - } else { - this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment)); - } + handler.failed(new WritePendingException(), attachment); return; } this.writePending = true; @@ -288,19 +267,11 @@ public class TcpNioAsyncConnection extends AsyncConnection { Objects.requireNonNull(srcs); Objects.requireNonNull(handler); if (!this.channel.isConnected()) { - if (this.workExecutor == null) { - handler.failed(new NotYetConnectedException(), attachment); - } else { - this.workExecutor.execute(() -> handler.failed(new NotYetConnectedException(), attachment)); - } + handler.failed(new NotYetConnectedException(), attachment); return; } if (this.writePending) { - if (this.workExecutor == null) { - handler.failed(new WritePendingException(), attachment); - } else { - this.workExecutor.execute(() -> handler.failed(new WritePendingException(), attachment)); - } + handler.failed(new WritePendingException(), attachment); return; } this.writePending = true; @@ -349,17 +320,9 @@ public class TcpNioAsyncConnection extends AsyncConnection { clearConnect(); if (handler != null) { if (t == null) { - if (this.workExecutor == null) { - handler.completed(null, attach); - } else { - this.workExecutor.execute(() -> handler.completed(null, attach)); - } + handler.completed(null, attach); } else { - if (this.workExecutor == null) { - handler.failed(t, attach); - } else { - this.workExecutor.execute(() -> handler.failed(t, attach)); - } + handler.failed(t, attach); } } } @@ -416,17 +379,9 @@ public class TcpNioAsyncConnection extends AsyncConnection { clearRead(); if (handler != null) { if (t == null) { - if (this.workExecutor == null) { - handler.completed(totalCount, attach); - } else { - this.workExecutor.execute(() -> handler.completed(totalCount, attach)); - } + handler.completed(totalCount, attach); } else { - if (this.workExecutor == null) { - handler.failed(t, attach); - } else { - this.workExecutor.execute(() -> handler.failed(t, attach)); - } + handler.failed(t, attach); } } } @@ -471,12 +426,7 @@ public class TcpNioAsyncConnection extends AsyncConnection { Object attach = this.writeAttachment; clearWrite(); if (handler != null) { - if (this.workExecutor == null) { - handler.completed(totalCount, attach); - } else { - final int totalCount0 = totalCount; - this.workExecutor.execute(() -> handler.completed(totalCount0, attach)); - } + handler.completed(totalCount, attach); } } else if (writeKey == null) { ioThread.register(selector -> { @@ -488,11 +438,7 @@ public class TcpNioAsyncConnection extends AsyncConnection { Object attach = this.writeAttachment; clearWrite(); if (handler != null) { - if (this.workExecutor == null) { - handler.failed(e, attach); - } else { - this.workExecutor.execute(() -> handler.failed(e, attach)); - } + handler.failed(e, attach); } } }); @@ -504,11 +450,7 @@ public class TcpNioAsyncConnection extends AsyncConnection { Object attach = this.writeAttachment; clearWrite(); if (handler != null) { - if (this.workExecutor == null) { - handler.failed(e, attach); - } else { - this.workExecutor.execute(() -> handler.failed(e, attach)); - } + handler.failed(e, attach); } } } diff --git a/src/org/redkale/net/NioTcpPrepareRunner.java b/src/org/redkale/net/NioTcpPrepareRunner.java new file mode 100644 index 000000000..807fdffcb --- /dev/null +++ b/src/org/redkale/net/NioTcpPrepareRunner.java @@ -0,0 +1,168 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import org.redkale.net.nio.NioThread; +import org.redkale.util.ObjectPool; + +/** + * + * @author zhangjx + */ +public class NioTcpPrepareRunner implements Runnable { + + private final AsyncConnection channel; + + private final Context context; + + private final ObjectPool responsePool; + + private ByteBuffer data; + + private Response response; + + public NioTcpPrepareRunner(Context context, ObjectPool responsePool, AsyncConnection channel, ByteBuffer data, Response response) { + this.context = context; + this.responsePool = responsePool; + this.channel = channel; + this.data = data; + this.response = response; + } + + @Override + public void run() { + try { + channel.read(new CompletionHandler() { + @Override + public void completed(Integer count, ByteBuffer buffer) { + if (response == null) response = ((NioThread) Thread.currentThread()).getResponsePool().get(); + if (count < 1) { + buffer.clear(); + channel.setReadBuffer(buffer); + channel.dispose();// response.init(channel); 在调用之前异常 + response.removeChannel(); + response.finish(true); + return; + } +// { //测试 +// buffer.flip(); +// byte[] bs = new byte[buffer.remaining()]; +// buffer.get(bs); +// System.println(new String(bs)); +// } + buffer.flip(); + try { + response.init(channel); + codec(buffer, response); + } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer + context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); + response.finish(true); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer buffer) { + buffer.clear(); + channel.setReadBuffer(buffer); + channel.dispose();// response.init(channel); 在调用之前异常 + response.removeChannel(); + response.finish(true); + if (exc != null && context.logger.isLoggable(Level.FINEST)) { + context.logger.log(Level.FINEST, "Servlet Handler read channel erroneous, force to close channel ", exc); + } + } + }); + } catch (Exception te) { + channel.dispose();// response.init(channel); 在调用之前异常 + response.removeChannel(); + response.finish(true); + if (context.logger.isLoggable(Level.FINEST)) { + context.logger.log(Level.FINEST, "Servlet read channel erroneous, force to close channel ", te); + } + } + } + + protected void codec(final ByteBuffer buffer, final Response response) throws IOException { + final Request request = response.request; + final PrepareServlet preparer = context.prepare; + preparer.executeCounter.incrementAndGet(); + final int rs = request.readHeader(buffer); + if (rs < 0) { //表示数据格式不正确 + channel.offerBuffer(buffer); + if (rs != Integer.MIN_VALUE) preparer.illRequestCounter.incrementAndGet(); + response.finish(true); + } else if (rs == 0) { + if (buffer.hasRemaining()) { + request.setMoredata(buffer); + } else { + buffer.clear(); + channel.setReadBuffer(buffer); + } + preparer.prepare(request, response); + } else { + buffer.clear(); + channel.setReadBuffer(buffer); + final AtomicInteger ai = new AtomicInteger(rs); + channel.read(new CompletionHandler() { + + @Override + public void completed(Integer result, ByteBuffer attachment) { + attachment.flip(); + ai.addAndGet(-request.readBody(attachment)); + if (ai.get() > 0) { + attachment.clear(); + channel.setReadBuffer(attachment); + channel.read(this); + } else { + if (attachment.hasRemaining()) { + request.setMoredata(attachment); + } else { + attachment.clear(); + channel.setReadBuffer(attachment); + } + try { + preparer.prepare(request, response); + } catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免preparer.prepare内部异常导致重复 offerBuffer + context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t); + response.finish(true); + } + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + preparer.illRequestCounter.incrementAndGet(); + attachment.clear(); + channel.setReadBuffer(attachment); + response.finish(true); + if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, force to close channel ", exc); + } + }); + } + } + + protected void initResponse(Response response, AsyncConnection channel) { + response.init(channel); + } + + protected Response pollResponse() { + return responsePool.get(); + } + + protected Request pollRequest(Response response) { + return response.request; + } + + protected AsyncConnection removeChannel(Response response) { + return response.removeChannel(); + } + +} diff --git a/src/org/redkale/net/TcpNioProtocolServer.java b/src/org/redkale/net/NioTcpProtocolServer.java similarity index 88% rename from src/org/redkale/net/TcpNioProtocolServer.java rename to src/org/redkale/net/NioTcpProtocolServer.java index efec5fc2d..73a1cafc9 100644 --- a/src/org/redkale/net/TcpNioProtocolServer.java +++ b/src/org/redkale/net/NioTcpProtocolServer.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.atomic.AtomicLong; -import org.redkale.net.*; import org.redkale.net.nio.*; import org.redkale.util.*; @@ -24,7 +23,7 @@ import org.redkale.util.*; * * @since 2.1.0 */ -public class TcpNioProtocolServer extends ProtocolServer { +public class NioTcpProtocolServer extends ProtocolServer { private ObjectPool bufferPool; @@ -40,7 +39,7 @@ public class TcpNioProtocolServer extends ProtocolServer { private boolean closed; - public TcpNioProtocolServer(Context context) { + public NioTcpProtocolServer(Context context) { super(context); } @@ -91,10 +90,10 @@ public class TcpNioProtocolServer extends ProtocolServer { this.bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); AtomicLong createResponseCounter = new AtomicLong(); AtomicLong cycleResponseCounter = new AtomicLong(); + this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool)); - - this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), bufferPool); + this.ioGroup = new NioThreadGroup(server.name, null, Runtime.getRuntime().availableProcessors(), bufferPool, responsePool); this.ioGroup.start(); this.acceptThread = new Thread() { @@ -129,8 +128,8 @@ public class TcpNioProtocolServer extends ProtocolServer { channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); NioThread ioThread = ioGroup.nextThread(); - AsyncConnection conn = new TcpNioAsyncConnection(ioGroup, ioThread, context.executor, channel, context.getSSLContext(), null, livingCounter, closedCounter); - new PrepareRunner(context, responsePool, conn, null, null).run(); + AsyncConnection conn = new NioTcpAsyncConnection(ioGroup, ioThread, channel, context.getSSLContext(), null, livingCounter, closedCounter); + new NioTcpPrepareRunner(context, responsePool, conn, null, null).run(); } @Override diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index b8c5b9f53..031fff77e 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -69,11 +69,11 @@ public abstract class ProtocolServer { if (netimpl != null) netimpl = netimpl.trim(); if ("TCP".equalsIgnoreCase(protocol)) { if (netimpl == null || netimpl.isEmpty()) { - return new TcpAioProtocolServer(context); + return new AioTcpProtocolServer(context); } else if ("aio".equalsIgnoreCase(netimpl)) { - return new TcpAioProtocolServer(context); + return new AioTcpProtocolServer(context); } else if ("nio".equalsIgnoreCase(netimpl)) { - return new TcpNioProtocolServer(context); + return new NioTcpProtocolServer(context); } } else if ("UDP".equalsIgnoreCase(protocol)) { if (netimpl == null || netimpl.isEmpty()) { diff --git a/src/org/redkale/net/Server.java b/src/org/redkale/net/Server.java index fb10d660a..23277af61 100644 --- a/src/org/redkale/net/Server.java +++ b/src/org/redkale/net/Server.java @@ -152,8 +152,8 @@ public abstract class Server { - Thread t = new WorkThread(workExecutor, r); - t.setName("Redkale-" + n + "-ServletThread-" + f.format(counter.incrementAndGet())); + String threadname = "Redkale-" + n + "-WorkThread-" + f.format(counter.incrementAndGet()); + Thread t = new WorkThread(threadname, workExecutor, r); return t; }); } diff --git a/src/org/redkale/net/WorkThread.java b/src/org/redkale/net/WorkThread.java index 9781a24b4..f3309e3c4 100644 --- a/src/org/redkale/net/WorkThread.java +++ b/src/org/redkale/net/WorkThread.java @@ -19,20 +19,21 @@ public class WorkThread extends Thread { protected Thread localThread; - private final ExecutorService executor; + protected final ExecutorService workExecutor; - public WorkThread(ExecutorService executor, Runnable runner) { - super(runner); - this.executor = executor; + public WorkThread(String name, ExecutorService workExecutor, Runnable target) { + super(target); + if (name != null) setName(name); + this.workExecutor = workExecutor; this.setDaemon(true); } public void runAsync(Runnable runner) { - executor.execute(runner); + workExecutor.execute(runner); } - public ExecutorService getExecutor() { - return executor; + public ExecutorService getWorkExecutor() { + return workExecutor; } @Override @@ -41,11 +42,11 @@ public class WorkThread extends Thread { super.run(); } - public boolean inSameThread() { + public boolean inCurrThread() { return this.localThread == Thread.currentThread(); } - public boolean inSameThread(Thread thread) { + public boolean inCurrThread(Thread thread) { return this.localThread == thread; } diff --git a/src/org/redkale/net/nio/NioThread.java b/src/org/redkale/net/nio/NioThread.java index a2eae695b..59535033b 100644 --- a/src/org/redkale/net/nio/NioThread.java +++ b/src/org/redkale/net/nio/NioThread.java @@ -10,7 +10,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.function.Consumer; -import org.redkale.net.TcpNioAsyncConnection; +import org.redkale.net.*; import org.redkale.util.*; /** @@ -23,22 +23,24 @@ import org.redkale.util.*; * * @since 2.1.0 */ -public class NioThread extends Thread { +public class NioThread extends WorkThread { final Selector selector; private final ObjectPool bufferPool; - private final ConcurrentLinkedQueue> registers = new ConcurrentLinkedQueue<>(); + private final ObjectPool responsePool; - private Thread localThread; + private final ConcurrentLinkedQueue> registers = new ConcurrentLinkedQueue<>(); private boolean closed; - public NioThread(Selector selector, ObjectPool bufferPool) { - super(); + public NioThread(String name, ExecutorService workExecutor, Selector selector, + ObjectPool bufferPool, ObjectPool responsePool) { + super(name, workExecutor, null); this.selector = selector; this.bufferPool = bufferPool; + this.responsePool = responsePool; this.setDaemon(true); } @@ -51,6 +53,10 @@ public class NioThread extends Thread { return bufferPool; } + public ObjectPool getResponsePool() { + return responsePool; + } + @Override public void run() { this.localThread = Thread.currentThread(); @@ -68,7 +74,7 @@ public class NioThread extends Thread { SelectionKey key = it.next(); it.remove(); if (!key.isValid()) continue; - TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); + NioTcpAsyncConnection conn = (NioTcpAsyncConnection) key.attachment(); if (key.isWritable()) { //key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); conn.doWrite(); @@ -84,14 +90,6 @@ public class NioThread extends Thread { } } - public boolean inCurrThread() { - return this.localThread == Thread.currentThread(); - } - - public boolean inSameThread(Thread thread) { - return this.localThread == thread; - } - public void close() { this.closed = true; this.interrupt(); diff --git a/src/org/redkale/net/nio/NioThreadGroup.java b/src/org/redkale/net/nio/NioThreadGroup.java index f29dbd870..b4f3626e4 100644 --- a/src/org/redkale/net/nio/NioThreadGroup.java +++ b/src/org/redkale/net/nio/NioThreadGroup.java @@ -10,6 +10,7 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import org.redkale.net.Response; import org.redkale.util.ObjectPool; /** @@ -30,17 +31,23 @@ public class NioThreadGroup { private ScheduledThreadPoolExecutor timeoutExecutor; - public NioThreadGroup(int threads, ObjectPool bufferPool) throws IOException { - this.threads = new NioThread[Math.max(threads, 1)]; + public NioThreadGroup(final String serverName, ExecutorService workExecutor, int iothreads, + ObjectPool bufferPool, ObjectPool responsePool) throws IOException { + this.threads = new NioThread[Math.max(iothreads, 1)]; for (int i = 0; i < this.threads.length; i++) { ObjectPool threadBufferPool = ObjectPool.createUnsafePool(bufferPool.getCreatCounter(), bufferPool.getCycleCounter(), 8, bufferPool.getCreator(), bufferPool.getPrepare(), bufferPool.getRecycler()); - this.threads[i] = new NioThread(Selector.open(), threadBufferPool); + + ObjectPool threadResponsePool = ObjectPool.createUnsafePool(responsePool.getCreatCounter(), + responsePool.getCycleCounter(), 8, + responsePool.getCreator(), responsePool.getPrepare(), responsePool.getRecycler()); + String name = "Redkale-" + serverName + "-ServletThread" + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); + this.threads[i] = new NioThread(name, workExecutor, Selector.open(), threadBufferPool, threadResponsePool); } this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { Thread t = new Thread(r); - t.setName(this.getClass().getSimpleName() + "-Timeout-Thread"); + t.setName("Redkale-" + serverName + "-IOTimeoutThread"); t.setDaemon(true); return t; }); diff --git a/src/org/redkale/net/sncp/SncpServlet.java b/src/org/redkale/net/sncp/SncpServlet.java index e9fdfb9fc..41e9c2283 100644 --- a/src/org/redkale/net/sncp/SncpServlet.java +++ b/src/org/redkale/net/sncp/SncpServlet.java @@ -49,7 +49,7 @@ public abstract class SncpServlet extends Servlet> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { if ((topic == null || !topic.equals(wsaddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid); if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); - - ExecutorService executor = null; - Thread thread = Thread.currentThread(); - if (thread instanceof WorkThread) { - executor = ((WorkThread) thread).getExecutor(); - } - if (executor == null) executor = ForkJoinPool.commonPool(); - - return CompletableFuture.supplyAsync(() -> { - final List rs = new ArrayList<>(); - this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); - return rs; - }, executor); + final List rs = new ArrayList<>(); + this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); + return CompletableFuture.completedFuture(rs); } @Override