From 25eaf6e35386dbbb6a8fbe2cc85e82937b5b73a8 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 18 Dec 2018 14:04:49 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection=E6=8E=A5=E5=8F=A3=E5=A4=A7?= =?UTF-8?q?=E5=8F=98=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/AsyncConnection.java | 257 ++++++++---- src/org/redkale/net/PrepareRunner.java | 29 +- src/org/redkale/net/PrepareServlet.java | 27 +- src/org/redkale/net/ProtocolServer.java | 6 +- src/org/redkale/net/Request.java | 20 - .../redkale/net/TcpAioAsyncConnection.java | 36 +- src/org/redkale/net/TcpAioProtocolServer.java | 7 +- .../redkale/net/TcpBioAsyncConnection.java | 240 ------------ .../redkale/net/TcpNioAsyncConnection.java | 366 ----------------- src/org/redkale/net/TcpNioProtocolServer.java | 370 ------------------ src/org/redkale/net/Transport.java | 141 +++---- src/org/redkale/net/TransportFactory.java | 23 +- .../redkale/net/UdpBioAsyncConnection.java | 43 +- src/org/redkale/net/UdpBioProtocolServer.java | 3 +- src/org/redkale/net/http/WebSocketRunner.java | 42 +- src/org/redkale/net/sncp/SncpClient.java | 27 +- src/org/redkale/source/PoolTcpSource.java | 24 +- 17 files changed, 388 insertions(+), 1273 deletions(-) delete mode 100644 src/org/redkale/net/TcpBioAsyncConnection.java delete mode 100644 src/org/redkale/net/TcpNioAsyncConnection.java delete mode 100644 src/org/redkale/net/TcpNioProtocolServer.java diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index f395ad474..1f9ddda2e 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -12,8 +12,9 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import java.util.function.Consumer; +import java.util.function.*; import javax.net.ssl.SSLContext; +import org.redkale.util.ObjectPool; /** * @@ -22,7 +23,7 @@ import javax.net.ssl.SSLContext; * * @author zhangjx */ -public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCloseable { +public abstract class AsyncConnection implements ReadableByteChannel, WritableByteChannel, AutoCloseable { protected SSLContext sslContext; @@ -34,6 +35,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl protected volatile long writetime; + protected final Supplier bufferSupplier; + + protected final Consumer bufferConsumer; + + protected ByteBuffer readBuffer; + //在线数 protected AtomicLong livingCounter; @@ -45,6 +52,22 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl //关联的事件数, 小于1表示没有事件 protected final AtomicInteger eventing = new AtomicInteger(); + protected AsyncConnection(Context context) { + this(context.getBufferSupplier(), context.getBufferConsumer(), context.getSSLContext()); + } + + protected AsyncConnection(ObjectPool bufferPool, SSLContext sslContext) { + this(bufferPool, bufferPool, sslContext); + } + + protected AsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, SSLContext sslContext) { + Objects.requireNonNull(bufferSupplier); + Objects.requireNonNull(bufferConsumer); + this.bufferSupplier = bufferSupplier; + this.bufferConsumer = bufferConsumer; + this.sslContext = sslContext; + } + public final long getLastReadTime() { return readtime; } @@ -61,6 +84,9 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return eventing.decrementAndGet(); } + @Override + public abstract boolean isOpen(); + public abstract boolean isTCP(); public abstract boolean shutdownInput(); @@ -84,17 +110,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); @Override - public abstract Future read(ByteBuffer dst); + public abstract int read(ByteBuffer dst) throws IOException; + + public abstract void read(CompletionHandler handler); + + public abstract void read(long timeout, TimeUnit unit, CompletionHandler handler); @Override - public abstract void read(ByteBuffer dst, A attachment, CompletionHandler handler); + public abstract int write(ByteBuffer src) throws IOException; - public abstract void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler); - - @Override - public abstract Future write(ByteBuffer src); - - @Override public abstract void write(ByteBuffer src, A attachment, CompletionHandler handler); public final void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { @@ -103,6 +127,36 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public abstract void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + public void setReadBuffer(ByteBuffer buffer) { + if (this.readBuffer != null) throw new RuntimeException("repeat AsyncConnection.setReadBuffer"); + this.readBuffer = buffer; + } + + public ByteBuffer pollReadBuffer() { + ByteBuffer rs = this.readBuffer; + if (rs != null) { + this.readBuffer = null; + return rs; + } + return bufferSupplier.get(); + } + + public void offerBuffer(ByteBuffer buffer) { + if (buffer == null) return; + bufferConsumer.accept(buffer); + } + + public void offerBuffer(ByteBuffer... buffers) { + if (buffers == null) return; + for (ByteBuffer buffer : buffers) { + bufferConsumer.accept(buffer); + } + } + + public ByteBuffer pollWriteBuffer() { + return bufferSupplier.get(); + } + public void dispose() {//同close, 只是去掉throws IOException try { this.close(); @@ -125,11 +179,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl livingCounter.decrementAndGet(); livingCounter = null; } - if (beforeCloseListener != null) + if (beforeCloseListener != null) { try { beforeCloseListener.accept(this); } catch (Exception io) { } + } + if (this.readBuffer != null) { + bufferConsumer.accept(this.readBuffer); + } if (attributes == null) return; try { for (Object obj : attributes.values()) { @@ -174,6 +232,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl /** * 创建TCP协议客户端连接 * + * @param bufferPool ByteBuffer对象池 * @param address 连接点子 * @param group 连接AsynchronousChannelGroup * @param readTimeoutSeconds 读取超时秒数 @@ -181,14 +240,31 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl * * @return 连接CompletableFuture */ - public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SocketAddress address, - final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return createTCP(group, null, address, readTimeoutSeconds, writeTimeoutSeconds); + public static CompletableFuture createTCP(final ObjectPool bufferPool, final AsynchronousChannelGroup group, + final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds); } /** * 创建TCP协议客户端连接 * + * @param context Context + * @param address 连接点子 + * @param group 连接AsynchronousChannelGroup + * @param readTimeoutSeconds 读取超时秒数 + * @param writeTimeoutSeconds 写入超时秒数 + * + * @return 连接CompletableFuture + */ + public static CompletableFuture createTCP(final Context context, final AsynchronousChannelGroup group, + final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return createTCP(context.getBufferSupplier(), context.getBufferConsumer(), group, context.getSSLContext(), address, readTimeoutSeconds, writeTimeoutSeconds); + } + + /** + * 创建TCP协议客户端连接 + * + * @param bufferPool ByteBuffer对象池 * @param address 连接点子 * @param sslContext SSLContext * @param group 连接AsynchronousChannelGroup @@ -197,7 +273,25 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl * * @return 连接CompletableFuture */ - public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext, + public static CompletableFuture createTCP(final ObjectPool bufferPool, final AsynchronousChannelGroup group, final SSLContext sslContext, + final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return createTCP(bufferPool, bufferPool, group, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds); + } + + /** + * 创建TCP协议客户端连接 + * + * @param bufferSupplier ByteBuffer生产器 + * @param bufferConsumer ByteBuffer回收器 + * @param address 连接点子 + * @param sslContext SSLContext + * @param group 连接AsynchronousChannelGroup + * @param readTimeoutSeconds 读取超时秒数 + * @param writeTimeoutSeconds 写入超时秒数 + * + * @return 连接CompletableFuture + */ + public static CompletableFuture createTCP(final Supplier bufferSupplier, Consumer bufferConsumer, final AsynchronousChannelGroup group, final SSLContext sslContext, final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { final CompletableFuture future = new CompletableFuture<>(); try { @@ -211,7 +305,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl channel.connect(address, null, new CompletionHandler() { @Override public void completed(Void result, Void attachment) { - future.complete(create(channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds)); + future.complete(new TcpAioAsyncConnection(bufferSupplier, bufferConsumer, channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds, null, null)); } @Override @@ -225,80 +319,109 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return future; } - /** - * 通常用于 ssl socket - * - * @param socket Socket对象 - * - * @return 连接对象 - */ - public static AsyncConnection create(final Socket socket) { - return create(socket, null, 0, 0); - } - - public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { - return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); - } - - public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, - final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); - } - - public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, +// public static AsyncConnection create(final Socket socket) { +// return create(socket, null, 0, 0); +// } +// public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { +// return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); +// } +// +// public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, +// final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) { +// return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); +// } +// +// public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, +// final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { +// return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); +// } +// +// public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) { +// return new TcpNioAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); +// } +// +// public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, +// final int readTimeoutSeconds0, final int writeTimeoutSeconds0, +// final AtomicLong livingCounter, final AtomicLong closedCounter) { +// return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); +// } + public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, + SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); } - public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) { - return new TcpNioAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); - } - - public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, + public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, + SocketAddress addr, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, null, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } - public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, - final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, SSLContext sslContext, + SocketAddress addr, final boolean client0, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { + return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); } - public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, - final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + public static AsyncConnection create(final ObjectPool bufferPool, final DatagramChannel ch, SSLContext sslContext, + SocketAddress addr, final boolean client0, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } - public static AsyncConnection create(final AsynchronousSocketChannel ch) { - return create(ch, null, 0, 0); + public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch) { + return create(context, ch, (SocketAddress) null, 0, 0); } - public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, + final SocketAddress addr0, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); } - public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } - public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) { - return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } - public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, - final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } - public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, - final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } - public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, - final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); + public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch) { + return create(bufferPool, ch, null, 0, 0); } + + public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + } + + public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + } + + public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + } + + public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch, SSLContext sslContext, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + } + } diff --git a/src/org/redkale/net/PrepareRunner.java b/src/org/redkale/net/PrepareRunner.java index 030b68393..a729f5a99 100644 --- a/src/org/redkale/net/PrepareRunner.java +++ b/src/org/redkale/net/PrepareRunner.java @@ -55,14 +55,13 @@ public class PrepareRunner implements Runnable { return; } if (response == null) response = responsePool.get(); - final ByteBuffer buffer = response.request.pollReadBuffer(); try { - channel.read(buffer, keepalive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, null, - new CompletionHandler() { + channel.read(keepalive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, + new CompletionHandler() { @Override - public void completed(Integer count, Void attachment1) { + public void completed(Integer count, ByteBuffer buffer) { if (count < 1) { - response.request.offerReadBuffer(buffer); + channel.offerBuffer(buffer); channel.dispose();// response.init(channel); 在调用之前异常 response.removeChannel(); response.finish(true); @@ -85,8 +84,8 @@ public class PrepareRunner implements Runnable { } @Override - public void failed(Throwable exc, Void attachment2) { - response.request.offerReadBuffer(buffer); + public void failed(Throwable exc, ByteBuffer buffer) { + channel.offerBuffer(buffer); channel.dispose();// response.init(channel); 在调用之前异常 response.removeChannel(); response.finish(true); @@ -96,7 +95,6 @@ public class PrepareRunner implements Runnable { } }); } catch (Exception te) { - response.request.offerReadBuffer(buffer); channel.dispose();// response.init(channel); 在调用之前异常 response.removeChannel(); response.finish(true); @@ -126,19 +124,4 @@ public class PrepareRunner implements Runnable { return response.removeChannel(); } - protected ByteBuffer pollReadBuffer(Request request) { - return request.pollReadBuffer(); - } - - protected ByteBuffer pollReadBuffer(Response response) { - return response.request.pollReadBuffer(); - } - - protected void offerReadBuffer(Request request, ByteBuffer buffer) { - request.offerReadBuffer(buffer); - } - - protected void offerReadBuffer(Response response, ByteBuffer buffer) { - response.request.offerReadBuffer(buffer); - } } diff --git a/src/org/redkale/net/PrepareServlet.java b/src/org/redkale/net/PrepareServlet.java index ec5df1be5..65b991a7d 100644 --- a/src/org/redkale/net/PrepareServlet.java +++ b/src/org/redkale/net/PrepareServlet.java @@ -213,15 +213,16 @@ public abstract class PrepareServlet() { + channel.read(new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { - buffer.flip(); - ai.addAndGet(-request.readBody(buffer)); + attachment.flip(); + ai.addAndGet(-request.readBody(attachment)); if (ai.get() > 0) { - buffer.clear(); - request.channel.read(buffer, buffer, this); + attachment.clear(); + channel.setReadBuffer(attachment); + channel.read(this); } else { - if (buffer.hasRemaining()) { - request.setMoredata(buffer); + if (attachment.hasRemaining()) { + request.setMoredata(attachment); } else { - request.offerReadBuffer(buffer); + channel.offerBuffer(attachment); } request.prepare(); try { @@ -261,7 +264,7 @@ public abstract class PrepareServlet { protected AsyncConnection channel; - protected ByteBuffer readBuffer; - /** * properties 与 attributes 的区别在于:调用recycle时, attributes会被清空而properties会保留; * properties 通常存放需要永久绑定在request里的一些对象 @@ -49,7 +47,6 @@ public abstract class Request { protected Request(C context) { this.context = context; - this.readBuffer = context.pollBuffer(); this.bsonConvert = context.getBsonConvert(); this.jsonConvert = context.getJsonConvert(); } @@ -64,23 +61,6 @@ public abstract class Request { return rs; } - protected ByteBuffer pollReadBuffer() { - ByteBuffer buffer = this.readBuffer; - this.readBuffer = null; - if (buffer == null) buffer = context.pollBuffer(); - return buffer; - } - - protected void offerReadBuffer(ByteBuffer buffer) { - if (buffer == null) return; - if (this.readBuffer == null) { - buffer.clear(); - this.readBuffer = buffer; - } else { - context.offerBuffer(buffer); - } - } - /** * 返回值:Integer.MIN_VALUE: 帧数据; -1:数据不合法; 0:解析完毕; >0: 需再读取的字节数。 * diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index 5c3740062..1b29c75ec 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -12,6 +12,7 @@ import java.nio.channels.*; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.*; import javax.net.ssl.SSLContext; /** @@ -35,11 +36,12 @@ public class TcpAioAsyncConnection extends AsyncConnection { private BlockingQueue writeQueue; - public TcpAioAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext, - final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, + public TcpAioAsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, + final AsynchronousSocketChannel ch, final SSLContext sslContext, final SocketAddress addr0, + final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { + super(bufferSupplier, bufferConsumer, sslContext); this.channel = ch; - this.sslContext = sslContext; this.readTimeoutSeconds = readTimeoutSeconds; this.writeTimeoutSeconds = writeTimeoutSeconds; SocketAddress addr = addr0; @@ -91,19 +93,21 @@ public class TcpAioAsyncConnection extends AsyncConnection { } @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + public void read(CompletionHandler handler) { this.readtime = System.currentTimeMillis(); + ByteBuffer dst = pollReadBuffer(); if (readTimeoutSeconds > 0) { - channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, attachment, handler); + channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, dst, handler); } else { - channel.read(dst, attachment, handler); + channel.read(dst, dst, handler); } } @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + public void read(long timeout, TimeUnit unit, CompletionHandler handler) { this.readtime = System.currentTimeMillis(); - channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler); + ByteBuffer dst = pollReadBuffer(); + channel.read(dst, timeout < 0 ? 0 : timeout, unit, dst, handler); } private void nextWrite(A attachment) { @@ -223,13 +227,21 @@ public class TcpAioAsyncConnection extends AsyncConnection { } @Override - public final Future read(ByteBuffer dst) { - return channel.read(dst); + public final int read(ByteBuffer dst) throws IOException { + try { + return channel.read(dst).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } } @Override - public final Future write(ByteBuffer src) { - return channel.write(src); + public final int write(ByteBuffer src) throws IOException { + try { + return channel.write(src).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } } @Override diff --git a/src/org/redkale/net/TcpAioProtocolServer.java b/src/org/redkale/net/TcpAioProtocolServer.java index eb76d8351..5e964d45c 100644 --- a/src/org/redkale/net/TcpAioProtocolServer.java +++ b/src/org/redkale/net/TcpAioProtocolServer.java @@ -33,7 +33,7 @@ public class TcpAioProtocolServer extends ProtocolServer { @Override public void open(AnyValue config) throws IOException { //group = AsynchronousChannelGroup.withThreadPool(context.executor); - group = AsynchronousChannelGroup.withFixedThreadPool(context.executor.getCorePoolSize(), context.executor.getThreadFactory()); + group = AsynchronousChannelGroup.withFixedThreadPool(context.executor.getCorePoolSize(), context.executor.getThreadFactory()); this.serverChannel = AsynchronousServerSocketChannel.open(group); final Set> options = this.serverChannel.supportedOptions(); @@ -95,9 +95,8 @@ public class TcpAioProtocolServer extends ProtocolServer { } catch (IOException e) { context.logger.log(Level.INFO, channel + " setOption error", e); } - AsyncConnection conn = new TcpAioAsyncConnection(channel, context.sslContext, null, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); - conn.livingCounter = livingCounter; - conn.closedCounter = closedCounter; + AsyncConnection conn = new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), channel, + context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); context.runAsync(new PrepareRunner(context, conn, null, null)); } diff --git a/src/org/redkale/net/TcpBioAsyncConnection.java b/src/org/redkale/net/TcpBioAsyncConnection.java deleted file mode 100644 index 1e4766f21..000000000 --- a/src/org/redkale/net/TcpBioAsyncConnection.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net; - -import java.io.IOException; -import java.net.*; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -public class TcpBioAsyncConnection extends AsyncConnection { - - static final Set> defaultOptions = defaultOptions(); - - private static Set> defaultOptions() { - HashSet> set = new HashSet<>(5); - set.add(StandardSocketOptions.SO_SNDBUF); - set.add(StandardSocketOptions.SO_RCVBUF); - set.add(StandardSocketOptions.SO_KEEPALIVE); - set.add(StandardSocketOptions.SO_REUSEADDR); - set.add(StandardSocketOptions.TCP_NODELAY); - return Collections.unmodifiableSet(set); - } - - private int readTimeoutSeconds; - - private int writeTimeoutSeconds; - - private final Socket socket; - - private final ReadableByteChannel readChannel; - - private final WritableByteChannel writeChannel; - - private final SocketAddress remoteAddress; - - public TcpBioAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - this.socket = socket; - ReadableByteChannel rc = null; - WritableByteChannel wc = null; - try { - socket.setSoTimeout(Math.max(readTimeoutSeconds0, writeTimeoutSeconds0)); - rc = Channels.newChannel(socket.getInputStream()); - wc = Channels.newChannel(socket.getOutputStream()); - } catch (IOException e) { - e.printStackTrace(); - } - this.readChannel = rc; - this.writeChannel = wc; - this.readTimeoutSeconds = readTimeoutSeconds0; - this.writeTimeoutSeconds = writeTimeoutSeconds0; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = socket.getRemoteSocketAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; - } - - @Override - public boolean isTCP() { - return true; - } - - @Override - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SocketAddress getLocalAddress() { - return socket.getLocalSocketAddress(); - } - - @Override - public int getReadTimeoutSeconds() { - return readTimeoutSeconds; - } - - @Override - public int getWriteTimeoutSeconds() { - return writeTimeoutSeconds; - } - - @Override - public void setReadTimeoutSeconds(int readTimeoutSeconds) { - this.readTimeoutSeconds = readTimeoutSeconds; - } - - @Override - public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { - this.writeTimeoutSeconds = writeTimeoutSeconds; - } - - @Override - public boolean shutdownInput() { - try { - this.socket.shutdownInput(); - return true; - } catch (IOException e) { - return false; - } - } - - @Override - public boolean shutdownOutput() { - try { - this.socket.shutdownOutput(); - return true; - } catch (IOException e) { - return false; - } - } - - @Override - public boolean setOption(SocketOption name, T value) { - try { - if (StandardSocketOptions.SO_REUSEADDR == name) { - this.socket.setReuseAddress((Boolean) value); - return true; - } - if (StandardSocketOptions.SO_KEEPALIVE == name) { - this.socket.setKeepAlive((Boolean) value); - return true; - } - if (StandardSocketOptions.TCP_NODELAY == name) { - this.socket.setTcpNoDelay((Boolean) value); - return true; - } - if (StandardSocketOptions.SO_RCVBUF == name) { - this.socket.setReceiveBufferSize((Integer) value); - return true; - } - if (StandardSocketOptions.SO_SNDBUF == name) { - this.socket.setSendBufferSize((Integer) value); - return true; - } - } catch (IOException e) { - return false; - } - return false; - } - - @Override - public Set> supportedOptions() { - return defaultOptions; - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - try { - int rs = 0; - for (int i = offset; i < offset + length; i++) { - rs += writeChannel.write(srcs[i]); - } - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - try { - int rs = readChannel.read(dst); - this.readtime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read(dst, attachment, handler); - } - - @Override - public Future read(ByteBuffer dst) { - try { - int rs = readChannel.read(dst); - this.readtime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - try { - int rs = writeChannel.write(src); - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public Future write(ByteBuffer src) { - try { - int rs = writeChannel.write(src); - this.writetime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - super.close(); - this.socket.close(); - } - - @Override - public boolean isOpen() { - return !socket.isClosed(); - } -} diff --git a/src/org/redkale/net/TcpNioAsyncConnection.java b/src/org/redkale/net/TcpNioAsyncConnection.java deleted file mode 100644 index d075a8ea3..000000000 --- a/src/org/redkale/net/TcpNioAsyncConnection.java +++ /dev/null @@ -1,366 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net; - -import java.io.IOException; -import java.net.*; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.Set; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -public class TcpNioAsyncConnection extends AsyncConnection { - - protected int readTimeoutSeconds; - - protected int writeTimeoutSeconds; - - protected final Selector selector; - - protected SelectionKey key; - - protected final SocketChannel channel; - - protected final SocketAddress remoteAddress; - - ByteBuffer readBuffer; - - Object readAttachment; - - CompletionHandler readHandler; - - ByteBuffer writeOneBuffer; - - ByteBuffer[] writeBuffers; - - int writingCount; - - int writeOffset; - - int writeLength; - - Object writeAttachment; - - CompletionHandler writeHandler; - - public TcpNioAsyncConnection(final SocketChannel ch, SocketAddress addr0, - final Selector selector, - final int readTimeoutSeconds0, final int writeTimeoutSeconds0, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - this.channel = ch; - this.selector = selector; - this.readTimeoutSeconds = readTimeoutSeconds0; - this.writeTimeoutSeconds = writeTimeoutSeconds0; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; - } - - @Override - public void setReadTimeoutSeconds(int readTimeoutSeconds) { - this.readTimeoutSeconds = readTimeoutSeconds; - } - - @Override - public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { - this.writeTimeoutSeconds = writeTimeoutSeconds; - } - - @Override - public int getReadTimeoutSeconds() { - return this.readTimeoutSeconds; - } - - @Override - public int getWriteTimeoutSeconds() { - return this.writeTimeoutSeconds; - } - - @Override - public final SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SocketAddress getLocalAddress() { - try { - return channel.getLocalAddress(); - } catch (IOException e) { - return null; - } - } - - @Override - public boolean shutdownInput() { - try { - this.channel.shutdownInput(); - return true; - } catch (IOException e) { - return false; - } - } - - @Override - public boolean shutdownOutput() { - try { - this.channel.shutdownOutput(); - return true; - } catch (IOException e) { - return false; - } - } - - @Override - public boolean setOption(SocketOption name, T value) { - try { - this.channel.setOption(name, value); - return true; - } catch (IOException e) { - return false; - } - } - - @Override - public Set> supportedOptions() { - return this.channel.supportedOptions(); - } - - CompletionHandler removeReadHandler() { - CompletionHandler handler = this.readHandler; - this.readHandler = null; - return handler; - } - - ByteBuffer removeReadBuffer() { - ByteBuffer buffer = this.readBuffer; - this.readBuffer = null; - return buffer; - } - - Object removeReadAttachment() { - Object attach = this.readAttachment; - this.readAttachment = null; - return attach; - } - - void completeRead(int rs) { - Object attach = this.readAttachment; - CompletionHandler handler = this.readHandler; - this.readBuffer = null; - this.readAttachment = null; - this.readHandler = null; - handler.completed(rs, attach); - } - - void faileRead(Throwable t) { - Object attach = this.readAttachment; - CompletionHandler handler = this.readHandler; - this.readBuffer = null; - this.readAttachment = null; - this.readHandler = null; - handler.failed(t, attach); - } - - CompletionHandler removeWriteHandler() { - CompletionHandler handler = this.writeHandler; - this.writeHandler = null; - return handler; - } - - ByteBuffer removeWriteOneBuffer() { - ByteBuffer buffer = this.writeOneBuffer; - this.writeOneBuffer = null; - return buffer; - } - - ByteBuffer[] removeWriteBuffers() { - ByteBuffer[] buffers = this.writeBuffers; - this.writeBuffers = null; - return buffers; - } - - int removeWritingCount() { - int rs = this.writingCount; - this.writingCount = 0; - return rs; - } - - int removeWriteOffset() { - int rs = this.writeOffset; - this.writeOffset = 0; - return rs; - } - - int removeWriteLength() { - int rs = this.writeLength; - this.writeLength = 0; - return rs; - } - - Object removeWriteAttachment() { - Object attach = this.writeAttachment; - this.writeAttachment = null; - return attach; - } - - void completeWrite(int rs) { - Object attach = this.writeAttachment; - CompletionHandler handler = this.writeHandler; - this.writeOneBuffer = null; - this.writeBuffers = null; - this.writeOffset = 0; - this.writeLength = 0; - this.writeAttachment = null; - this.writeHandler = null; - handler.completed(rs, attach); - } - - void faileWrite(Throwable t) { - Object attach = this.writeAttachment; - CompletionHandler handler = this.writeHandler; - this.writeOneBuffer = null; - this.writeBuffers = null; - this.writeOffset = 0; - this.writeLength = 0; - this.writeAttachment = null; - this.writeHandler = null; - handler.failed(t, attach); - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - if (this.readHandler != null) throw new RuntimeException("pending read"); - try { - this.readBuffer = dst; - this.readAttachment = attachment; - this.readHandler = handler; - if (key == null) { - key = channel.register(selector, SelectionKey.OP_READ); - key.attach(this); - } else { - key.interestOps(SelectionKey.OP_READ); - } - selector.wakeup(); - } catch (Exception e) { - faileRead(e); - } - } - - @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read(dst, attachment, handler); - } - - @Override - public Future read(ByteBuffer dst) { - CompletableFuture future = new CompletableFuture(); - read(dst, null, new CompletionHandler() { - @Override - public void completed(Integer result, Void attachment) { - future.complete(result); - } - - @Override - public void failed(Throwable exc, Void attachment) { - future.completeExceptionally(exc); - } - }); - return future; - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - if (this.writeHandler != null) throw new RuntimeException("pending write"); - try { - this.writeBuffers = srcs; - this.writeOffset = offset; - this.writeLength = length; - this.writingCount = 0; - this.writeAttachment = attachment; - this.writeHandler = handler; - if (key == null) { - key = channel.register(selector, SelectionKey.OP_WRITE); - key.attach(this); - } else { - key.interestOps(SelectionKey.OP_WRITE); - } - selector.wakeup(); - } catch (Exception e) { - faileWrite(e); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - if (this.writeHandler != null) throw new RuntimeException("pending write"); - try { - this.writeOneBuffer = src; - this.writingCount = 0; - this.writeAttachment = attachment; - this.writeHandler = handler; - if (key == null) { - key = channel.register(selector, SelectionKey.OP_WRITE); - key.attach(this); - } else { - key.interestOps(SelectionKey.OP_WRITE); - } - selector.wakeup(); - } catch (Exception e) { - faileWrite(e); - } - } - - @Override - public Future write(ByteBuffer src) { - CompletableFuture future = new CompletableFuture(); - write(src, null, new CompletionHandler() { - @Override - public void completed(Integer result, Void attachment) { - future.complete(result); - } - - @Override - public void failed(Throwable exc, Void attachment) { - future.completeExceptionally(exc); - } - }); - return future; - } - - @Override - public final void close() throws IOException { - super.close(); - channel.close(); - key.cancel(); - } - - @Override - public final boolean isOpen() { - return channel.isOpen(); - } - - @Override - public final boolean isTCP() { - return true; - } - -} diff --git a/src/org/redkale/net/TcpNioProtocolServer.java b/src/org/redkale/net/TcpNioProtocolServer.java deleted file mode 100644 index 05f7380a0..000000000 --- a/src/org/redkale/net/TcpNioProtocolServer.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net; - -import java.io.IOException; -import java.net.*; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.*; -import java.util.concurrent.*; -import org.redkale.util.AnyValue; - -/** - * 协议底层Server - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -public class TcpNioProtocolServer extends ProtocolServer { - - private Selector acceptSelector; - - private ServerSocketChannel serverChannel; - - private NioThreadWorker[] workers; - - private NioThreadWorker currWorker; - - private boolean running; - - public TcpNioProtocolServer(Context context) { - super(context); - } - - @Override - public void open(AnyValue config) throws IOException { - acceptSelector = Selector.open(); - this.serverChannel = ServerSocketChannel.open(); - serverChannel.configureBlocking(false); - ServerSocket socket = serverChannel.socket(); - socket.setReceiveBufferSize(16 * 1024); - socket.setReuseAddress(true); - - final Set> options = this.serverChannel.supportedOptions(); - if (options.contains(StandardSocketOptions.TCP_NODELAY)) { - this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); - } - if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { - this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - } - if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { - this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - } - if (options.contains(StandardSocketOptions.SO_RCVBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - } - if (options.contains(StandardSocketOptions.SO_SNDBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - } - } - - @Override - public void bind(SocketAddress local, int backlog) throws IOException { - this.serverChannel.bind(local, backlog); - } - - @Override - public Set> supportedOptions() { - return this.serverChannel.supportedOptions(); - } - - @Override - public void setOption(SocketOption name, T value) throws IOException { - this.serverChannel.setOption(name, value); - } - - @Override - public void accept() throws IOException { - this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); - this.running = true; - this.workers = new NioThreadWorker[Runtime.getRuntime().availableProcessors()]; - final CountDownLatch wkcdl = new CountDownLatch(workers.length); - for (int i = 0; i < workers.length; i++) { - workers[i] = new NioThreadWorker(wkcdl, i + 1, workers.length); - workers[i].setDaemon(true); - workers[i].start(); - } - for (int i = 0; i < workers.length - 1; i++) { //构成环形 - workers[i].next = workers[i + 1]; - } - workers[workers.length - 1].next = workers[0]; - currWorker = workers[0]; - try { - wkcdl.await(3, TimeUnit.SECONDS); - } catch (Exception e) { - throw new IOException(e); - } - final CountDownLatch cdl = new CountDownLatch(1); - new Thread() { - @Override - public void run() { - cdl.countDown(); - while (running) { - try { - acceptSelector.select(); - Set selectedKeys = acceptSelector.selectedKeys(); - synchronized (selectedKeys) { - Iterator iter = selectedKeys.iterator(); - while (iter.hasNext()) { - SelectionKey key = (SelectionKey) iter.next(); - iter.remove(); - if (key.isAcceptable()) { - try { - SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); - createCounter.incrementAndGet(); - livingCounter.incrementAndGet(); - currWorker.addChannel(channel); - currWorker = currWorker.next; - } catch (IOException io) { - io.printStackTrace(); - } - } - } - } - } catch (Throwable t) { - t.printStackTrace(); - } - } - } - }.start(); - try { - cdl.await(3, TimeUnit.SECONDS); - } catch (Exception e) { - throw new IOException(e); - } - } - - @Override - public void close() throws IOException { - if (!this.running) return; - serverChannel.close(); - acceptSelector.close(); - for (NioThreadWorker worker : workers) { - worker.interrupt(); - } - this.running = false; - } - - class NioThreadWorker extends Thread { - - final Selector selector; - - final CountDownLatch cdl; - - private final Queue connected; - - private final CopyOnWriteArrayList done; - - protected volatile Thread ownerThread; - - NioThreadWorker next; - - public NioThreadWorker(final CountDownLatch cdl, int idx, int count) { - this.cdl = cdl; - String idxstr = "000000" + idx; - this.setName("NioThreadWorker:" + context.getServerAddress().getPort() + "-" + idxstr.substring(idxstr.length() - ("" + count).length())); - try { - this.selector = Selector.open(); - } catch (IOException e) { - throw new RuntimeException(e); - } - this.connected = new ArrayBlockingQueue<>(1000000); - this.done = new CopyOnWriteArrayList<>(); - } - - public boolean addChannel(SocketChannel channel) throws IOException { - TcpNioAsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); - return connected.add(conn); - } - - protected void processConnected() { - TcpNioAsyncConnection schannel; - try { - while ((schannel = connected.poll()) != null) { - SocketChannel channel = schannel.channel; - channel.configureBlocking(false); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - channel.register(selector, SelectionKey.OP_READ).attach(schannel); - } - } catch (IOException e) { - // do nothing - } - synchronized (done) { - for (TcpNioAsyncConnection conn : done) { - if (conn.key != null && conn.key.isValid()) { - conn.key.interestOps(SelectionKey.OP_WRITE); - } - } - done.clear(); - } - } - - public boolean isSameThread() { - return this.ownerThread == Thread.currentThread(); - } - - @Override - public void run() { - this.ownerThread = Thread.currentThread(); - if (cdl != null) cdl.countDown(); - while (running) { - processConnected(); - try { - selector.select(50); - } catch (IOException e) { - e.printStackTrace(); - } - try { - Set selectedKeys = selector.selectedKeys(); - synchronized (selectedKeys) { - Iterator iter = selectedKeys.iterator(); - while (iter.hasNext()) { - SelectionKey key = (SelectionKey) iter.next(); - iter.remove(); - processKey(key); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - private void processKey(SelectionKey key) { - if (key == null || !key.isValid()) return; - SocketChannel socket = (SocketChannel) key.channel(); - TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); - if (!socket.isOpen()) { - if (conn == null) { - key.cancel(); - } else { - conn.dispose(); - } - return; - } - if (conn == null) return; - if (key.isReadable()) { - if (conn.readHandler != null) readOP(key, socket, conn); - } else if (key.isWritable()) { - if (conn.writeHandler != null) writeOP(key, socket, conn); - } - } - - private void closeOP(SelectionKey key) { - if (key == null) return; - TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); - try { - if (key.isValid()) { - SocketChannel socketChannel = (SocketChannel) key.channel(); - socketChannel.close(); - key.attach(null); - key.cancel(); - } - } catch (IOException e) { - } - conn.dispose(); - } - - private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) { - final CompletionHandler handler = conn.removeReadHandler(); - final ByteBuffer buffer = conn.removeReadBuffer(); - final Object attach = conn.removeReadAttachment(); - //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler); - if (handler == null || buffer == null) return; - try { - final int rs = socket.read(buffer); - { //测试 - buffer.flip(); - byte[] bs = new byte[buffer.remaining()]; - buffer.get(bs); - //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs)); - } - //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs); - context.runAsync(() -> { - try { - handler.completed(rs, attach); - } catch (Throwable e) { - handler.failed(e, attach); - } - }); - } catch (Throwable t) { - context.runAsync(() -> handler.failed(t, attach)); - } - } - - private void writeOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) { - final CompletionHandler handler = conn.writeHandler; - final ByteBuffer oneBuffer = conn.removeWriteOneBuffer(); - final ByteBuffer[] buffers = conn.removeWriteBuffers(); - final Object attach = conn.removeWriteAttachment(); - final int writingCount = conn.removeWritingCount(); - final int writeOffset = conn.removeWriteOffset(); - final int writeLength = conn.removeWriteLength(); - if (handler == null || (oneBuffer == null && buffers == null)) return; - //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler); - try { - int rs = 0; - if (oneBuffer == null) { - int offset = writeOffset; - int length = writeLength; - rs = (int) socket.write(buffers, offset, length); - boolean over = true; - int end = offset + length; - for (int i = offset; i < end; i++) { - if (buffers[i].hasRemaining()) { - over = false; - length -= i - offset; - offset = i; - } - } - if (!over) { - conn.writingCount += rs; - conn.writeHandler = handler; - conn.writeAttachment = attach; - conn.writeBuffers = buffers; - conn.writeOffset = offset; - conn.writeLength = length; - key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE); - key.selector().wakeup(); - return; - } - } else { - rs = socket.write(oneBuffer); - if (oneBuffer.hasRemaining()) { - conn.writingCount += rs; - conn.writeHandler = handler; - conn.writeAttachment = attach; - conn.writeOneBuffer = oneBuffer; - key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE); - key.selector().wakeup(); - return; - } - } - conn.removeWriteHandler(); - key.interestOps(SelectionKey.OP_READ); //OP_CONNECT - final int rs0 = rs + writingCount; - //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs); - context.runAsync(() -> { - try { - handler.completed(rs0, attach); - } catch (Throwable e) { - handler.failed(e, attach); - } - }); - } catch (Throwable t) { - context.runAsync(() -> handler.failed(t, attach)); - } - } - - } -} diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 893758805..21282366f 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -29,41 +29,41 @@ import org.redkale.util.*; * @author zhangjx */ public final class Transport { - + public static final String DEFAULT_PROTOCOL = "TCP"; - + protected final AtomicInteger seq = new AtomicInteger(-1); - + protected final TransportFactory factory; - + protected final String name; //即的name属性 protected final String subprotocol; //即的subprotocol属性 protected final boolean tcp; - + protected final String protocol; - + protected final AsynchronousChannelGroup group; - + protected final InetSocketAddress clientAddress; //不可能为null protected TransportNode[] transportNodes = new TransportNode[0]; - + protected final ObjectPool bufferPool; - + protected final SSLContext sslContext; //负载均衡策略 protected final TransportStrategy strategy; - + protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress, final Collection addresses, final TransportStrategy strategy) { this(name, DEFAULT_PROTOCOL, subprotocol, factory, transportBufferPool, transportChannelGroup, sslContext, clientAddress, addresses, strategy); } - + protected Transport(String name, String protocol, String subprotocol, final TransportFactory factory, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress, @@ -81,7 +81,7 @@ public final class Transport { this.strategy = strategy; updateRemoteAddresses(addresses); } - + public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { final TransportNode[] oldNodes = this.transportNodes; synchronized (this) { @@ -109,7 +109,7 @@ public final class Transport { } return rs; } - + public final boolean addRemoteAddresses(final InetSocketAddress addr) { if (addr == null) return false; if (clientAddress != null && clientAddress.equals(addr)) return false; @@ -125,7 +125,7 @@ public final class Transport { return true; } } - + public final boolean removeRemoteAddresses(InetSocketAddress addr) { if (addr == null) return false; synchronized (this) { @@ -133,15 +133,15 @@ public final class Transport { } return true; } - + public String getName() { return name; } - + public String getSubprotocol() { return subprotocol; } - + public void close() { TransportNode[] nodes = this.transportNodes; if (nodes == null) return; @@ -149,22 +149,22 @@ public final class Transport { if (node != null) node.dispose(); } } - + public InetSocketAddress getClientAddress() { return clientAddress; } - + public TransportNode[] getTransportNodes() { return transportNodes; } - + public TransportNode findTransportNode(SocketAddress addr) { for (TransportNode node : this.transportNodes) { if (node.address.equals(addr)) return node; } return null; } - + public InetSocketAddress[] getRemoteAddresses() { InetSocketAddress[] rs = new InetSocketAddress[transportNodes.length]; for (int i = 0; i < rs.length; i++) { @@ -172,36 +172,36 @@ public final class Transport { } return rs; } - + @Override public String toString() { return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteNodes = " + Arrays.toString(transportNodes) + "}"; } - + public ByteBuffer pollBuffer() { return bufferPool.get(); } - + public Supplier getBufferSupplier() { return bufferPool; } - + public void offerBuffer(ByteBuffer buffer) { bufferPool.accept(buffer); } - + public void offerBuffer(ByteBuffer... buffers) { for (ByteBuffer buffer : buffers) offerBuffer(buffer); } - + public AsynchronousChannelGroup getTransportChannelGroup() { return group; } - + public boolean isTCP() { return tcp; } - + public CompletableFuture pollConnection(SocketAddress addr0) { if (this.strategy != null) return strategy.pollConnection(addr0, this); final TransportNode[] nodes = this.transportNodes; @@ -215,12 +215,12 @@ public final class Transport { DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(true); channel.connect(udpaddr); - return CompletableFuture.completedFuture(AsyncConnection.create(channel, udpaddr, true, factory.readTimeoutSeconds, factory.writeTimeoutSeconds)); + return CompletableFuture.completedFuture(AsyncConnection.create(bufferPool, channel, sslContext, udpaddr, true, factory.readTimeoutSeconds, factory.writeTimeoutSeconds)); } if (!rand) { //指定地址 TransportNode node = findTransportNode(addr); if (node == null) { - return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); } final BlockingQueue queue = node.conns; if (!queue.isEmpty()) { @@ -233,7 +233,7 @@ public final class Transport { } } } - return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); } //---------------------随机取地址------------------------ @@ -266,14 +266,14 @@ public final class Transport { @Override public void completed(Void result, TransportNode attachment) { attachment.disabletime = 0; - AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); if (future.isDone()) { if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); } else { future.complete(asyncConn); } } - + @Override public void failed(Throwable exc, TransportNode attachment) { attachment.disabletime = now; @@ -289,7 +289,7 @@ public final class Transport { future.complete(r); } }); - + } catch (Exception e) { future.completeExceptionally(e); } @@ -302,7 +302,7 @@ public final class Transport { throw new RuntimeException("transport address = " + addr, ex); } } - + private CompletableFuture pollConnection0(TransportNode[] nodes, TransportNode exclude, long now) throws IOException { //从可用/不可用的地址列表中创建连接 AtomicInteger count = new AtomicInteger(nodes.length); @@ -319,17 +319,17 @@ public final class Transport { public void completed(Void result, TransportNode attachment) { try { attachment.disabletime = 0; - AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); if (future.isDone()) { if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); } else { future.complete(asyncConn); } } catch (Exception e) { - failed(e, attachment); + failed(e, attachment); } } - + @Override public void failed(Throwable exc, TransportNode attachment) { attachment.disabletime = now; @@ -345,7 +345,7 @@ public final class Transport { } return future; } - + public void offerConnection(final boolean forceClose, AsyncConnection conn) { if (this.strategy != null && strategy.offerConnection(forceClose, conn)) return; if (!forceClose && conn.isTCP()) { @@ -359,7 +359,7 @@ public final class Transport { conn.dispose(); } } - + public void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler handler) { pollConnection(addr).whenComplete((conn, ex) -> { if (ex != null) { @@ -367,118 +367,119 @@ public final class Transport { return; } conn.write(buffer, buffer, new CompletionHandler() { - + @Override public void completed(Integer result, ByteBuffer attachment) { buffer.clear(); - conn.read(buffer, buffer, new CompletionHandler() { - + conn.setReadBuffer(buffer); + conn.read(new CompletionHandler() { + @Override public void completed(Integer result, ByteBuffer attachment) { if (handler != null) handler.completed(result, att); - offerBuffer(buffer); + conn.offerBuffer(attachment); offerConnection(false, conn); } - + @Override public void failed(Throwable exc, ByteBuffer attachment) { - offerBuffer(buffer); + conn.offerBuffer(attachment); offerConnection(true, conn); } }); - + } - + @Override public void failed(Throwable exc, ByteBuffer attachment) { - offerBuffer(buffer); + conn.offerBuffer(attachment); offerConnection(true, conn); } }); }); } - + public static class TransportNode { - + protected InetSocketAddress address; - + protected volatile long disabletime; //不可用时的时间, 为0表示可用 protected final BlockingQueue conns; - + protected final ConcurrentHashMap attributes = new ConcurrentHashMap<>(); - + public TransportNode(int poolmaxconns, InetSocketAddress address) { this.address = address; this.disabletime = 0; this.conns = new ArrayBlockingQueue<>(poolmaxconns); } - + @ConstructorParameters({"poolmaxconns", "address", "disabletime"}) public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) { this.address = address; this.disabletime = disabletime; this.conns = new LinkedBlockingQueue<>(poolmaxconns); } - + public int getPoolmaxconns() { return this.conns.remainingCapacity() + this.conns.size(); } - + public T setAttribute(String name, T value) { attributes.put(name, value); return value; } - + @SuppressWarnings("unchecked") public T getAttribute(String name) { return (T) attributes.get(name); } - + @SuppressWarnings("unchecked") public T removeAttribute(String name) { return (T) attributes.remove(name); } - + public TransportNode clearAttributes() { attributes.clear(); return this; } - + public ConcurrentHashMap getAttributes() { return attributes; } - + public void setAttributes(ConcurrentHashMap map) { attributes.clear(); if (map != null) attributes.putAll(map); } - + public InetSocketAddress getAddress() { return address; } - + public long getDisabletime() { return disabletime; } - + @ConvertDisabled public BlockingQueue getConns() { return conns; } - + public void dispose() { AsyncConnection conn; while ((conn = conns.poll()) != null) { conn.dispose(); } } - + @Override public int hashCode() { return this.address.hashCode(); } - + @Override public boolean equals(Object obj) { if (this == obj) return true; @@ -487,7 +488,7 @@ public final class Transport { final TransportNode other = (TransportNode) obj; return this.address.equals(other.address); } - + @Override public String toString() { return JsonConvert.root().convertTo(this); diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 650b72420..a334e70bc 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -393,33 +393,34 @@ public class TransportFactory { final BlockingQueue localqueue = queue; localconn.write(sendBuffer, sendBuffer, new CompletionHandler() { @Override - public void completed(Integer result, ByteBuffer buffer) { - if (buffer.hasRemaining()) { - localconn.write(buffer, buffer, this); + public void completed(Integer result, ByteBuffer wbuffer) { + if (wbuffer.hasRemaining()) { + localconn.write(wbuffer, wbuffer, this); return; } - ByteBuffer pongBuffer = bufferPool.get(); - localconn.read(pongBuffer, pongBuffer, new CompletionHandler() { + localconn.read(new CompletionHandler() { int counter = 0; @Override - public void completed(Integer result, ByteBuffer attachment) { + public void completed(Integer result, ByteBuffer pongBuffer) { if (counter > 3) { - bufferPool.accept(attachment); + localconn.offerBuffer(pongBuffer); localconn.dispose(); return; } - if (pongLength > 0 && attachment.position() < pongLength) { + if (pongLength > 0 && pongBuffer.position() < pongLength) { counter++; - localconn.read(pongBuffer, pongBuffer, this); + localconn.setReadBuffer(pongBuffer); + localconn.read(this); return; } - bufferPool.accept(attachment); + localconn.offerBuffer(pongBuffer); localqueue.offer(localconn); } @Override - public void failed(Throwable exc, ByteBuffer attachment) { + public void failed(Throwable exc, ByteBuffer pongBuffer) { + localconn.offerBuffer(pongBuffer); localconn.dispose(); } }); diff --git a/src/org/redkale/net/UdpBioAsyncConnection.java b/src/org/redkale/net/UdpBioAsyncConnection.java index 54801fe98..552e1afe7 100644 --- a/src/org/redkale/net/UdpBioAsyncConnection.java +++ b/src/org/redkale/net/UdpBioAsyncConnection.java @@ -12,6 +12,8 @@ import java.nio.channels.*; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.*; +import javax.net.ssl.SSLContext; /** * @@ -32,9 +34,11 @@ public class UdpBioAsyncConnection extends AsyncConnection { private final boolean client; - public UdpBioAsyncConnection(final DatagramChannel ch, SocketAddress addr0, - final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + public UdpBioAsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, + final DatagramChannel ch, final SSLContext sslContext, SocketAddress addr0, final boolean client0, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { + super(bufferSupplier, bufferConsumer, sslContext); this.channel = ch; this.client = client0; this.readTimeoutSeconds = readTimeoutSeconds0; @@ -127,30 +131,27 @@ public class UdpBioAsyncConnection extends AsyncConnection { } @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + public void read(CompletionHandler handler) { + ByteBuffer dst = pollReadBuffer(); try { int rs = channel.read(dst); this.readtime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); + if (handler != null) handler.completed(rs, dst); } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); + if (handler != null) handler.failed(e, dst); } } @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read(dst, attachment, handler); + public void read(long timeout, TimeUnit unit, CompletionHandler handler) { + read(handler); } @Override - public Future read(ByteBuffer dst) { - try { - int rs = channel.read(dst); - this.readtime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } + public int read(ByteBuffer dst) throws IOException { + int rs = channel.read(dst); + this.readtime = System.currentTimeMillis(); + return rs; } @Override @@ -165,14 +166,10 @@ public class UdpBioAsyncConnection extends AsyncConnection { } @Override - public Future write(ByteBuffer src) { - try { - int rs = channel.send(src, remoteAddress); - this.writetime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } + public int write(ByteBuffer src) throws IOException { + int rs = channel.send(src, remoteAddress); + this.writetime = System.currentTimeMillis(); + return rs; } @Override diff --git a/src/org/redkale/net/UdpBioProtocolServer.java b/src/org/redkale/net/UdpBioProtocolServer.java index 13a45c05c..9e367d63b 100644 --- a/src/org/redkale/net/UdpBioProtocolServer.java +++ b/src/org/redkale/net/UdpBioProtocolServer.java @@ -85,7 +85,8 @@ public class UdpBioProtocolServer extends ProtocolServer { try { SocketAddress address = serchannel.receive(buffer); buffer.flip(); - AsyncConnection conn = new UdpBioAsyncConnection(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null); + AsyncConnection conn = new UdpBioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), serchannel, + context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null); context.runAsync(new PrepareRunner(context, conn, buffer, null)); } catch (Exception e) { context.offerBuffer(buffer); diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index ae773a81f..fa748e466 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -34,8 +34,6 @@ class WebSocketRunner implements Runnable { protected final HttpContext context; - private ByteBuffer readBuffer; - volatile boolean closed = false; private final BiConsumer restMessageConsumer; //主要供RestWebSocket使用 @@ -50,7 +48,6 @@ class WebSocketRunner implements Runnable { this.webSocket = webSocket; this.restMessageConsumer = messageConsumer; this.channel = channel; - this.readBuffer = context.pollBuffer(); } @Override @@ -61,7 +58,7 @@ class WebSocketRunner implements Runnable { channel.setReadTimeoutSeconds(300); //读取超时5分钟 if (channel.isOpen()) { final int wsmaxbody = webSocket._engine.wsmaxbody; - channel.read(readBuffer, null, new CompletionHandler() { + channel.read(new CompletionHandler() { //尚未解析完的数据包 private WebSocketPacket unfinishPacket; @@ -72,31 +69,27 @@ class WebSocketRunner implements Runnable { private final SimpleEntry halfBytes = new SimpleEntry("", null); @Override - public void completed(Integer count, Void attachment1) { + public void completed(Integer count, ByteBuffer readBuffer) { if (count < 1) { if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid=" + webSocket.getUserid() + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); closeRunner(CLOSECODE_ILLPACKET, "read buffer count is " + count); return; } try { - ByteBuffer readBuf = readBuffer; - if (readBuf == null) return; //关闭后readBuffer为null lastReadTime = System.currentTimeMillis(); - readBuf.flip(); + readBuffer.flip(); WebSocketPacket onePacket = null; if (unfinishPacket != null) { - if (unfinishPacket.receiveBody(webSocket, readBuf)) { //已经接收完毕 + if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕 onePacket = unfinishPacket; unfinishPacket = null; for (ByteBuffer b : exBuffers) { context.offerBuffer(b); } exBuffers.clear(); - } else { //需要继续接收 - readBuf = context.pollBuffer(); - readBuffer = readBuf; - channel.read(readBuf, null, this); + } else { //需要继续接收, 此处不能回收readBuffer + channel.read(this); return; } } @@ -105,37 +98,36 @@ class WebSocketRunner implements Runnable { if (onePacket != null) packets.add(onePacket); try { while (true) { - WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuf); + WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer); if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 if (packet != null && !packet.isReceiveFinished()) { unfinishPacket = packet; - if (readBuf.hasRemaining()) { - exBuffers.add(readBuf); - readBuf = context.pollBuffer(); - readBuffer = readBuf; + if (readBuffer.hasRemaining()) { + exBuffers.add(readBuffer); } break; } packets.add(packet); - if (packet == null || !readBuf.hasRemaining()) break; + if (packet == null || !readBuffer.hasRemaining()) break; } } catch (Exception e) { context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e); webSocket.onOccurException(e, null); } //继续监听消息 - readBuf.clear(); + readBuffer.clear(); if (halfBytes.getValue() != null) { - readBuf.put(halfBytes.getValue()); + readBuffer.put(halfBytes.getValue()); halfBytes.setValue(null); } - channel.read(readBuf, null, this); + channel.setReadBuffer(readBuffer); + channel.read(this); //消息处理 for (final WebSocketPacket packet : packets) { if (packet == null) { if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); - failed(null, attachment1); + failed(null, readBuffer); return; } @@ -197,7 +189,7 @@ class WebSocketRunner implements Runnable { } @Override - public void failed(Throwable exc, Void attachment2) { + public void failed(Throwable exc, ByteBuffer attachment2) { if (exc != null) { if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc); closeRunner(CLOSECODE_WSEXCEPTION, "read websocket-packet failed"); @@ -302,8 +294,6 @@ class WebSocketRunner implements Runnable { if (closed) return null; closed = true; channel.dispose(); - context.offerBuffer(readBuffer); - readBuffer = null; CompletableFuture future = engine.removeThenClose(webSocket); webSocket.onClose(code, reason); return future; diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 5681b0ff7..d1fd7c6e4 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -370,7 +370,6 @@ public final class SncpClient { final ByteBuffer[] sendBuffers = writer.toBuffers(); fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength); - final ByteBuffer buffer = transport.pollBuffer(); conn.write(sendBuffers, sendBuffers, new CompletionHandler() { @Override @@ -393,25 +392,25 @@ public final class SncpClient { conn.write(newattachs, newattachs, this); return; } - //----------------------- 读取返回结果 ------------------------------------- - buffer.clear(); - conn.read(buffer, null, new CompletionHandler() { + //----------------------- 读取返回结果 ------------------------------------- + conn.read(new CompletionHandler() { private byte[] body; private int received; @Override - public void completed(Integer count, Void attachment2) { + public void completed(Integer count, ByteBuffer buffer) { try { if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 future.completeExceptionally(new RpcRemoteException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); - transport.offerBuffer(buffer); + conn.offerBuffer(buffer); transport.offerConnection(true, conn); return; } if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 - conn.read(buffer, attachment2, this); + conn.setReadBuffer(buffer); + conn.read(this); return; } buffer.flip(); @@ -421,8 +420,10 @@ public final class SncpClient { buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 buffer.clear(); - conn.read(buffer, attachment2, this); + conn.setReadBuffer(buffer); + conn.read(this); } else { + conn.offerBuffer(buffer); success(); } return; @@ -441,10 +442,12 @@ public final class SncpClient { this.received = buffer.remaining(); buffer.get(body, 0, this.received); buffer.clear(); - conn.read(buffer, attachment2, this); + conn.setReadBuffer(buffer); + conn.read(this); } else { this.body = new byte[respBodyLength]; buffer.get(body, 0, respBodyLength); + conn.offerBuffer(buffer); success(); } } catch (Throwable e) { @@ -461,7 +464,6 @@ public final class SncpClient { @SuppressWarnings("unchecked") public void success() { future.complete(this.body); - transport.offerBuffer(buffer); transport.offerConnection(false, conn); if (handler != null) { final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; @@ -484,9 +486,9 @@ public final class SncpClient { } @Override - public void failed(Throwable exc, Void attachment2) { + public void failed(Throwable exc, ByteBuffer attachment2) { future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); - transport.offerBuffer(buffer); + conn.offerBuffer(attachment2); transport.offerConnection(true, conn); if (handler != null) { final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; @@ -500,7 +502,6 @@ public final class SncpClient { @Override public void failed(Throwable exc, ByteBuffer[] attachment) { future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); - transport.offerBuffer(buffer); transport.offerConnection(true, conn); if (handler != null) { final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index c3850b3fd..3d5be275d 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -133,7 +133,7 @@ public abstract class PoolTcpSource extends PoolSource { }); } - return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> { + return AsyncConnection.createTCP(bufferPool, group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> { conn.beforeCloseListener((c) -> { semaphore.release(); closeCounter.incrementAndGet(); @@ -143,12 +143,11 @@ public abstract class PoolTcpSource extends PoolSource { final ByteBuffer buffer = reqConnectBuffer(conn); if (buffer == null) { - final ByteBuffer rbuffer = bufferPool.get(); - conn.read(rbuffer, null, new CompletionHandler() { + conn.read(new CompletionHandler() { @Override - public void completed(Integer result, Void attachment2) { + public void completed(Integer result, ByteBuffer rbuffer) { if (result < 0) { - failed(new SQLException("Read Buffer Error"), attachment2); + failed(new SQLException("Read Buffer Error"), rbuffer); return; } rbuffer.flip(); @@ -156,8 +155,8 @@ public abstract class PoolTcpSource extends PoolSource { } @Override - public void failed(Throwable exc, Void attachment2) { - bufferPool.accept(rbuffer); + public void failed(Throwable exc, ByteBuffer rbuffer) { + conn.offerBuffer(rbuffer); future.completeExceptionally(exc); conn.dispose(); } @@ -175,11 +174,12 @@ public abstract class PoolTcpSource extends PoolSource { return; } buffer.clear(); - conn.read(buffer, null, new CompletionHandler() { + conn.setReadBuffer(buffer); + conn.read(new CompletionHandler() { @Override - public void completed(Integer result, Void attachment2) { + public void completed(Integer result, ByteBuffer rbuffer) { if (result < 0) { - failed(new SQLException("Read Buffer Error"), attachment2); + failed(new SQLException("Read Buffer Error"), rbuffer); return; } buffer.flip(); @@ -187,8 +187,8 @@ public abstract class PoolTcpSource extends PoolSource { } @Override - public void failed(Throwable exc, Void attachment2) { - bufferPool.accept(buffer); + public void failed(Throwable exc, ByteBuffer rbuffer) { + conn.offerBuffer(rbuffer); future.completeExceptionally(exc); conn.dispose(); }