从Context中移除BufferPool和ResponsePool

This commit is contained in:
Redkale
2019-06-20 15:26:20 +08:00
parent 98e9ffe0ef
commit 99ae4ccadd
22 changed files with 263 additions and 236 deletions

View File

@@ -52,10 +52,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
//关联的事件数, 小于1表示没有事件 //关联的事件数, 小于1表示没有事件
protected final AtomicInteger eventing = new AtomicInteger(); protected final AtomicInteger eventing = new AtomicInteger();
protected AsyncConnection(Context context) {
this(context.getBufferSupplier(), context.getBufferConsumer(), context.getSSLContext());
}
protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext) { protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext) {
this(bufferPool, bufferPool, sslContext); this(bufferPool, bufferPool, sslContext);
} }
@@ -68,6 +64,14 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
this.sslContext = sslContext; this.sslContext = sslContext;
} }
public Supplier<ByteBuffer> getBufferSupplier() {
return this.bufferSupplier;
}
public Consumer<ByteBuffer> getBufferConsumer() {
return this.bufferConsumer;
}
public final long getLastReadTime() { public final long getLastReadTime() {
return readtime; return readtime;
} }
@@ -245,22 +249,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds); return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds);
} }
/**
* 创建TCP协议客户端连接
*
* @param context Context
* @param address 连接点子
* @param group 连接AsynchronousChannelGroup
* @param readTimeoutSeconds 读取超时秒数
* @param writeTimeoutSeconds 写入超时秒数
*
* @return 连接CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createTCP(final Context context, final AsynchronousChannelGroup group,
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return createTCP(context.getBufferSupplier(), context.getBufferConsumer(), group, context.getSSLContext(), address, readTimeoutSeconds, writeTimeoutSeconds);
}
/** /**
* 创建TCP协议客户端连接 * 创建TCP协议客户端连接
* *
@@ -371,35 +359,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
} }
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch) {
return create(context, ch, (SocketAddress) null, 0, 0);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch) { public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch) {
return create(bufferPool, ch, null, 0, 0); return create(bufferPool, ch, null, 0, 0);
} }

View File

@@ -6,11 +6,8 @@
package org.redkale.net; package org.redkale.net;
import java.net.*; import java.net.*;
import java.nio.*;
import java.nio.charset.*; import java.nio.charset.*;
import java.util.Collection;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.convert.bson.*; import org.redkale.convert.bson.*;
@@ -39,12 +36,6 @@ public class Context {
//ByteBuffer的容量默认8K //ByteBuffer的容量默认8K
protected final int bufferCapacity; protected final int bufferCapacity;
//ByteBuffer对象池
protected final ObjectPool<ByteBuffer> bufferPool;
//Response对象池
protected final ObjectPool<Response> responsePool;
//服务的根Servlet //服务的根Servlet
protected final PrepareServlet prepare; protected final PrepareServlet prepare;
@@ -83,22 +74,18 @@ public class Context {
public Context(ContextConfig config) { public Context(ContextConfig config) {
this(config.serverStartTime, config.logger, config.executor, config.sslContext, this(config.serverStartTime, config.logger, config.executor, config.sslContext,
config.bufferCapacity, config.bufferPool, config.responsePool, config.maxconns, config.maxbody, config.bufferCapacity, config.maxconns, config.maxbody, config.charset, config.address, config.resourceFactory,
config.charset, config.address, config.resourceFactory, config.prepare, config.prepare, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
} }
public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext, public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool, final int maxconns, int bufferCapacity, final int maxconns, final int maxbody, Charset charset, InetSocketAddress address,
final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory, ResourceFactory resourceFactory, PrepareServlet prepare, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) {
final PrepareServlet prepare, final int aliveTimeoutSeconds, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
this.serverStartTime = serverStartTime; this.serverStartTime = serverStartTime;
this.logger = logger; this.logger = logger;
this.executor = executor; this.executor = executor;
this.sslContext = sslContext; this.sslContext = sslContext;
this.bufferCapacity = bufferCapacity; this.bufferCapacity = bufferCapacity;
this.bufferPool = bufferPool;
this.responsePool = responsePool;
this.maxconns = maxconns; this.maxconns = maxconns;
this.maxbody = maxbody; this.maxbody = maxbody;
this.charset = StandardCharsets.UTF_8.equals(charset) ? null : charset; this.charset = StandardCharsets.UTF_8.equals(charset) ? null : charset;
@@ -160,36 +147,6 @@ public class Context {
return bufferCapacity; return bufferCapacity;
} }
public Supplier<ByteBuffer> getBufferSupplier() {
return bufferPool;
}
public Consumer<ByteBuffer> getBufferConsumer() {
return bufferPool;
}
public ByteBuffer pollBuffer() {
return bufferPool.get();
}
public void offerBuffer(ByteBuffer buffer) {
bufferPool.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
}
public void offerBuffer(Collection<ByteBuffer> buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
}
public Logger getLogger() { public Logger getLogger() {
return logger; return logger;
} }
@@ -228,12 +185,6 @@ public class Context {
//ByteBuffer的容量默认8K //ByteBuffer的容量默认8K
public int bufferCapacity; public int bufferCapacity;
//ByteBuffer对象池
public ObjectPool<ByteBuffer> bufferPool;
//Response对象池
public ObjectPool<Response> responsePool;
//服务的根Servlet //服务的根Servlet
public PrepareServlet prepare; public PrepareServlet prepare;

View File

@@ -28,12 +28,15 @@ public class PrepareRunner implements Runnable {
private final Context context; private final Context context;
private final ObjectPool<Response> responsePool;
private ByteBuffer data; private ByteBuffer data;
private Response response; private Response response;
public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, Response response) { public PrepareRunner(Context context, ObjectPool<Response> responsePool, AsyncConnection channel, ByteBuffer data, Response response) {
this.context = context; this.context = context;
this.responsePool = responsePool;
this.channel = channel; this.channel = channel;
this.data = data; this.data = data;
this.response = response; this.response = response;
@@ -42,7 +45,6 @@ public class PrepareRunner implements Runnable {
@Override @Override
public void run() { public void run() {
final boolean keepalive = response != null; final boolean keepalive = response != null;
final ObjectPool<? extends Response> responsePool = context.responsePool;
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
if (response == null) response = responsePool.get(); if (response == null) response = responsePool.get();
try { try {
@@ -165,7 +167,7 @@ public class PrepareRunner implements Runnable {
} }
protected Response pollResponse() { protected Response pollResponse() {
return context.responsePool.get(); return responsePool.get();
} }
protected Request pollRequest(Response response) { protected Request pollRequest(Response response) {

View File

@@ -43,7 +43,7 @@ public abstract class ProtocolServer {
public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException; public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException;
public abstract void accept() throws IOException; public abstract void accept(Server server) throws IOException;
public abstract void close() throws IOException; public abstract void close() throws IOException;

View File

@@ -9,6 +9,7 @@ import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.bson.BsonConvert;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.ObjectPool;
/** /**
* 协议请求对象 * 协议请求对象
@@ -23,6 +24,8 @@ public abstract class Request<C extends Context> {
protected final C context; protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final BsonConvert bsonConvert; protected final BsonConvert bsonConvert;
protected final JsonConvert jsonConvert; protected final JsonConvert jsonConvert;
@@ -47,9 +50,9 @@ public abstract class Request<C extends Context> {
protected final Map<String, Object> attributes = new HashMap<>(); protected final Map<String, Object> attributes = new HashMap<>();
protected Request(C context) { protected Request(C context, ObjectPool<ByteBuffer> bufferPool) {
this.context = context; this.context = context;
this.readBuffer = context.pollBuffer(); this.bufferPool = bufferPool;
this.bsonConvert = context.getBsonConvert(); this.bsonConvert = context.getBsonConvert();
this.jsonConvert = context.getJsonConvert(); this.jsonConvert = context.getJsonConvert();
} }
@@ -67,7 +70,7 @@ public abstract class Request<C extends Context> {
protected ByteBuffer pollReadBuffer() { protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer; ByteBuffer buffer = this.readBuffer;
this.readBuffer = null; this.readBuffer = null;
if (buffer == null) buffer = context.pollBuffer(); if (buffer == null) buffer = bufferPool.get();
return buffer; return buffer;
} }
@@ -77,7 +80,7 @@ public abstract class Request<C extends Context> {
buffer.clear(); buffer.clear();
this.readBuffer = buffer; this.readBuffer = buffer;
} else { } else {
context.offerBuffer(buffer); bufferPool.accept(buffer);
} }
} }

View File

@@ -10,6 +10,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.function.*; import java.util.function.*;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.util.ObjectPool;
/** /**
* 协议响应对象 * 协议响应对象
@@ -26,6 +27,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final C context; protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final ObjectPool<Response> responsePool;
protected final R request; protected final R request;
protected AsyncConnection channel; protected AsyncConnection channel;
@@ -66,15 +71,15 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private void offerResponseBuffer(ByteBuffer attachment) { private void offerResponseBuffer(ByteBuffer attachment) {
if (writeHeadBuffer == null) { if (writeHeadBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) { if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeHeadBuffer = attachment; writeHeadBuffer = attachment;
} }
} else if (writeBodyBuffer == null) { } else if (writeBodyBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) { if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeBodyBuffer = attachment; writeBodyBuffer = attachment;
} }
} else { } else {
context.offerBuffer(attachment); bufferPool.accept(attachment);
} }
} }
@@ -108,31 +113,33 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private void offerResponseBuffer(ByteBuffer[] attachments) { private void offerResponseBuffer(ByteBuffer[] attachments) {
int start = 0; int start = 0;
if (writeHeadBuffer == null && attachments.length > start) { if (writeHeadBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeHeadBuffer = attachments[start]; writeHeadBuffer = attachments[start];
start++; start++;
} }
} }
if (writeBodyBuffer == null && attachments.length > start) { if (writeBodyBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeBodyBuffer = attachments[start]; writeBodyBuffer = attachments[start];
start++; start++;
} }
} }
for (int i = start; i < attachments.length; i++) { for (int i = start; i < attachments.length; i++) {
context.offerBuffer(attachments[i]); bufferPool.accept(attachments[i]);
} }
} }
}; };
protected Response(C context, final R request) { protected Response(C context, final R request, ObjectPool<Response> responsePool) {
this.context = context; this.context = context;
this.request = request; this.request = request;
this.writeHeadBuffer = context.pollBuffer(); this.bufferPool = request.bufferPool;
this.writeBodyBuffer = context.pollBuffer(); this.responsePool = responsePool;
this.writeHeadBuffer = bufferPool.get();
this.writeBodyBuffer = bufferPool.get();
this.bodyBufferSupplier = () -> { this.bodyBufferSupplier = () -> {
ByteBuffer buffer = writeBodyBuffer; ByteBuffer buffer = writeBodyBuffer;
if (buffer == null) return context.pollBuffer(); if (buffer == null) return bufferPool.get();
writeBodyBuffer = null; writeBodyBuffer = null;
return buffer; return buffer;
}; };
@@ -141,14 +148,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected ByteBuffer pollWriteReadBuffer() { protected ByteBuffer pollWriteReadBuffer() {
ByteBuffer buffer = this.writeHeadBuffer; ByteBuffer buffer = this.writeHeadBuffer;
this.writeHeadBuffer = null; this.writeHeadBuffer = null;
if (buffer == null) buffer = context.pollBuffer(); if (buffer == null) buffer = bufferPool.get();
return buffer; return buffer;
} }
protected ByteBuffer pollWriteBodyBuffer() { protected ByteBuffer pollWriteBodyBuffer() {
ByteBuffer buffer = this.writeBodyBuffer; ByteBuffer buffer = this.writeBodyBuffer;
this.writeBodyBuffer = null; this.writeBodyBuffer = null;
if (buffer == null) buffer = context.pollBuffer(); if (buffer == null) buffer = bufferPool.get();
return buffer; return buffer;
} }
@@ -157,7 +164,9 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
protected void offerBuffer(ByteBuffer... buffers) { protected void offerBuffer(ByteBuffer... buffers) {
context.offerBuffer(buffers); for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
} }
protected AsyncConnection removeChannel() { protected AsyncConnection removeChannel() {
@@ -257,19 +266,19 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel(); AsyncConnection conn = removeChannel();
this.recycle(); this.recycle();
this.prepare(); this.prepare();
new PrepareRunner(context, conn, null, this).run(); new PrepareRunner(context, this.responsePool, conn, null, this).run();
} else { } else {
channel.dispose(); channel.dispose();
} }
} else { } else {
this.context.responsePool.accept(this); this.responsePool.accept(this);
} }
} }
public void finish(final byte[] bs) { public void finish(final byte[] bs) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
if (this.context.bufferCapacity == bs.length) { if (this.context.bufferCapacity == bs.length) {
ByteBuffer buffer = this.context.pollBuffer(); ByteBuffer buffer = this.bufferPool.get();
buffer.put(bs); buffer.put(bs);
buffer.flip(); buffer.flip();
this.finish(buffer); this.finish(buffer);
@@ -285,7 +294,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffer, buffer, finishHandler); conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(boolean kill, ByteBuffer buffer) { public void finish(boolean kill, ByteBuffer buffer) {
@@ -296,7 +305,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffer, buffer, finishHandler); conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(ByteBuffer... buffers) { public void finish(ByteBuffer... buffers) {
@@ -306,7 +315,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffers, buffers, finishHandler2); conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(boolean kill, ByteBuffer... buffers) { public void finish(boolean kill, ByteBuffer... buffers) {
@@ -317,7 +326,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffers, buffers, finishHandler2); conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) { protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {
@@ -328,14 +337,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
channel.write(buffer, attachment, this); channel.write(buffer, attachment, this);
} else { } else {
context.offerBuffer(buffer); bufferPool.accept(buffer);
if (handler != null) handler.completed(result, attachment); if (handler != null) handler.completed(result, attachment);
} }
} }
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
context.offerBuffer(buffer); bufferPool.accept(buffer);
if (handler != null) handler.failed(exc, attachment); if (handler != null) handler.failed(exc, attachment);
} }
@@ -353,7 +362,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
index = i; index = i;
break; break;
} }
context.offerBuffer(buffers[i]); bufferPool.accept(buffers[i]);
} }
if (index == 0) { if (index == 0) {
channel.write(buffers, attachment, this); channel.write(buffers, attachment, this);
@@ -367,7 +376,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
for (ByteBuffer buffer : buffers) { for (ByteBuffer buffer : buffers) {
context.offerBuffer(buffer); bufferPool.accept(buffer);
} }
if (handler != null) handler.failed(exc, attachment); if (handler != null) handler.failed(exc, attachment);
} }

View File

@@ -7,14 +7,14 @@ package org.redkale.net;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.text.*; import java.text.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.net.Filter;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
@@ -281,7 +281,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.serverChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl")); this.serverChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl"));
this.serverChannel.open(config); this.serverChannel.open(config);
serverChannel.bind(address, backlog); serverChannel.bind(address, backlog);
serverChannel.accept(); serverChannel.accept(this);
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address
+ ", threads: " + threads + ", maxbody: " + formatLenth(context.maxbody) + ", bufferCapacity: " + formatLenth(bufferCapacity) + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize + ", threads: " + threads + ", maxbody: " + formatLenth(context.maxbody) + ", bufferCapacity: " + formatLenth(bufferCapacity) + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
@@ -299,7 +299,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
newServerChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl")); newServerChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl"));
newServerChannel.open(config); newServerChannel.open(config);
newServerChannel.bind(addr, backlog); newServerChannel.bind(addr, backlog);
newServerChannel.accept(); newServerChannel.accept(this);
} catch (IOException e) { } catch (IOException e) {
context.address = oldAddress; context.address = oldAddress;
throw e; throw e;
@@ -358,6 +358,15 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
protected abstract C createContext(); protected abstract C createContext();
//必须在 createContext()之后调用
protected abstract ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize);
//必须在 createContext()之后调用
protected abstract ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize);
//必须在 createResponsePool()之后调用
protected abstract Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool);
public void shutdown() throws IOException { public void shutdown() throws IOException {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning"); logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning");

View File

@@ -7,10 +7,12 @@ package org.redkale.net;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.util.AnyValue; import org.redkale.util.*;
/** /**
* 协议底层Server * 协议底层Server
@@ -70,7 +72,14 @@ public class TcpAioProtocolServer extends ProtocolServer {
} }
@Override @Override
public void accept() throws IOException { public void accept(Server server) throws IOException {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
ObjectPool<ByteBuffer> bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final AsynchronousServerSocketChannel serchannel = this.serverChannel; final AsynchronousServerSocketChannel serchannel = this.serverChannel;
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@@ -93,9 +102,9 @@ public class TcpAioProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
AsyncConnection conn = new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), channel, AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel,
context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
context.runAsync(new PrepareRunner(context, conn, null, null)); context.runAsync(new PrepareRunner(context, responsePool, conn, null, null));
} catch (Throwable e) { } catch (Throwable e) {
context.logger.log(Level.INFO, channel + " accept error", e); context.logger.log(Level.INFO, channel + " accept error", e);
} }

View File

@@ -11,7 +11,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.redkale.util.AnyValue; import java.util.concurrent.atomic.AtomicLong;
import org.redkale.util.*;
/** /**
* 协议底层Server * 协议底层Server
@@ -70,7 +71,14 @@ public class UdpBioProtocolServer extends ProtocolServer {
} }
@Override @Override
public void accept() throws IOException { public void accept(Server server) throws IOException {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
ObjectPool<ByteBuffer> bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final DatagramChannel serchannel = this.serverChannel; final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSeconds = this.context.readTimeoutSeconds; final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
@@ -81,15 +89,15 @@ public class UdpBioProtocolServer extends ProtocolServer {
public void run() { public void run() {
cdl.countDown(); cdl.countDown();
while (running) { while (running) {
final ByteBuffer buffer = context.pollBuffer(); final ByteBuffer buffer = bufferPool.get();
try { try {
SocketAddress address = serchannel.receive(buffer); SocketAddress address = serchannel.receive(buffer);
buffer.flip(); buffer.flip();
AsyncConnection conn = new UdpBioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), serchannel, AsyncConnection conn = new UdpBioAsyncConnection(bufferPool, bufferPool, serchannel,
context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null); context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, conn, buffer, null)); context.runAsync(new PrepareRunner(context, responsePool, conn, buffer, null));
} catch (Exception e) { } catch (Exception e) {
context.offerBuffer(buffer); bufferPool.accept(buffer);
} }
} }
} }

View File

@@ -28,6 +28,8 @@ public class HttpContext extends Context {
protected final ConcurrentHashMap<Class, Creator> asyncHandlerCreators = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<Class, Creator> asyncHandlerCreators = new ConcurrentHashMap<>();
protected String remoteAddrHeader;
public HttpContext(HttpContextConfig config) { public HttpContext(HttpContextConfig config) {
super(config); super(config);
random.setSeed(Math.abs(System.nanoTime())); random.setSeed(Math.abs(System.nanoTime()));
@@ -43,10 +45,6 @@ public class HttpContext extends Context {
return executor; return executor;
} }
protected ObjectPool<Response> getResponsePool() {
return responsePool;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) { protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
Creator<H> creator = asyncHandlerCreators.get(handlerClass); Creator<H> creator = asyncHandlerCreators.get(handlerClass);
@@ -162,5 +160,6 @@ public class HttpContext extends Context {
public static class HttpContextConfig extends ContextConfig { public static class HttpContextConfig extends ContextConfig {
public String remoteAddrHeader;
} }
} }

View File

@@ -79,9 +79,9 @@ public class HttpRequest extends Request<HttpContext> {
Object attachment; //仅供HttpServlet传递Entry使用 Object attachment; //仅供HttpServlet传递Entry使用
public HttpRequest(HttpContext context, String remoteAddrHeader) { public HttpRequest(HttpContext context, ObjectPool<ByteBuffer> bufferPool) {
super(context); super(context, bufferPool);
this.remoteAddrHeader = remoteAddrHeader; this.remoteAddrHeader = context.remoteAddrHeader;
} }
protected boolean isWebSocket() { protected boolean isWebSocket() {

View File

@@ -148,8 +148,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle()); return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
} }
public HttpResponse(HttpContext context, HttpRequest request, HttpResponseConfig config) { public HttpResponse(HttpContext context, HttpRequest request, ObjectPool<Response> responsePool, HttpResponseConfig config) {
super(context, request); super(context, request, responsePool);
this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType; this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType;
this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType; this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType;
this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes(); this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes();
@@ -174,6 +174,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return channel; return channel;
} }
@Override
protected void prepare() {
super.prepare();
}
@Override @Override
protected boolean recycle() { protected boolean recycle() {
this.status = 200; this.status = 200;
@@ -1197,7 +1202,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
context.offerBuffer(attachment); bufferPool.accept(attachment);
finish(true); finish(true);
try { try {
filechannel.close(); filechannel.close();

View File

@@ -37,6 +37,8 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private byte[] currDateBytes; private byte[] currDateBytes;
private HttpResponseConfig respConfig;
public HttpServer() { public HttpServer() {
this(System.currentTimeMillis(), ResourceFactory.root()); this(System.currentTimeMillis(), ResourceFactory.root());
} }
@@ -304,16 +306,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected HttpContext createContext() { protected HttpContext createContext() {
final int port = this.address.getPort(); final int port = this.address.getPort();
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
this.bufferCapacity = Math.max(this.bufferCapacity, 16 * 1024 + 16); //兼容 HTTP 2.0; this.bufferCapacity = Math.max(this.bufferCapacity, 16 * 1024 + 16); //兼容 HTTP 2.0;
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
final List<String[]> defaultAddHeaders = new ArrayList<>(); final List<String[]> defaultAddHeaders = new ArrayList<>();
final List<String[]> defaultSetHeaders = new ArrayList<>(); final List<String[]> defaultSetHeaders = new ArrayList<>();
boolean autoOptions = false; boolean autoOptions = false;
@@ -423,7 +416,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
final String addrHeader = remoteAddrHeader; final String addrHeader = remoteAddrHeader;
final HttpResponseConfig respConfig = new HttpResponseConfig(); this.respConfig = new HttpResponseConfig();
respConfig.plainContentType = plainContentType; respConfig.plainContentType = plainContentType;
respConfig.jsonContentType = jsonContentType; respConfig.jsonContentType = jsonContentType;
respConfig.defaultAddHeaders = defaultAddHeaders.isEmpty() ? null : defaultAddHeaders.toArray(new String[defaultAddHeaders.size()][]); respConfig.defaultAddHeaders = defaultAddHeaders.isEmpty() ? null : defaultAddHeaders.toArray(new String[defaultAddHeaders.size()][]);
@@ -433,18 +426,12 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
respConfig.dateSupplier = dateSupplier; respConfig.dateSupplier = dateSupplier;
respConfig.renders = ((HttpPrepareServlet) prepare).renders; respConfig.renders = ((HttpPrepareServlet) prepare).renders;
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
final HttpContextConfig contextConfig = new HttpContextConfig(); final HttpContextConfig contextConfig = new HttpContextConfig();
contextConfig.serverStartTime = this.serverStartTime; contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger; contextConfig.logger = this.logger;
contextConfig.executor = this.executor; contextConfig.executor = this.executor;
contextConfig.sslContext = this.sslContext; contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = rcapacity; contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.bufferPool = bufferPool;
contextConfig.responsePool = responsePool;
contextConfig.maxconns = this.maxconns; contextConfig.maxconns = this.maxconns;
contextConfig.maxbody = this.maxbody; contextConfig.maxbody = this.maxbody;
contextConfig.charset = this.charset; contextConfig.charset = this.charset;
@@ -454,9 +441,32 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
contextConfig.aliveTimeoutSeconds = this.aliveTimeoutSeconds; contextConfig.aliveTimeoutSeconds = this.aliveTimeoutSeconds;
contextConfig.readTimeoutSeconds = this.readTimeoutSeconds; contextConfig.readTimeoutSeconds = this.readTimeoutSeconds;
contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds; contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds;
contextConfig.remoteAddrHeader = addrHeader;
HttpContext httpcontext = new HttpContext(contextConfig); return new HttpContext(contextConfig);
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), respConfig)); }
return httpcontext;
@Override
protected ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
return bufferPool;
}
@Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return HttpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
}
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context, bufferPool), responsePool, this.respConfig);
} }
} }

View File

@@ -11,10 +11,11 @@ import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Supplier; import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.AsyncConnection;
import org.redkale.util.Comment; import org.redkale.util.Comment;
/** /**
@@ -82,6 +83,8 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketEngine _engine; //不可能为空 WebSocketEngine _engine; //不可能为空
AsyncConnection _channel;//不可能为空
String _sessionid; //不可能为空 String _sessionid; //不可能为空
G _userid; //不可能为空 G _userid; //不可能为空
@@ -674,12 +677,21 @@ public abstract class WebSocket<G extends Serializable, T> {
} }
/** /**
* 获取ByteBuffer资源池 * 获取ByteBuffer生成器
* *
* @return Supplier * @return Supplier
*/ */
protected Supplier<ByteBuffer> getByteBufferSupplier() { protected Supplier<ByteBuffer> getBufferSupplier() {
return this._runner.context.getBufferSupplier(); return this._channel.getBufferSupplier();
}
/**
* 获取ByteBuffer回收器
*
* @return Consumer
*/
protected Consumer<ByteBuffer> getBufferConsumer() {
return this._channel.getBufferConsumer();
} }
//------------------------------------------------------------------- //-------------------------------------------------------------------

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import static org.redkale.net.http.WebSocketServlet.DEFAILT_LIVEINTERVAL; import static org.redkale.net.http.WebSocketServlet.DEFAILT_LIVEINTERVAL;
import java.io.*; import java.io.*;
import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
@@ -229,26 +230,45 @@ public class WebSocketEngine {
} }
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null); final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
if (more) { if (more) {
Supplier<ByteBuffer> bufferSupplier = null;
Consumer<ByteBuffer> bufferConsumer = null;
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers //此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); //packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
if (single) { if (single) {
for (WebSocket websocket : websockets.values()) { for (WebSocket websocket : websockets.values()) {
if (predicate != null && !predicate.test(websocket)) continue; if (predicate != null && !predicate.test(websocket)) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} else { } else {
for (List<WebSocket> list : websockets2.values()) { for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) { for (WebSocket websocket : list) {
if (predicate != null && !predicate.test(websocket)) continue; if (predicate != null && !predicate.test(websocket)) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} }
} }
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); final Consumer<ByteBuffer> bufferConsumer0 = bufferConsumer;
if (future != null) future.whenComplete((rs, ex) -> {
if (packet.sendBuffers != null && bufferConsumer0 != null) {
for (ByteBuffer buffer : packet.sendBuffers) {
bufferConsumer0.accept(buffer);
}
}
});
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else { } else {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
@@ -286,16 +306,23 @@ public class WebSocketEngine {
} }
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1; final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1;
if (more) { if (more) {
Supplier<ByteBuffer> bufferSupplier = null;
Consumer<ByteBuffer> bufferConsumer = null;
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers //此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); //packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
if (single) { if (single) {
for (Serializable userid : userids) { for (Serializable userid : userids) {
WebSocket websocket = websockets.get(userid); WebSocket websocket = websockets.get(userid);
if (websocket == null) continue; if (websocket == null) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} else { } else {
@@ -303,11 +330,23 @@ public class WebSocketEngine {
List<WebSocket> list = websockets2.get(userid); List<WebSocket> list = websockets2.get(userid);
if (list == null) continue; if (list == null) continue;
for (WebSocket websocket : list) { for (WebSocket websocket : list) {
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} }
} }
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); final Consumer<ByteBuffer> bufferConsumer0 = bufferConsumer;
if (future != null) future.whenComplete((rs, ex) -> {
if (packet.sendBuffers != null && bufferConsumer0 != null) {
for (ByteBuffer buffer : packet.sendBuffers) {
bufferConsumer0.accept(buffer);
}
}
});
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else { } else {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;

View File

@@ -492,7 +492,7 @@ public final class WebSocketPacket {
void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) { void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) {
if (webSocket._engine.cryptor != null) { if (webSocket._engine.cryptor != null) {
HttpContext context = webSocket._engine.context; HttpContext context = webSocket._engine.context;
buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer()); buffers = webSocket._engine.cryptor.decrypt(buffers, webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer());
} }
FrameType selfType = this.type; FrameType selfType = this.type;
final boolean series = selfType == FrameType.SERIES; final boolean series = selfType == FrameType.SERIES;

View File

@@ -5,7 +5,6 @@
*/ */
package org.redkale.net.http; package org.redkale.net.http;
import org.redkale.net.AsyncConnection;
import static org.redkale.net.http.WebSocket.*; import static org.redkale.net.http.WebSocket.*;
import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.net.http.WebSocketPacket.FrameType;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@@ -29,8 +28,6 @@ class WebSocketRunner implements Runnable {
private final WebSocketEngine engine; private final WebSocketEngine engine;
private final AsyncConnection channel;
private final WebSocket webSocket; private final WebSocket webSocket;
protected final HttpContext context; protected final HttpContext context;
@@ -49,13 +46,12 @@ class WebSocketRunner implements Runnable {
protected long lastReadTime; protected long lastReadTime;
WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer, AsyncConnection channel) { WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer) {
this.context = context; this.context = context;
this.engine = webSocket._engine; this.engine = webSocket._engine;
this.webSocket = webSocket; this.webSocket = webSocket;
this.mergemsg = webSocket._engine.mergemsg; this.mergemsg = webSocket._engine.mergemsg;
this.restMessageConsumer = messageConsumer; this.restMessageConsumer = messageConsumer;
this.channel = channel;
} }
@Override @Override
@@ -64,10 +60,10 @@ class WebSocketRunner implements Runnable {
final WebSocketRunner self = this; final WebSocketRunner self = this;
try { try {
webSocket.onConnected(); webSocket.onConnected();
channel.setReadTimeoutSeconds(300); //读取超时5分钟 webSocket._channel.setReadTimeoutSeconds(300); //读取超时5分钟
if (channel.isOpen()) { if (webSocket._channel.isOpen()) {
final int wsmaxbody = webSocket._engine.wsmaxbody; final int wsmaxbody = webSocket._engine.wsmaxbody;
channel.read(new CompletionHandler<Integer, ByteBuffer>() { webSocket._channel.read(new CompletionHandler<Integer, ByteBuffer>() {
//尚未解析完的数据包 //尚未解析完的数据包
private WebSocketPacket unfinishPacket; private WebSocketPacket unfinishPacket;
@@ -94,11 +90,11 @@ class WebSocketRunner implements Runnable {
onePacket = unfinishPacket; onePacket = unfinishPacket;
unfinishPacket = null; unfinishPacket = null;
for (ByteBuffer b : exBuffers) { for (ByteBuffer b : exBuffers) {
context.offerBuffer(b); webSocket._channel.offerBuffer(b);
} }
exBuffers.clear(); exBuffers.clear();
} else { //需要继续接收, 此处不能回收readBuffer } else { //需要继续接收, 此处不能回收readBuffer
channel.read(this); webSocket._channel.read(this);
return; return;
} }
} }
@@ -125,7 +121,7 @@ class WebSocketRunner implements Runnable {
} }
//继续监听消息 //继续监听消息
if (readBuffer.hasRemaining()) { //exBuffers缓存了 if (readBuffer.hasRemaining()) { //exBuffers缓存了
readBuffer = context.pollBuffer(); readBuffer = webSocket._channel.pollReadBuffer();
} else { } else {
readBuffer.clear(); readBuffer.clear();
} }
@@ -133,8 +129,8 @@ class WebSocketRunner implements Runnable {
readBuffer.put(halfBytes.getValue()); readBuffer.put(halfBytes.getValue());
halfBytes.setValue(null); halfBytes.setValue(null);
} }
channel.setReadBuffer(readBuffer); webSocket._channel.setReadBuffer(readBuffer);
channel.read(this); webSocket._channel.read(this);
//消息处理 //消息处理
for (final WebSocketPacket packet : packets) { for (final WebSocketPacket packet : packets) {
@@ -229,11 +225,11 @@ class WebSocketRunner implements Runnable {
//System.out.println("推送消息"); //System.out.println("推送消息");
final CompletableFuture<Integer> futureResult = new CompletableFuture<>(); final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
try { try {
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier(), this.context.getBufferConsumer(), webSocket._engine.cryptor); ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor);
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet);
this.lastSendTime = System.currentTimeMillis(); this.lastSendTime = System.currentTimeMillis();
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() { webSocket._channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
private CompletableFuture<Integer> future = futureResult; private CompletableFuture<Integer> future = futureResult;
@@ -245,7 +241,7 @@ class WebSocketRunner implements Runnable {
future = null; future = null;
if (attachments != null) { if (attachments != null) {
for (ByteBuffer buf : attachments) { for (ByteBuffer buf : attachments) {
context.offerBuffer(buf); webSocket._channel.offerBuffer(buf);
} }
} }
} }
@@ -260,7 +256,7 @@ class WebSocketRunner implements Runnable {
} }
} }
if (index >= 0) { //ByteBuffer[]统一回收的可以采用此写法 if (index >= 0) { //ByteBuffer[]统一回收的可以采用此写法
channel.write(attachments, index, attachments.length - index, attachments, this); webSocket._channel.write(attachments, index, attachments.length - index, attachments, this);
return; return;
} }
if (future != null) { if (future != null) {
@@ -268,7 +264,7 @@ class WebSocketRunner implements Runnable {
future = null; future = null;
if (attachments != null) { if (attachments != null) {
for (ByteBuffer buf : attachments) { for (ByteBuffer buf : attachments) {
context.offerBuffer(buf); webSocket._channel.offerBuffer(buf);
} }
} }
} }
@@ -310,7 +306,7 @@ class WebSocketRunner implements Runnable {
if (closed) return null; if (closed) return null;
closed = true; closed = true;
CompletableFuture<Void> future = engine.removeLocalThenClose(webSocket); CompletableFuture<Void> future = engine.removeLocalThenClose(webSocket);
channel.dispose(); webSocket._channel.dispose();
webSocket.onClose(code, reason); webSocket.onClose(code, reason);
return future; return future;
} }

View File

@@ -202,6 +202,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine; webSocket._engine = this.node.localEngine;
webSocket._channel = response.getChannel();
webSocket._messageTextType = this.messageTextType; webSocket._messageTextType = this.messageTextType;
webSocket._textConvert = textConvert; webSocket._textConvert = textConvert;
webSocket._binaryConvert = binaryConvert; webSocket._binaryConvert = binaryConvert;
@@ -262,7 +263,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Consumer<Boolean> task = (oldkilled) -> { Consumer<Boolean> task = (oldkilled) -> {
if (oldkilled) { if (oldkilled) {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
@@ -283,7 +285,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
} else { } else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
@@ -291,14 +294,15 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}); });
} else { } else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
} }
}; };
if (webSocket.delayPackets != null) { //存在待发送的消息 if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer);
List<WebSocketPacket> delayPackets = webSocket.delayPackets; List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null; webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null; CompletableFuture<Integer> cf = null;
@@ -323,7 +327,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}); });
}; };
if (webSocket.delayPackets != null) { //存在待发送的消息 if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer);
List<WebSocketPacket> delayPackets = webSocket.delayPackets; List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null; webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null; CompletableFuture<Integer> cf = null;

View File

@@ -112,7 +112,7 @@ public final class SncpDynServlet extends SncpServlet {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void execute(SncpRequest request, SncpResponse response) throws IOException { public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (bufferSupplier == null) { if (bufferSupplier == null) {
bufferSupplier = request.getContext().getBufferSupplier(); bufferSupplier = request.getBufferPool();
} }
final SncpServletAction action = actions.get(request.getActionid()); final SncpServletAction action = actions.get(request.getActionid());
//logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); //logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));

View File

@@ -45,11 +45,15 @@ public final class SncpRequest extends Request<SncpContext> {
private byte[] bufferbytes = new byte[6]; private byte[] bufferbytes = new byte[6];
protected SncpRequest(SncpContext context) { protected SncpRequest(SncpContext context, ObjectPool<ByteBuffer> bufferPool) {
super(context); super(context, bufferPool);
this.convert = context.getBsonConvert(); this.convert = context.getBsonConvert();
} }
protected ObjectPool<ByteBuffer> getBufferPool() {
return this.bufferPool;
}
@Override @Override
protected int readHeader(ByteBuffer buffer) { protected int readHeader(ByteBuffer buffer) {
if (buffer.remaining() < HEADER_SIZE) { if (buffer.remaining() < HEADER_SIZE) {

View File

@@ -45,8 +45,8 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
return null; return null;
} }
protected SncpResponse(SncpContext context, SncpRequest request) { protected SncpResponse(SncpContext context, SncpRequest request, ObjectPool<Response> responsePool) {
super(context, request); super(context, request, responsePool);
this.addrBytes = context.getServerAddress().getAddress().getAddress(); this.addrBytes = context.getServerAddress().getAddress().getAddress();
this.addrPort = context.getServerAddress().getPort(); this.addrPort = context.getServerAddress().getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4"); if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4");

View File

@@ -99,28 +99,14 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected SncpContext createContext() { protected SncpContext createContext() {
final int port = this.address.getPort(); this.bufferCapacity = Math.max(this.bufferCapacity, 8 * 1024);
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = Math.max(this.bufferCapacity, 8 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
final SncpContextConfig contextConfig = new SncpContextConfig(); final SncpContextConfig contextConfig = new SncpContextConfig();
contextConfig.serverStartTime = this.serverStartTime; contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger; contextConfig.logger = this.logger;
contextConfig.executor = this.executor; contextConfig.executor = this.executor;
contextConfig.sslContext = this.sslContext; contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = rcapacity; contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.bufferPool = bufferPool;
contextConfig.responsePool = responsePool;
contextConfig.maxconns = this.maxconns; contextConfig.maxconns = this.maxconns;
contextConfig.maxbody = this.maxbody; contextConfig.maxbody = this.maxbody;
contextConfig.charset = this.charset; contextConfig.charset = this.charset;
@@ -131,9 +117,31 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
contextConfig.readTimeoutSeconds = this.readTimeoutSeconds; contextConfig.readTimeoutSeconds = this.readTimeoutSeconds;
contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds; contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds;
SncpContext sncpcontext = new SncpContext(contextConfig); return new SncpContext(contextConfig);
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext))); }
return sncpcontext;
@Override
protected ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
return bufferPool;
}
@Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return SncpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
}
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context, bufferPool), responsePool);
} }
} }