From 99ae4ccaddfcc7766fc1cb278b562a8fc1c72965 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 20 Jun 2019 15:26:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=8EContext=E4=B8=AD=E7=A7=BB=E9=99=A4Buff?= =?UTF-8?q?erPool=E5=92=8CResponsePool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/AsyncConnection.java | 57 +++---------------- src/org/redkale/net/Context.java | 57 ++----------------- src/org/redkale/net/PrepareRunner.java | 8 ++- src/org/redkale/net/ProtocolServer.java | 2 +- src/org/redkale/net/Request.java | 11 ++-- src/org/redkale/net/Response.java | 57 +++++++++++-------- src/org/redkale/net/Server.java | 17 ++++-- src/org/redkale/net/TcpAioProtocolServer.java | 17 ++++-- src/org/redkale/net/UdpBioProtocolServer.java | 20 +++++-- src/org/redkale/net/http/HttpContext.java | 7 +-- src/org/redkale/net/http/HttpRequest.java | 6 +- src/org/redkale/net/http/HttpResponse.java | 11 +++- src/org/redkale/net/http/HttpServer.java | 50 +++++++++------- src/org/redkale/net/http/WebSocket.java | 20 +++++-- src/org/redkale/net/http/WebSocketEngine.java | 47 +++++++++++++-- src/org/redkale/net/http/WebSocketPacket.java | 2 +- src/org/redkale/net/http/WebSocketRunner.java | 34 +++++------ .../redkale/net/http/WebSocketServlet.java | 14 +++-- src/org/redkale/net/sncp/SncpDynServlet.java | 2 +- src/org/redkale/net/sncp/SncpRequest.java | 8 ++- src/org/redkale/net/sncp/SncpResponse.java | 6 +- src/org/redkale/net/sncp/SncpServer.java | 46 ++++++++------- 22 files changed, 263 insertions(+), 236 deletions(-) diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 524e67879..331f83cc7 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -52,10 +52,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy //关联的事件数, 小于1表示没有事件 protected final AtomicInteger eventing = new AtomicInteger(); - protected AsyncConnection(Context context) { - this(context.getBufferSupplier(), context.getBufferConsumer(), context.getSSLContext()); - } - protected AsyncConnection(ObjectPool bufferPool, SSLContext sslContext) { this(bufferPool, bufferPool, sslContext); } @@ -68,6 +64,14 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy this.sslContext = sslContext; } + public Supplier getBufferSupplier() { + return this.bufferSupplier; + } + + public Consumer getBufferConsumer() { + return this.bufferConsumer; + } + public final long getLastReadTime() { return readtime; } @@ -245,22 +249,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds); } - /** - * 创建TCP协议客户端连接 - * - * @param context Context - * @param address 连接点子 - * @param group 连接AsynchronousChannelGroup - * @param readTimeoutSeconds 读取超时秒数 - * @param writeTimeoutSeconds 写入超时秒数 - * - * @return 连接CompletableFuture - */ - public static CompletableFuture createTCP(final Context context, final AsynchronousChannelGroup group, - final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return createTCP(context.getBufferSupplier(), context.getBufferConsumer(), group, context.getSSLContext(), address, readTimeoutSeconds, writeTimeoutSeconds); - } - /** * 创建TCP协议客户端连接 * @@ -371,35 +359,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } - public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch) { - return create(context, ch, (SocketAddress) null, 0, 0); - } - - public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, - final SocketAddress addr0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); - } - - public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, - final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); - } - - public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext, - final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); - } - - public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, - final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); - } - - public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext, - final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); - } - public static AsyncConnection create(final ObjectPool bufferPool, final AsynchronousSocketChannel ch) { return create(bufferPool, ch, null, 0, 0); } diff --git a/src/org/redkale/net/Context.java b/src/org/redkale/net/Context.java index 18bf7abe9..486d07748 100644 --- a/src/org/redkale/net/Context.java +++ b/src/org/redkale/net/Context.java @@ -6,11 +6,8 @@ package org.redkale.net; import java.net.*; -import java.nio.*; import java.nio.charset.*; -import java.util.Collection; import java.util.concurrent.*; -import java.util.function.*; import java.util.logging.*; import javax.net.ssl.SSLContext; import org.redkale.convert.bson.*; @@ -39,12 +36,6 @@ public class Context { //ByteBuffer的容量,默认8K protected final int bufferCapacity; - //ByteBuffer对象池 - protected final ObjectPool bufferPool; - - //Response对象池 - protected final ObjectPool responsePool; - //服务的根Servlet protected final PrepareServlet prepare; @@ -83,22 +74,18 @@ public class Context { public Context(ContextConfig config) { this(config.serverStartTime, config.logger, config.executor, config.sslContext, - config.bufferCapacity, config.bufferPool, config.responsePool, config.maxconns, config.maxbody, - config.charset, config.address, config.resourceFactory, config.prepare, - config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds); + config.bufferCapacity, config.maxconns, config.maxbody, config.charset, config.address, config.resourceFactory, + config.prepare, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds); } public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext, - int bufferCapacity, ObjectPool bufferPool, ObjectPool responsePool, final int maxconns, - final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory, - final PrepareServlet prepare, final int aliveTimeoutSeconds, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + int bufferCapacity, final int maxconns, final int maxbody, Charset charset, InetSocketAddress address, + ResourceFactory resourceFactory, PrepareServlet prepare, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) { this.serverStartTime = serverStartTime; this.logger = logger; this.executor = executor; this.sslContext = sslContext; this.bufferCapacity = bufferCapacity; - this.bufferPool = bufferPool; - this.responsePool = responsePool; this.maxconns = maxconns; this.maxbody = maxbody; this.charset = StandardCharsets.UTF_8.equals(charset) ? null : charset; @@ -160,36 +147,6 @@ public class Context { return bufferCapacity; } - public Supplier getBufferSupplier() { - return bufferPool; - } - - public Consumer getBufferConsumer() { - return bufferPool; - } - - public ByteBuffer pollBuffer() { - return bufferPool.get(); - } - - public void offerBuffer(ByteBuffer buffer) { - bufferPool.accept(buffer); - } - - public void offerBuffer(ByteBuffer... buffers) { - if (buffers == null) return; - for (ByteBuffer buffer : buffers) { - bufferPool.accept(buffer); - } - } - - public void offerBuffer(Collection buffers) { - if (buffers == null) return; - for (ByteBuffer buffer : buffers) { - bufferPool.accept(buffer); - } - } - public Logger getLogger() { return logger; } @@ -228,12 +185,6 @@ public class Context { //ByteBuffer的容量,默认8K public int bufferCapacity; - //ByteBuffer对象池 - public ObjectPool bufferPool; - - //Response对象池 - public ObjectPool responsePool; - //服务的根Servlet public PrepareServlet prepare; diff --git a/src/org/redkale/net/PrepareRunner.java b/src/org/redkale/net/PrepareRunner.java index 4422d612a..900f1d3ca 100644 --- a/src/org/redkale/net/PrepareRunner.java +++ b/src/org/redkale/net/PrepareRunner.java @@ -28,12 +28,15 @@ public class PrepareRunner implements Runnable { private final Context context; + private final ObjectPool responsePool; + private ByteBuffer data; private Response response; - public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, Response response) { + public PrepareRunner(Context context, ObjectPool responsePool, AsyncConnection channel, ByteBuffer data, Response response) { this.context = context; + this.responsePool = responsePool; this.channel = channel; this.data = data; this.response = response; @@ -42,7 +45,6 @@ public class PrepareRunner implements Runnable { @Override public void run() { final boolean keepalive = response != null; - final ObjectPool responsePool = context.responsePool; if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 if (response == null) response = responsePool.get(); try { @@ -165,7 +167,7 @@ public class PrepareRunner implements Runnable { } protected Response pollResponse() { - return context.responsePool.get(); + return responsePool.get(); } protected Request pollRequest(Response response) { diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index 0879b4e3a..4bdd071a7 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -43,7 +43,7 @@ public abstract class ProtocolServer { public abstract void setOption(SocketOption name, T value) throws IOException; - public abstract void accept() throws IOException; + public abstract void accept(Server server) throws IOException; public abstract void close() throws IOException; diff --git a/src/org/redkale/net/Request.java b/src/org/redkale/net/Request.java index 3f9ce8a23..7767a5622 100644 --- a/src/org/redkale/net/Request.java +++ b/src/org/redkale/net/Request.java @@ -9,6 +9,7 @@ import java.nio.ByteBuffer; import java.util.*; import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.json.JsonConvert; +import org.redkale.util.ObjectPool; /** * 协议请求对象 @@ -23,6 +24,8 @@ public abstract class Request { protected final C context; + protected final ObjectPool bufferPool; + protected final BsonConvert bsonConvert; protected final JsonConvert jsonConvert; @@ -47,9 +50,9 @@ public abstract class Request { protected final Map attributes = new HashMap<>(); - protected Request(C context) { + protected Request(C context, ObjectPool bufferPool) { this.context = context; - this.readBuffer = context.pollBuffer(); + this.bufferPool = bufferPool; this.bsonConvert = context.getBsonConvert(); this.jsonConvert = context.getJsonConvert(); } @@ -67,7 +70,7 @@ public abstract class Request { protected ByteBuffer pollReadBuffer() { ByteBuffer buffer = this.readBuffer; this.readBuffer = null; - if (buffer == null) buffer = context.pollBuffer(); + if (buffer == null) buffer = bufferPool.get(); return buffer; } @@ -77,7 +80,7 @@ public abstract class Request { buffer.clear(); this.readBuffer = buffer; } else { - context.offerBuffer(buffer); + bufferPool.accept(buffer); } } diff --git a/src/org/redkale/net/Response.java b/src/org/redkale/net/Response.java index ab8763883..6f92cee4e 100644 --- a/src/org/redkale/net/Response.java +++ b/src/org/redkale/net/Response.java @@ -10,6 +10,7 @@ import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.function.*; import java.util.logging.Level; +import org.redkale.util.ObjectPool; /** * 协议响应对象 @@ -26,6 +27,10 @@ public abstract class Response> { protected final C context; + protected final ObjectPool bufferPool; + + protected final ObjectPool responsePool; + protected final R request; protected AsyncConnection channel; @@ -66,15 +71,15 @@ public abstract class Response> { private void offerResponseBuffer(ByteBuffer attachment) { if (writeHeadBuffer == null) { - if (context.bufferPool.getRecyclerPredicate().test(attachment)) { + if (bufferPool.getRecyclerPredicate().test(attachment)) { writeHeadBuffer = attachment; } } else if (writeBodyBuffer == null) { - if (context.bufferPool.getRecyclerPredicate().test(attachment)) { + if (bufferPool.getRecyclerPredicate().test(attachment)) { writeBodyBuffer = attachment; } } else { - context.offerBuffer(attachment); + bufferPool.accept(attachment); } } @@ -108,31 +113,33 @@ public abstract class Response> { private void offerResponseBuffer(ByteBuffer[] attachments) { int start = 0; if (writeHeadBuffer == null && attachments.length > start) { - if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { + if (bufferPool.getRecyclerPredicate().test(attachments[start])) { writeHeadBuffer = attachments[start]; start++; } } if (writeBodyBuffer == null && attachments.length > start) { - if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { + if (bufferPool.getRecyclerPredicate().test(attachments[start])) { writeBodyBuffer = attachments[start]; start++; } } for (int i = start; i < attachments.length; i++) { - context.offerBuffer(attachments[i]); + bufferPool.accept(attachments[i]); } } }; - protected Response(C context, final R request) { + protected Response(C context, final R request, ObjectPool responsePool) { this.context = context; this.request = request; - this.writeHeadBuffer = context.pollBuffer(); - this.writeBodyBuffer = context.pollBuffer(); + this.bufferPool = request.bufferPool; + this.responsePool = responsePool; + this.writeHeadBuffer = bufferPool.get(); + this.writeBodyBuffer = bufferPool.get(); this.bodyBufferSupplier = () -> { ByteBuffer buffer = writeBodyBuffer; - if (buffer == null) return context.pollBuffer(); + if (buffer == null) return bufferPool.get(); writeBodyBuffer = null; return buffer; }; @@ -141,14 +148,14 @@ public abstract class Response> { protected ByteBuffer pollWriteReadBuffer() { ByteBuffer buffer = this.writeHeadBuffer; this.writeHeadBuffer = null; - if (buffer == null) buffer = context.pollBuffer(); + if (buffer == null) buffer = bufferPool.get(); return buffer; } protected ByteBuffer pollWriteBodyBuffer() { ByteBuffer buffer = this.writeBodyBuffer; this.writeBodyBuffer = null; - if (buffer == null) buffer = context.pollBuffer(); + if (buffer == null) buffer = bufferPool.get(); return buffer; } @@ -157,7 +164,9 @@ public abstract class Response> { } protected void offerBuffer(ByteBuffer... buffers) { - context.offerBuffer(buffers); + for (ByteBuffer buffer : buffers) { + bufferPool.accept(buffer); + } } protected AsyncConnection removeChannel() { @@ -257,19 +266,19 @@ public abstract class Response> { AsyncConnection conn = removeChannel(); this.recycle(); this.prepare(); - new PrepareRunner(context, conn, null, this).run(); + new PrepareRunner(context, this.responsePool, conn, null, this).run(); } else { channel.dispose(); } } else { - this.context.responsePool.accept(this); + this.responsePool.accept(this); } } public void finish(final byte[] bs) { if (!this.inited) return; //避免重复关闭 if (this.context.bufferCapacity == bs.length) { - ByteBuffer buffer = this.context.pollBuffer(); + ByteBuffer buffer = this.bufferPool.get(); buffer.put(bs); buffer.flip(); this.finish(buffer); @@ -285,7 +294,7 @@ public abstract class Response> { final boolean more = data != null && this.request.keepAlive; this.request.more = more; conn.write(buffer, buffer, finishHandler); - if (more) new PrepareRunner(this.context, conn, data, null).run(); + if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); } public void finish(boolean kill, ByteBuffer buffer) { @@ -296,7 +305,7 @@ public abstract class Response> { final boolean more = data != null && this.request.keepAlive; this.request.more = more; conn.write(buffer, buffer, finishHandler); - if (more) new PrepareRunner(this.context, conn, data, null).run(); + if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); } public void finish(ByteBuffer... buffers) { @@ -306,7 +315,7 @@ public abstract class Response> { final boolean more = data != null && this.request.keepAlive; this.request.more = more; conn.write(buffers, buffers, finishHandler2); - if (more) new PrepareRunner(this.context, conn, data, null).run(); + if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); } public void finish(boolean kill, ByteBuffer... buffers) { @@ -317,7 +326,7 @@ public abstract class Response> { final boolean more = data != null && this.request.keepAlive; this.request.more = more; conn.write(buffers, buffers, finishHandler2); - if (more) new PrepareRunner(this.context, conn, data, null).run(); + if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run(); } protected void send(final ByteBuffer buffer, final A attachment, final CompletionHandler handler) { @@ -328,14 +337,14 @@ public abstract class Response> { if (buffer.hasRemaining()) { channel.write(buffer, attachment, this); } else { - context.offerBuffer(buffer); + bufferPool.accept(buffer); if (handler != null) handler.completed(result, attachment); } } @Override public void failed(Throwable exc, A attachment) { - context.offerBuffer(buffer); + bufferPool.accept(buffer); if (handler != null) handler.failed(exc, attachment); } @@ -353,7 +362,7 @@ public abstract class Response> { index = i; break; } - context.offerBuffer(buffers[i]); + bufferPool.accept(buffers[i]); } if (index == 0) { channel.write(buffers, attachment, this); @@ -367,7 +376,7 @@ public abstract class Response> { @Override public void failed(Throwable exc, A attachment) { for (ByteBuffer buffer : buffers) { - context.offerBuffer(buffer); + bufferPool.accept(buffer); } if (handler != null) handler.failed(exc, attachment); } diff --git a/src/org/redkale/net/Server.java b/src/org/redkale/net/Server.java index 929f07cbc..bd220196b 100644 --- a/src/org/redkale/net/Server.java +++ b/src/org/redkale/net/Server.java @@ -7,14 +7,14 @@ package org.redkale.net; import java.io.*; import java.net.*; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.text.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import java.util.logging.*; import javax.net.ssl.SSLContext; -import org.redkale.net.Filter; import org.redkale.util.*; /** @@ -281,7 +281,7 @@ public abstract class Server createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize); + + //必须在 createContext()之后调用 + protected abstract ObjectPool createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize); + + //必须在 createResponsePool()之后调用 + protected abstract Creator createResponseCreator(ObjectPool bufferPool, ObjectPool responsePool); + public void shutdown() throws IOException { long s = System.currentTimeMillis(); logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning"); diff --git a/src/org/redkale/net/TcpAioProtocolServer.java b/src/org/redkale/net/TcpAioProtocolServer.java index f5f582808..c4dc8a72d 100644 --- a/src/org/redkale/net/TcpAioProtocolServer.java +++ b/src/org/redkale/net/TcpAioProtocolServer.java @@ -7,10 +7,12 @@ package org.redkale.net; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; -import org.redkale.util.AnyValue; +import org.redkale.util.*; /** * 协议底层Server @@ -70,7 +72,14 @@ public class TcpAioProtocolServer extends ProtocolServer { } @Override - public void accept() throws IOException { + public void accept(Server server) throws IOException { + AtomicLong createBufferCounter = new AtomicLong(); + AtomicLong cycleBufferCounter = new AtomicLong(); + ObjectPool bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + AtomicLong createResponseCounter = new AtomicLong(); + AtomicLong cycleResponseCounter = new AtomicLong(); + ObjectPool responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); + responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool)); final AsynchronousServerSocketChannel serchannel = this.serverChannel; serchannel.accept(null, new CompletionHandler() { @@ -93,9 +102,9 @@ public class TcpAioProtocolServer extends ProtocolServer { channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - AsyncConnection conn = new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), channel, + AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel, context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); - context.runAsync(new PrepareRunner(context, conn, null, null)); + context.runAsync(new PrepareRunner(context, responsePool, conn, null, null)); } catch (Throwable e) { context.logger.log(Level.INFO, channel + " accept error", e); } diff --git a/src/org/redkale/net/UdpBioProtocolServer.java b/src/org/redkale/net/UdpBioProtocolServer.java index 9e367d63b..4eed02d85 100644 --- a/src/org/redkale/net/UdpBioProtocolServer.java +++ b/src/org/redkale/net/UdpBioProtocolServer.java @@ -11,7 +11,8 @@ import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.Set; import java.util.concurrent.CountDownLatch; -import org.redkale.util.AnyValue; +import java.util.concurrent.atomic.AtomicLong; +import org.redkale.util.*; /** * 协议底层Server @@ -70,7 +71,14 @@ public class UdpBioProtocolServer extends ProtocolServer { } @Override - public void accept() throws IOException { + public void accept(Server server) throws IOException { + AtomicLong createBufferCounter = new AtomicLong(); + AtomicLong cycleBufferCounter = new AtomicLong(); + ObjectPool bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + AtomicLong createResponseCounter = new AtomicLong(); + AtomicLong cycleResponseCounter = new AtomicLong(); + ObjectPool responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); + responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool)); final DatagramChannel serchannel = this.serverChannel; final int readTimeoutSeconds = this.context.readTimeoutSeconds; final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; @@ -81,15 +89,15 @@ public class UdpBioProtocolServer extends ProtocolServer { public void run() { cdl.countDown(); while (running) { - final ByteBuffer buffer = context.pollBuffer(); + final ByteBuffer buffer = bufferPool.get(); try { SocketAddress address = serchannel.receive(buffer); buffer.flip(); - AsyncConnection conn = new UdpBioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), serchannel, + AsyncConnection conn = new UdpBioAsyncConnection(bufferPool, bufferPool, serchannel, context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null); - context.runAsync(new PrepareRunner(context, conn, buffer, null)); + context.runAsync(new PrepareRunner(context, responsePool, conn, buffer, null)); } catch (Exception e) { - context.offerBuffer(buffer); + bufferPool.accept(buffer); } } } diff --git a/src/org/redkale/net/http/HttpContext.java b/src/org/redkale/net/http/HttpContext.java index ea0309473..0aa883403 100644 --- a/src/org/redkale/net/http/HttpContext.java +++ b/src/org/redkale/net/http/HttpContext.java @@ -28,6 +28,8 @@ public class HttpContext extends Context { protected final ConcurrentHashMap asyncHandlerCreators = new ConcurrentHashMap<>(); + protected String remoteAddrHeader; + public HttpContext(HttpContextConfig config) { super(config); random.setSeed(Math.abs(System.nanoTime())); @@ -43,10 +45,6 @@ public class HttpContext extends Context { return executor; } - protected ObjectPool getResponsePool() { - return responsePool; - } - @SuppressWarnings("unchecked") protected Creator loadAsyncHandlerCreator(Class handlerClass) { Creator creator = asyncHandlerCreators.get(handlerClass); @@ -162,5 +160,6 @@ public class HttpContext extends Context { public static class HttpContextConfig extends ContextConfig { + public String remoteAddrHeader; } } diff --git a/src/org/redkale/net/http/HttpRequest.java b/src/org/redkale/net/http/HttpRequest.java index b009b770c..d48a3cd8d 100644 --- a/src/org/redkale/net/http/HttpRequest.java +++ b/src/org/redkale/net/http/HttpRequest.java @@ -79,9 +79,9 @@ public class HttpRequest extends Request { Object attachment; //仅供HttpServlet传递Entry使用 - public HttpRequest(HttpContext context, String remoteAddrHeader) { - super(context); - this.remoteAddrHeader = remoteAddrHeader; + public HttpRequest(HttpContext context, ObjectPool bufferPool) { + super(context, bufferPool); + this.remoteAddrHeader = context.remoteAddrHeader; } protected boolean isWebSocket() { diff --git a/src/org/redkale/net/http/HttpResponse.java b/src/org/redkale/net/http/HttpResponse.java index c8c91b026..19cb783fa 100644 --- a/src/org/redkale/net/http/HttpResponse.java +++ b/src/org/redkale/net/http/HttpResponse.java @@ -148,8 +148,8 @@ public class HttpResponse extends Response { return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle()); } - public HttpResponse(HttpContext context, HttpRequest request, HttpResponseConfig config) { - super(context, request); + public HttpResponse(HttpContext context, HttpRequest request, ObjectPool responsePool, HttpResponseConfig config) { + super(context, request, responsePool); this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType; this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType; this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes(); @@ -174,6 +174,11 @@ public class HttpResponse extends Response { return channel; } + @Override + protected void prepare() { + super.prepare(); + } + @Override protected boolean recycle() { this.status = 200; @@ -1197,7 +1202,7 @@ public class HttpResponse extends Response { @Override public void failed(Throwable exc, ByteBuffer attachment) { - context.offerBuffer(attachment); + bufferPool.accept(attachment); finish(true); try { filechannel.close(); diff --git a/src/org/redkale/net/http/HttpServer.java b/src/org/redkale/net/http/HttpServer.java index ff13d33d0..aaee70cc1 100644 --- a/src/org/redkale/net/http/HttpServer.java +++ b/src/org/redkale/net/http/HttpServer.java @@ -37,6 +37,8 @@ public class HttpServer extends Server bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; - e.clear(); - return true; - }); final List defaultAddHeaders = new ArrayList<>(); final List defaultSetHeaders = new ArrayList<>(); boolean autoOptions = false; @@ -423,7 +416,7 @@ public class HttpServer extends Server responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); - final HttpContextConfig contextConfig = new HttpContextConfig(); contextConfig.serverStartTime = this.serverStartTime; contextConfig.logger = this.logger; contextConfig.executor = this.executor; contextConfig.sslContext = this.sslContext; - contextConfig.bufferCapacity = rcapacity; - contextConfig.bufferPool = bufferPool; - contextConfig.responsePool = responsePool; + contextConfig.bufferCapacity = this.bufferCapacity; contextConfig.maxconns = this.maxconns; contextConfig.maxbody = this.maxbody; contextConfig.charset = this.charset; @@ -454,9 +441,32 @@ public class HttpServer extends Server new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), respConfig)); - return httpcontext; + return new HttpContext(contextConfig); + } + + @Override + protected ObjectPool createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) { + AtomicLong createBufferCounter = new AtomicLong(); + AtomicLong cycleBufferCounter = new AtomicLong(); + final int rcapacity = this.bufferCapacity; + ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; + e.clear(); + return true; + }); + return bufferPool; + } + + @Override + protected ObjectPool createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) { + return HttpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null); + } + + @Override + protected Creator createResponseCreator(ObjectPool bufferPool, ObjectPool responsePool) { + return (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context, bufferPool), responsePool, this.respConfig); } } diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 0029b2308..d06b136c1 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -11,10 +11,11 @@ import java.net.*; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; -import java.util.function.Supplier; +import java.util.function.*; import java.util.logging.*; import java.util.stream.Stream; import org.redkale.convert.Convert; +import org.redkale.net.AsyncConnection; import org.redkale.util.Comment; /** @@ -82,6 +83,8 @@ public abstract class WebSocket { WebSocketEngine _engine; //不可能为空 + AsyncConnection _channel;//不可能为空 + String _sessionid; //不可能为空 G _userid; //不可能为空 @@ -674,12 +677,21 @@ public abstract class WebSocket { } /** - * 获取ByteBuffer资源池 + * 获取ByteBuffer生成器 * * @return Supplier */ - protected Supplier getByteBufferSupplier() { - return this._runner.context.getBufferSupplier(); + protected Supplier getBufferSupplier() { + return this._channel.getBufferSupplier(); + } + + /** + * 获取ByteBuffer回收器 + * + * @return Consumer + */ + protected Consumer getBufferConsumer() { + return this._channel.getBufferConsumer(); } //------------------------------------------------------------------- diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 948c8c9e7..ee06fc55a 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -7,6 +7,7 @@ package org.redkale.net.http; import static org.redkale.net.http.WebSocketServlet.DEFAILT_LIVEINTERVAL; import java.io.*; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -229,26 +230,45 @@ public class WebSocketEngine { } final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null); if (more) { + Supplier bufferSupplier = null; + Consumer bufferConsumer = null; //此处的WebSocketPacket只能是包含payload或bytes内容的,不能包含sendConvert、sendJson、sendBuffers final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last)); - packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); + //packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); CompletableFuture future = null; if (single) { for (WebSocket websocket : websockets.values()) { if (predicate != null && !predicate.test(websocket)) continue; + if (bufferSupplier == null) { + bufferSupplier = websocket.getBufferSupplier(); + bufferConsumer = websocket.getBufferConsumer(); + packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } } else { for (List list : websockets2.values()) { for (WebSocket websocket : list) { if (predicate != null && !predicate.test(websocket)) continue; + if (bufferSupplier == null) { + bufferSupplier = websocket.getBufferSupplier(); + bufferConsumer = websocket.getBufferConsumer(); + packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } } } - if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); + final Consumer bufferConsumer0 = bufferConsumer; + if (future != null) future.whenComplete((rs, ex) -> { + if (packet.sendBuffers != null && bufferConsumer0 != null) { + for (ByteBuffer buffer : packet.sendBuffers) { + bufferConsumer0.accept(buffer); + } + } + }); return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } else { CompletableFuture future = null; @@ -286,16 +306,23 @@ public class WebSocketEngine { } final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1; if (more) { + Supplier bufferSupplier = null; + Consumer bufferConsumer = null; //此处的WebSocketPacket只能是包含payload或bytes内容的,不能包含sendConvert、sendJson、sendBuffers final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last)); - packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); + //packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); CompletableFuture future = null; if (single) { for (Serializable userid : userids) { WebSocket websocket = websockets.get(userid); if (websocket == null) continue; + if (bufferSupplier == null) { + bufferSupplier = websocket.getBufferSupplier(); + bufferConsumer = websocket.getBufferConsumer(); + packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } } else { @@ -303,11 +330,23 @@ public class WebSocketEngine { List list = websockets2.get(userid); if (list == null) continue; for (WebSocket websocket : list) { + if (bufferSupplier == null) { + bufferSupplier = websocket.getBufferSupplier(); + bufferConsumer = websocket.getBufferConsumer(); + packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } } } - if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); + final Consumer bufferConsumer0 = bufferConsumer; + if (future != null) future.whenComplete((rs, ex) -> { + if (packet.sendBuffers != null && bufferConsumer0 != null) { + for (ByteBuffer buffer : packet.sendBuffers) { + bufferConsumer0.accept(buffer); + } + } + }); return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } else { CompletableFuture future = null; diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index 39e136b4f..af41d3c2d 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -492,7 +492,7 @@ public final class WebSocketPacket { void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) { if (webSocket._engine.cryptor != null) { HttpContext context = webSocket._engine.context; - buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer()); + buffers = webSocket._engine.cryptor.decrypt(buffers, webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer()); } FrameType selfType = this.type; final boolean series = selfType == FrameType.SERIES; diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 2b67e77f8..b1455879d 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -5,7 +5,6 @@ */ package org.redkale.net.http; -import org.redkale.net.AsyncConnection; import static org.redkale.net.http.WebSocket.*; import org.redkale.net.http.WebSocketPacket.FrameType; import java.nio.ByteBuffer; @@ -29,8 +28,6 @@ class WebSocketRunner implements Runnable { private final WebSocketEngine engine; - private final AsyncConnection channel; - private final WebSocket webSocket; protected final HttpContext context; @@ -49,13 +46,12 @@ class WebSocketRunner implements Runnable { protected long lastReadTime; - WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer messageConsumer, AsyncConnection channel) { + WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer messageConsumer) { this.context = context; this.engine = webSocket._engine; this.webSocket = webSocket; this.mergemsg = webSocket._engine.mergemsg; this.restMessageConsumer = messageConsumer; - this.channel = channel; } @Override @@ -64,10 +60,10 @@ class WebSocketRunner implements Runnable { final WebSocketRunner self = this; try { webSocket.onConnected(); - channel.setReadTimeoutSeconds(300); //读取超时5分钟 - if (channel.isOpen()) { + webSocket._channel.setReadTimeoutSeconds(300); //读取超时5分钟 + if (webSocket._channel.isOpen()) { final int wsmaxbody = webSocket._engine.wsmaxbody; - channel.read(new CompletionHandler() { + webSocket._channel.read(new CompletionHandler() { //尚未解析完的数据包 private WebSocketPacket unfinishPacket; @@ -94,11 +90,11 @@ class WebSocketRunner implements Runnable { onePacket = unfinishPacket; unfinishPacket = null; for (ByteBuffer b : exBuffers) { - context.offerBuffer(b); + webSocket._channel.offerBuffer(b); } exBuffers.clear(); } else { //需要继续接收, 此处不能回收readBuffer - channel.read(this); + webSocket._channel.read(this); return; } } @@ -125,7 +121,7 @@ class WebSocketRunner implements Runnable { } //继续监听消息 if (readBuffer.hasRemaining()) { //exBuffers缓存了 - readBuffer = context.pollBuffer(); + readBuffer = webSocket._channel.pollReadBuffer(); } else { readBuffer.clear(); } @@ -133,8 +129,8 @@ class WebSocketRunner implements Runnable { readBuffer.put(halfBytes.getValue()); halfBytes.setValue(null); } - channel.setReadBuffer(readBuffer); - channel.read(this); + webSocket._channel.setReadBuffer(readBuffer); + webSocket._channel.read(this); //消息处理 for (final WebSocketPacket packet : packets) { @@ -229,11 +225,11 @@ class WebSocketRunner implements Runnable { //System.out.println("推送消息"); final CompletableFuture futureResult = new CompletableFuture<>(); try { - ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier(), this.context.getBufferConsumer(), webSocket._engine.cryptor); + ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); this.lastSendTime = System.currentTimeMillis(); - channel.write(buffers, buffers, new CompletionHandler() { + webSocket._channel.write(buffers, buffers, new CompletionHandler() { private CompletableFuture future = futureResult; @@ -245,7 +241,7 @@ class WebSocketRunner implements Runnable { future = null; if (attachments != null) { for (ByteBuffer buf : attachments) { - context.offerBuffer(buf); + webSocket._channel.offerBuffer(buf); } } } @@ -260,7 +256,7 @@ class WebSocketRunner implements Runnable { } } if (index >= 0) { //ByteBuffer[]统一回收的可以采用此写法 - channel.write(attachments, index, attachments.length - index, attachments, this); + webSocket._channel.write(attachments, index, attachments.length - index, attachments, this); return; } if (future != null) { @@ -268,7 +264,7 @@ class WebSocketRunner implements Runnable { future = null; if (attachments != null) { for (ByteBuffer buf : attachments) { - context.offerBuffer(buf); + webSocket._channel.offerBuffer(buf); } } } @@ -310,7 +306,7 @@ class WebSocketRunner implements Runnable { if (closed) return null; closed = true; CompletableFuture future = engine.removeLocalThenClose(webSocket); - channel.dispose(); + webSocket._channel.dispose(); webSocket.onClose(code, reason); return future; } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index a44d89965..f03d7bd42 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -202,6 +202,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } final WebSocket webSocket = this.createWebSocket(); webSocket._engine = this.node.localEngine; + webSocket._channel = response.getChannel(); webSocket._messageTextType = this.messageTextType; webSocket._textConvert = textConvert; webSocket._binaryConvert = binaryConvert; @@ -262,7 +263,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl Consumer task = (oldkilled) -> { if (oldkilled) { WebSocketServlet.this.node.localEngine.addLocal(webSocket); - WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); + response.removeChannel(); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer); webSocket._runner = runner; context.runAsync(runner); response.finish(true); @@ -283,7 +285,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } } else { WebSocketServlet.this.node.localEngine.addLocal(webSocket); - WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); + response.removeChannel(); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer); webSocket._runner = runner; context.runAsync(runner); response.finish(true); @@ -291,14 +294,15 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl }); } else { WebSocketServlet.this.node.localEngine.addLocal(webSocket); - WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); + response.removeChannel(); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer); webSocket._runner = runner; context.runAsync(runner); response.finish(true); } }; if (webSocket.delayPackets != null) { //存在待发送的消息 - if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer); List delayPackets = webSocket.delayPackets; webSocket.delayPackets = null; CompletableFuture cf = null; @@ -323,7 +327,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl }); }; if (webSocket.delayPackets != null) { //存在待发送的消息 - if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer); List delayPackets = webSocket.delayPackets; webSocket.delayPackets = null; CompletableFuture cf = null; diff --git a/src/org/redkale/net/sncp/SncpDynServlet.java b/src/org/redkale/net/sncp/SncpDynServlet.java index 463614606..672d07f2a 100644 --- a/src/org/redkale/net/sncp/SncpDynServlet.java +++ b/src/org/redkale/net/sncp/SncpDynServlet.java @@ -112,7 +112,7 @@ public final class SncpDynServlet extends SncpServlet { @SuppressWarnings("unchecked") public void execute(SncpRequest request, SncpResponse response) throws IOException { if (bufferSupplier == null) { - bufferSupplier = request.getContext().getBufferSupplier(); + bufferSupplier = request.getBufferPool(); } final SncpServletAction action = actions.get(request.getActionid()); //logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); diff --git a/src/org/redkale/net/sncp/SncpRequest.java b/src/org/redkale/net/sncp/SncpRequest.java index 5d7e076f8..e8b65faa9 100644 --- a/src/org/redkale/net/sncp/SncpRequest.java +++ b/src/org/redkale/net/sncp/SncpRequest.java @@ -45,11 +45,15 @@ public final class SncpRequest extends Request { private byte[] bufferbytes = new byte[6]; - protected SncpRequest(SncpContext context) { - super(context); + protected SncpRequest(SncpContext context, ObjectPool bufferPool) { + super(context, bufferPool); this.convert = context.getBsonConvert(); } + protected ObjectPool getBufferPool() { + return this.bufferPool; + } + @Override protected int readHeader(ByteBuffer buffer) { if (buffer.remaining() < HEADER_SIZE) { diff --git a/src/org/redkale/net/sncp/SncpResponse.java b/src/org/redkale/net/sncp/SncpResponse.java index 3938e925f..f133fd980 100644 --- a/src/org/redkale/net/sncp/SncpResponse.java +++ b/src/org/redkale/net/sncp/SncpResponse.java @@ -45,8 +45,8 @@ public final class SncpResponse extends Response { return null; } - protected SncpResponse(SncpContext context, SncpRequest request) { - super(context, request); + protected SncpResponse(SncpContext context, SncpRequest request, ObjectPool responsePool) { + super(context, request, responsePool); this.addrBytes = context.getServerAddress().getAddress().getAddress(); this.addrPort = context.getServerAddress().getPort(); if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4"); @@ -56,7 +56,7 @@ public final class SncpResponse extends Response { protected void offerBuffer(ByteBuffer... buffers) { super.offerBuffer(buffers); } - + public void finish(final int retcode, final BsonWriter out) { if (out == null) { final ByteBuffer buffer = pollWriteReadBuffer(); diff --git a/src/org/redkale/net/sncp/SncpServer.java b/src/org/redkale/net/sncp/SncpServer.java index 822757330..2d87fc7e0 100644 --- a/src/org/redkale/net/sncp/SncpServer.java +++ b/src/org/redkale/net/sncp/SncpServer.java @@ -99,28 +99,14 @@ public class SncpServer extends Server bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; - e.clear(); - return true; - }); - AtomicLong createResponseCounter = new AtomicLong(); - AtomicLong cycleResponseCounter = new AtomicLong(); - ObjectPool responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); + this.bufferCapacity = Math.max(this.bufferCapacity, 8 * 1024); final SncpContextConfig contextConfig = new SncpContextConfig(); contextConfig.serverStartTime = this.serverStartTime; contextConfig.logger = this.logger; contextConfig.executor = this.executor; contextConfig.sslContext = this.sslContext; - contextConfig.bufferCapacity = rcapacity; - contextConfig.bufferPool = bufferPool; - contextConfig.responsePool = responsePool; + contextConfig.bufferCapacity = this.bufferCapacity; contextConfig.maxconns = this.maxconns; contextConfig.maxbody = this.maxbody; contextConfig.charset = this.charset; @@ -131,9 +117,31 @@ public class SncpServer extends Server new SncpResponse(sncpcontext, new SncpRequest(sncpcontext))); - return sncpcontext; + return new SncpContext(contextConfig); + } + + @Override + protected ObjectPool createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) { + AtomicLong createBufferCounter = new AtomicLong(); + AtomicLong cycleBufferCounter = new AtomicLong(); + final int rcapacity = this.bufferCapacity; + ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; + e.clear(); + return true; + }); + return bufferPool; + } + + @Override + protected ObjectPool createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) { + return SncpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null); + } + + @Override + protected Creator createResponseCreator(ObjectPool bufferPool, ObjectPool responsePool) { + return (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context, bufferPool), responsePool); } }