18 Commits

Author SHA1 Message Date
Redkale
7a5fbcdccd 2019-07-03 11:35:37 +08:00
Redkale
345e929712 [不兼容修改]CacheSource的getCollectionMap序列方法增加一个set参数 2019-06-27 18:21:19 +08:00
Redkale
358862fe59 修复Entity类带boolean字段调DataSource.insert出现异常的bug 2019-06-26 16:51:48 +08:00
Redkale
3dde9bb293 2019-06-21 17:00:40 +08:00
Redkale
99ae4ccadd 从Context中移除BufferPool和ResponsePool 2019-06-20 15:26:20 +08:00
Redkale
98e9ffe0ef 2019-06-20 10:02:24 +08:00
Redkale
6927bfe8ac 2019-06-19 16:52:00 +08:00
Redkale
340a3a8fa3 2019-06-19 16:46:12 +08:00
Redkale
4724763991 2019-06-19 16:45:45 +08:00
Redkale
03353ad08c 2019-06-19 16:21:05 +08:00
Redkale
95c3354fcd WebSocketParam增加getAnnotations系列方法 2019-06-19 15:47:46 +08:00
Redkale
1bda2f92b9 HttpRequest增加getAnnotation系列方法 2019-06-18 22:59:07 +08:00
Redkale
bd3c706934 修复DataSource中json字段不为Serializable时会异常的bug 2019-06-13 22:34:53 +08:00
Redkale
ef3663aa36 修复DataSource中json字段不为Serializable时会异常的bug 2019-06-13 22:23:30 +08:00
Redkale
427ff717d4 UDP协议下bufferCapacity默认值为1350字节 2019-05-30 12:07:09 +08:00
Redkale
b409300412 UDP协议下bufferCapacity默认值为1480字节 2019-05-30 12:04:13 +08:00
Redkale
ca1f974dbe 2019-05-28 17:43:48 +08:00
Redkale
6a8c86096b DataSource的clearTable、dropTable在表不存在的情况下由抛异常改为结果值返回-1 2019-05-28 15:51:59 +08:00
39 changed files with 466 additions and 307 deletions

View File

@@ -119,7 +119,7 @@
threads 线程数, 默认: CPU核数*32 threads 线程数, 默认: CPU核数*32
maxconns最大连接数, 小于1表示无限制 默认: 0 maxconns最大连接数, 小于1表示无限制 默认: 0
maxbody: request.body最大值 默认: 64K maxbody: request.body最大值 默认: 64K
bufferCapacity: ByteBuffer的初始化大小 默认: 32K; (HTTP 2.0、WebSocket必须要16k以上) bufferCapacity: ByteBuffer的初始化大小 TCP默认: 32K; (HTTP 2.0、WebSocket必须要16k以上); UDP默认: 1350B
bufferPoolSize ByteBuffer池的大小默认: 线程数*4 bufferPoolSize ByteBuffer池的大小默认: 线程数*4
responsePoolSize Response池的大小默认: 线程数*2 responsePoolSize Response池的大小默认: 线程数*2
aliveTimeoutSeconds: KeepAlive读操作超时秒数 默认30 0表示永久不超时; -1表示禁止KeepAlive aliveTimeoutSeconds: KeepAlive读操作超时秒数 默认30 0表示永久不超时; -1表示禁止KeepAlive

View File

@@ -437,7 +437,7 @@ public abstract class NodeServer {
final ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> { final ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try { try {
if (SncpClient.parseMethod(serviceImplClass).isEmpty() && serviceImplClass.getAnnotation(Priority.class) == null) { //class没有可用的方法且没有标记启动优先级的 通常为BaseService if (SncpClient.parseMethod(serviceImplClass).isEmpty() && serviceImplClass.getAnnotation(Priority.class) == null) { //class没有可用的方法且没有标记启动优先级的 通常为BaseService
logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method"); if (!serviceImplClass.getName().startsWith("org.redkale.")) logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method");
return; return;
} }

View File

@@ -52,10 +52,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
//关联的事件数, 小于1表示没有事件 //关联的事件数, 小于1表示没有事件
protected final AtomicInteger eventing = new AtomicInteger(); protected final AtomicInteger eventing = new AtomicInteger();
protected AsyncConnection(Context context) {
this(context.getBufferSupplier(), context.getBufferConsumer(), context.getSSLContext());
}
protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext) { protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext) {
this(bufferPool, bufferPool, sslContext); this(bufferPool, bufferPool, sslContext);
} }
@@ -68,6 +64,14 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
this.sslContext = sslContext; this.sslContext = sslContext;
} }
public Supplier<ByteBuffer> getBufferSupplier() {
return this.bufferSupplier;
}
public Consumer<ByteBuffer> getBufferConsumer() {
return this.bufferConsumer;
}
public final long getLastReadTime() { public final long getLastReadTime() {
return readtime; return readtime;
} }
@@ -114,7 +118,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler); public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
public abstract void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler);
@Override @Override
public abstract int write(ByteBuffer src) throws IOException; public abstract int write(ByteBuffer src) throws IOException;
@@ -245,22 +248,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds); return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds);
} }
/**
* 创建TCP协议客户端连接
*
* @param context Context
* @param address 连接点子
* @param group 连接AsynchronousChannelGroup
* @param readTimeoutSeconds 读取超时秒数
* @param writeTimeoutSeconds 写入超时秒数
*
* @return 连接CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createTCP(final Context context, final AsynchronousChannelGroup group,
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return createTCP(context.getBufferSupplier(), context.getBufferConsumer(), group, context.getSSLContext(), address, readTimeoutSeconds, writeTimeoutSeconds);
}
/** /**
* 创建TCP协议客户端连接 * 创建TCP协议客户端连接
* *
@@ -371,35 +358,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
} }
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch) {
return create(context, ch, (SocketAddress) null, 0, 0);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch) { public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch) {
return create(bufferPool, ch, null, 0, 0); return create(bufferPool, ch, null, 0, 0);
} }

View File

@@ -6,11 +6,8 @@
package org.redkale.net; package org.redkale.net;
import java.net.*; import java.net.*;
import java.nio.*;
import java.nio.charset.*; import java.nio.charset.*;
import java.util.Collection;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.convert.bson.*; import org.redkale.convert.bson.*;
@@ -39,12 +36,6 @@ public class Context {
//ByteBuffer的容量默认8K //ByteBuffer的容量默认8K
protected final int bufferCapacity; protected final int bufferCapacity;
//ByteBuffer对象池
protected final ObjectPool<ByteBuffer> bufferPool;
//Response对象池
protected final ObjectPool<Response> responsePool;
//服务的根Servlet //服务的根Servlet
protected final PrepareServlet prepare; protected final PrepareServlet prepare;
@@ -83,22 +74,18 @@ public class Context {
public Context(ContextConfig config) { public Context(ContextConfig config) {
this(config.serverStartTime, config.logger, config.executor, config.sslContext, this(config.serverStartTime, config.logger, config.executor, config.sslContext,
config.bufferCapacity, config.bufferPool, config.responsePool, config.maxconns, config.maxbody, config.bufferCapacity, config.maxconns, config.maxbody, config.charset, config.address, config.resourceFactory,
config.charset, config.address, config.resourceFactory, config.prepare, config.prepare, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
} }
public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext, public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool, final int maxconns, int bufferCapacity, final int maxconns, final int maxbody, Charset charset, InetSocketAddress address,
final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory, ResourceFactory resourceFactory, PrepareServlet prepare, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) {
final PrepareServlet prepare, final int aliveTimeoutSeconds, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
this.serverStartTime = serverStartTime; this.serverStartTime = serverStartTime;
this.logger = logger; this.logger = logger;
this.executor = executor; this.executor = executor;
this.sslContext = sslContext; this.sslContext = sslContext;
this.bufferCapacity = bufferCapacity; this.bufferCapacity = bufferCapacity;
this.bufferPool = bufferPool;
this.responsePool = responsePool;
this.maxconns = maxconns; this.maxconns = maxconns;
this.maxbody = maxbody; this.maxbody = maxbody;
this.charset = StandardCharsets.UTF_8.equals(charset) ? null : charset; this.charset = StandardCharsets.UTF_8.equals(charset) ? null : charset;
@@ -160,36 +147,6 @@ public class Context {
return bufferCapacity; return bufferCapacity;
} }
public Supplier<ByteBuffer> getBufferSupplier() {
return bufferPool;
}
public Consumer<ByteBuffer> getBufferConsumer() {
return bufferPool;
}
public ByteBuffer pollBuffer() {
return bufferPool.get();
}
public void offerBuffer(ByteBuffer buffer) {
bufferPool.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
}
public void offerBuffer(Collection<ByteBuffer> buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
}
public Logger getLogger() { public Logger getLogger() {
return logger; return logger;
} }
@@ -228,12 +185,6 @@ public class Context {
//ByteBuffer的容量默认8K //ByteBuffer的容量默认8K
public int bufferCapacity; public int bufferCapacity;
//ByteBuffer对象池
public ObjectPool<ByteBuffer> bufferPool;
//Response对象池
public ObjectPool<Response> responsePool;
//服务的根Servlet //服务的根Servlet
public PrepareServlet prepare; public PrepareServlet prepare;

View File

@@ -8,7 +8,6 @@ package org.redkale.net;
import java.io.IOException; import java.io.IOException;
import java.nio.*; import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -28,12 +27,15 @@ public class PrepareRunner implements Runnable {
private final Context context; private final Context context;
private final ObjectPool<Response> responsePool;
private ByteBuffer data; private ByteBuffer data;
private Response response; private Response response;
public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, Response response) { public PrepareRunner(Context context, ObjectPool<Response> responsePool, AsyncConnection channel, ByteBuffer data, Response response) {
this.context = context; this.context = context;
this.responsePool = responsePool;
this.channel = channel; this.channel = channel;
this.data = data; this.data = data;
this.response = response; this.response = response;
@@ -41,8 +43,6 @@ public class PrepareRunner implements Runnable {
@Override @Override
public void run() { public void run() {
final boolean keepalive = response != null;
final ObjectPool<? extends Response> responsePool = context.responsePool;
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
if (response == null) response = responsePool.get(); if (response == null) response = responsePool.get();
try { try {
@@ -56,8 +56,7 @@ public class PrepareRunner implements Runnable {
} }
if (response == null) response = responsePool.get(); if (response == null) response = responsePool.get();
try { try {
channel.read(keepalive ? context.getAliveTimeoutSeconds() : context.getReadTimeoutSeconds(), TimeUnit.SECONDS, channel.read(new CompletionHandler<Integer, ByteBuffer>() {
new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer count, ByteBuffer buffer) { public void completed(Integer count, ByteBuffer buffer) {
if (count < 1) { if (count < 1) {
@@ -165,7 +164,7 @@ public class PrepareRunner implements Runnable {
} }
protected Response pollResponse() { protected Response pollResponse() {
return context.responsePool.get(); return responsePool.get();
} }
protected Request pollRequest(Response response) { protected Request pollRequest(Response response) {

View File

@@ -43,7 +43,7 @@ public abstract class ProtocolServer {
public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException; public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException;
public abstract void accept() throws IOException; public abstract void accept(Server server) throws IOException;
public abstract void close() throws IOException; public abstract void close() throws IOException;

View File

@@ -9,6 +9,7 @@ import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.bson.BsonConvert;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.ObjectPool;
/** /**
* 协议请求对象 * 协议请求对象
@@ -23,6 +24,8 @@ public abstract class Request<C extends Context> {
protected final C context; protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final BsonConvert bsonConvert; protected final BsonConvert bsonConvert;
protected final JsonConvert jsonConvert; protected final JsonConvert jsonConvert;
@@ -47,9 +50,9 @@ public abstract class Request<C extends Context> {
protected final Map<String, Object> attributes = new HashMap<>(); protected final Map<String, Object> attributes = new HashMap<>();
protected Request(C context) { protected Request(C context, ObjectPool<ByteBuffer> bufferPool) {
this.context = context; this.context = context;
this.readBuffer = context.pollBuffer(); this.bufferPool = bufferPool;
this.bsonConvert = context.getBsonConvert(); this.bsonConvert = context.getBsonConvert();
this.jsonConvert = context.getJsonConvert(); this.jsonConvert = context.getJsonConvert();
} }
@@ -67,7 +70,7 @@ public abstract class Request<C extends Context> {
protected ByteBuffer pollReadBuffer() { protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer; ByteBuffer buffer = this.readBuffer;
this.readBuffer = null; this.readBuffer = null;
if (buffer == null) buffer = context.pollBuffer(); if (buffer == null) buffer = bufferPool.get();
return buffer; return buffer;
} }
@@ -77,7 +80,7 @@ public abstract class Request<C extends Context> {
buffer.clear(); buffer.clear();
this.readBuffer = buffer; this.readBuffer = buffer;
} else { } else {
context.offerBuffer(buffer); bufferPool.accept(buffer);
} }
} }

View File

@@ -10,6 +10,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.function.*; import java.util.function.*;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.util.ObjectPool;
/** /**
* 协议响应对象 * 协议响应对象
@@ -26,6 +27,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final C context; protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final ObjectPool<Response> responsePool;
protected final R request; protected final R request;
protected AsyncConnection channel; protected AsyncConnection channel;
@@ -66,15 +71,15 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private void offerResponseBuffer(ByteBuffer attachment) { private void offerResponseBuffer(ByteBuffer attachment) {
if (writeHeadBuffer == null) { if (writeHeadBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) { if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeHeadBuffer = attachment; writeHeadBuffer = attachment;
} }
} else if (writeBodyBuffer == null) { } else if (writeBodyBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) { if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeBodyBuffer = attachment; writeBodyBuffer = attachment;
} }
} else { } else {
context.offerBuffer(attachment); bufferPool.accept(attachment);
} }
} }
@@ -108,31 +113,33 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private void offerResponseBuffer(ByteBuffer[] attachments) { private void offerResponseBuffer(ByteBuffer[] attachments) {
int start = 0; int start = 0;
if (writeHeadBuffer == null && attachments.length > start) { if (writeHeadBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeHeadBuffer = attachments[start]; writeHeadBuffer = attachments[start];
start++; start++;
} }
} }
if (writeBodyBuffer == null && attachments.length > start) { if (writeBodyBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) { if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeBodyBuffer = attachments[start]; writeBodyBuffer = attachments[start];
start++; start++;
} }
} }
for (int i = start; i < attachments.length; i++) { for (int i = start; i < attachments.length; i++) {
context.offerBuffer(attachments[i]); bufferPool.accept(attachments[i]);
} }
} }
}; };
protected Response(C context, final R request) { protected Response(C context, final R request, ObjectPool<Response> responsePool) {
this.context = context; this.context = context;
this.request = request; this.request = request;
this.writeHeadBuffer = context.pollBuffer(); this.bufferPool = request.bufferPool;
this.writeBodyBuffer = context.pollBuffer(); this.responsePool = responsePool;
this.writeHeadBuffer = bufferPool.get();
this.writeBodyBuffer = bufferPool.get();
this.bodyBufferSupplier = () -> { this.bodyBufferSupplier = () -> {
ByteBuffer buffer = writeBodyBuffer; ByteBuffer buffer = writeBodyBuffer;
if (buffer == null) return context.pollBuffer(); if (buffer == null) return bufferPool.get();
writeBodyBuffer = null; writeBodyBuffer = null;
return buffer; return buffer;
}; };
@@ -141,14 +148,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected ByteBuffer pollWriteReadBuffer() { protected ByteBuffer pollWriteReadBuffer() {
ByteBuffer buffer = this.writeHeadBuffer; ByteBuffer buffer = this.writeHeadBuffer;
this.writeHeadBuffer = null; this.writeHeadBuffer = null;
if (buffer == null) buffer = context.pollBuffer(); if (buffer == null) buffer = bufferPool.get();
return buffer; return buffer;
} }
protected ByteBuffer pollWriteBodyBuffer() { protected ByteBuffer pollWriteBodyBuffer() {
ByteBuffer buffer = this.writeBodyBuffer; ByteBuffer buffer = this.writeBodyBuffer;
this.writeBodyBuffer = null; this.writeBodyBuffer = null;
if (buffer == null) buffer = context.pollBuffer(); if (buffer == null) buffer = bufferPool.get();
return buffer; return buffer;
} }
@@ -157,7 +164,9 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
protected void offerBuffer(ByteBuffer... buffers) { protected void offerBuffer(ByteBuffer... buffers) {
context.offerBuffer(buffers); for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
} }
protected AsyncConnection removeChannel() { protected AsyncConnection removeChannel() {
@@ -257,19 +266,19 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel(); AsyncConnection conn = removeChannel();
this.recycle(); this.recycle();
this.prepare(); this.prepare();
new PrepareRunner(context, conn, null, this).run(); new PrepareRunner(context, this.responsePool, conn, null, this).run();
} else { } else {
channel.dispose(); channel.dispose();
} }
} else { } else {
this.context.responsePool.accept(this); this.responsePool.accept(this);
} }
} }
public void finish(final byte[] bs) { public void finish(final byte[] bs) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
if (this.context.bufferCapacity == bs.length) { if (this.context.bufferCapacity == bs.length) {
ByteBuffer buffer = this.context.pollBuffer(); ByteBuffer buffer = this.bufferPool.get();
buffer.put(bs); buffer.put(bs);
buffer.flip(); buffer.flip();
this.finish(buffer); this.finish(buffer);
@@ -285,7 +294,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffer, buffer, finishHandler); conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(boolean kill, ByteBuffer buffer) { public void finish(boolean kill, ByteBuffer buffer) {
@@ -296,7 +305,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffer, buffer, finishHandler); conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(ByteBuffer... buffers) { public void finish(ByteBuffer... buffers) {
@@ -306,7 +315,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffers, buffers, finishHandler2); conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
public void finish(boolean kill, ByteBuffer... buffers) { public void finish(boolean kill, ByteBuffer... buffers) {
@@ -317,7 +326,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive; final boolean more = data != null && this.request.keepAlive;
this.request.more = more; this.request.more = more;
conn.write(buffers, buffers, finishHandler2); conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, conn, data, null).run(); if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
} }
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) { protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {
@@ -328,14 +337,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
channel.write(buffer, attachment, this); channel.write(buffer, attachment, this);
} else { } else {
context.offerBuffer(buffer); bufferPool.accept(buffer);
if (handler != null) handler.completed(result, attachment); if (handler != null) handler.completed(result, attachment);
} }
} }
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
context.offerBuffer(buffer); bufferPool.accept(buffer);
if (handler != null) handler.failed(exc, attachment); if (handler != null) handler.failed(exc, attachment);
} }
@@ -353,7 +362,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
index = i; index = i;
break; break;
} }
context.offerBuffer(buffers[i]); bufferPool.accept(buffers[i]);
} }
if (index == 0) { if (index == 0) {
channel.write(buffers, attachment, this); channel.write(buffers, attachment, this);
@@ -367,7 +376,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
for (ByteBuffer buffer : buffers) { for (ByteBuffer buffer : buffers) {
context.offerBuffer(buffer); bufferPool.accept(buffer);
} }
if (handler != null) handler.failed(exc, attachment); if (handler != null) handler.failed(exc, attachment);
} }

View File

@@ -7,14 +7,14 @@ package org.redkale.net;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.text.*; import java.text.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.net.Filter;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
@@ -127,8 +127,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0); this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0);
this.backlog = parseLenth(config.getValue("backlog"), 8 * 1024); this.backlog = parseLenth(config.getValue("backlog"), 8 * 1024);
this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024); this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024);
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), 32 * 1024); int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(protocol) ? 1350 : 32 * 1024);
this.bufferCapacity = bufCapacity < 8 * 1024 ? 8 * 1024 : bufCapacity; this.bufferCapacity = "UDP".equalsIgnoreCase(protocol) ? bufCapacity : (bufCapacity < 8 * 1024 ? 8 * 1024 : bufCapacity);
this.threads = config.getIntValue("threads", Runtime.getRuntime().availableProcessors() * 32); this.threads = config.getIntValue("threads", Runtime.getRuntime().availableProcessors() * 32);
this.bufferPoolSize = config.getIntValue("bufferPoolSize", this.threads * 4); this.bufferPoolSize = config.getIntValue("bufferPoolSize", this.threads * 4);
this.responsePoolSize = config.getIntValue("responsePoolSize", this.threads * 2); this.responsePoolSize = config.getIntValue("responsePoolSize", this.threads * 2);
@@ -281,7 +281,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.serverChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl")); this.serverChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl"));
this.serverChannel.open(config); this.serverChannel.open(config);
serverChannel.bind(address, backlog); serverChannel.bind(address, backlog);
serverChannel.accept(); serverChannel.accept(this);
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address
+ ", threads: " + threads + ", maxbody: " + formatLenth(context.maxbody) + ", bufferCapacity: " + formatLenth(bufferCapacity) + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize + ", threads: " + threads + ", maxbody: " + formatLenth(context.maxbody) + ", bufferCapacity: " + formatLenth(bufferCapacity) + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
@@ -299,7 +299,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
newServerChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl")); newServerChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl"));
newServerChannel.open(config); newServerChannel.open(config);
newServerChannel.bind(addr, backlog); newServerChannel.bind(addr, backlog);
newServerChannel.accept(); newServerChannel.accept(this);
} catch (IOException e) { } catch (IOException e) {
context.address = oldAddress; context.address = oldAddress;
throw e; throw e;
@@ -358,6 +358,15 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
protected abstract C createContext(); protected abstract C createContext();
//必须在 createContext()之后调用
protected abstract ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize);
//必须在 createContext()之后调用
protected abstract ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize);
//必须在 createResponsePool()之后调用
protected abstract Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool);
public void shutdown() throws IOException { public void shutdown() throws IOException {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning"); logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning");

View File

@@ -103,13 +103,6 @@ public class TcpAioAsyncConnection extends AsyncConnection {
} }
} }
@Override
public void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler) {
this.readtime = System.currentTimeMillis();
ByteBuffer dst = pollReadBuffer();
channel.read(dst, timeout < 0 ? 0 : timeout, unit, dst, handler);
}
private <A> void nextWrite(Throwable exc, A attachment) { private <A> void nextWrite(Throwable exc, A attachment) {
BlockingQueue<WriteEntry> queue = this.writeQueue; BlockingQueue<WriteEntry> queue = this.writeQueue;
if (queue != null && exc != null && !isOpen()) { if (queue != null && exc != null && !isOpen()) {

View File

@@ -7,10 +7,12 @@ package org.redkale.net;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.util.AnyValue; import org.redkale.util.*;
/** /**
* 协议底层Server * 协议底层Server
@@ -70,7 +72,14 @@ public class TcpAioProtocolServer extends ProtocolServer {
} }
@Override @Override
public void accept() throws IOException { public void accept(Server server) throws IOException {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
ObjectPool<ByteBuffer> bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final AsynchronousServerSocketChannel serchannel = this.serverChannel; final AsynchronousServerSocketChannel serchannel = this.serverChannel;
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@@ -93,9 +102,9 @@ public class TcpAioProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
AsyncConnection conn = new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), channel, AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel,
context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
context.runAsync(new PrepareRunner(context, conn, null, null)); context.runAsync(new PrepareRunner(context, responsePool, conn, null, null));
} catch (Throwable e) { } catch (Throwable e) {
context.logger.log(Level.INFO, channel + " accept error", e); context.logger.log(Level.INFO, channel + " accept error", e);
} }

View File

@@ -10,7 +10,6 @@ import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Set; import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*; import java.util.function.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@@ -142,11 +141,6 @@ public class UdpBioAsyncConnection extends AsyncConnection {
} }
} }
@Override
public void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler) {
read(handler);
}
@Override @Override
public int read(ByteBuffer dst) throws IOException { public int read(ByteBuffer dst) throws IOException {
int rs = channel.read(dst); int rs = channel.read(dst);

View File

@@ -11,7 +11,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.redkale.util.AnyValue; import java.util.concurrent.atomic.AtomicLong;
import org.redkale.util.*;
/** /**
* 协议底层Server * 协议底层Server
@@ -70,7 +71,14 @@ public class UdpBioProtocolServer extends ProtocolServer {
} }
@Override @Override
public void accept() throws IOException { public void accept(Server server) throws IOException {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
ObjectPool<ByteBuffer> bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final DatagramChannel serchannel = this.serverChannel; final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSeconds = this.context.readTimeoutSeconds; final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
@@ -81,15 +89,15 @@ public class UdpBioProtocolServer extends ProtocolServer {
public void run() { public void run() {
cdl.countDown(); cdl.countDown();
while (running) { while (running) {
final ByteBuffer buffer = context.pollBuffer(); final ByteBuffer buffer = bufferPool.get();
try { try {
SocketAddress address = serchannel.receive(buffer); SocketAddress address = serchannel.receive(buffer);
buffer.flip(); buffer.flip();
AsyncConnection conn = new UdpBioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), serchannel, AsyncConnection conn = new UdpBioAsyncConnection(bufferPool, bufferPool, serchannel,
context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null); context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, conn, buffer, null)); context.runAsync(new PrepareRunner(context, responsePool, conn, buffer, null));
} catch (Exception e) { } catch (Exception e) {
context.offerBuffer(buffer); bufferPool.accept(buffer);
} }
} }
} }

View File

@@ -28,6 +28,8 @@ public class HttpContext extends Context {
protected final ConcurrentHashMap<Class, Creator> asyncHandlerCreators = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<Class, Creator> asyncHandlerCreators = new ConcurrentHashMap<>();
protected String remoteAddrHeader;
public HttpContext(HttpContextConfig config) { public HttpContext(HttpContextConfig config) {
super(config); super(config);
random.setSeed(Math.abs(System.nanoTime())); random.setSeed(Math.abs(System.nanoTime()));
@@ -43,10 +45,6 @@ public class HttpContext extends Context {
return executor; return executor;
} }
protected ObjectPool<Response> getResponsePool() {
return responsePool;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) { protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
Creator<H> creator = asyncHandlerCreators.get(handlerClass); Creator<H> creator = asyncHandlerCreators.get(handlerClass);
@@ -162,5 +160,6 @@ public class HttpContext extends Context {
public static class HttpContextConfig extends ContextConfig { public static class HttpContextConfig extends ContextConfig {
public String remoteAddrHeader;
} }
} }

View File

@@ -6,6 +6,8 @@
package org.redkale.net.http; package org.redkale.net.http;
import java.io.*; import java.io.*;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.Channels; import java.nio.channels.Channels;
@@ -69,15 +71,17 @@ public class HttpRequest extends Request<HttpContext> {
protected int actionid; protected int actionid;
protected Annotation[] annotations;
protected Object currentUser; protected Object currentUser;
private final String remoteAddrHeader; private final String remoteAddrHeader;
Object attachment; //仅供HttpServlet传递Entry使用 Object attachment; //仅供HttpServlet传递Entry使用
public HttpRequest(HttpContext context, String remoteAddrHeader) { public HttpRequest(HttpContext context, ObjectPool<ByteBuffer> bufferPool) {
super(context); super(context, bufferPool);
this.remoteAddrHeader = remoteAddrHeader; this.remoteAddrHeader = context.remoteAddrHeader;
} }
protected boolean isWebSocket() { protected boolean isWebSocket() {
@@ -298,6 +302,55 @@ public class HttpRequest extends Request<HttpContext> {
return this.actionid; return this.actionid;
} }
/**
* 获取当前操作Method上的注解集合
*
* @return Annotation[]
*/
public Annotation[] getAnnotations() {
if (this.annotations == null) return new Annotation[0];
Annotation[] newanns = new Annotation[this.annotations.length];
System.arraycopy(this.annotations, 0, newanns, 0, newanns.length);
return newanns;
}
/**
* 获取当前操作Method上的注解
*
* @param <T> 注解泛型
* @param annotationClass 注解类型
*
* @return Annotation
*/
public <T extends Annotation> T getAnnotation(Class<T> annotationClass) {
if (this.annotations == null) return null;
for (Annotation ann : this.annotations) {
if (ann.getClass() == annotationClass) return (T) ann;
}
return null;
}
/**
* 获取当前操作Method上的注解集合
*
* @param <T> 注解泛型
* @param annotationClass 注解类型
*
* @return Annotation[]
*/
public <T extends Annotation> T[] getAnnotationsByType(Class<T> annotationClass) {
if (this.annotations == null) return (T[]) Array.newInstance(annotationClass, 0);
T[] news = (T[]) Array.newInstance(annotationClass, this.annotations.length);
int index = 0;
for (Annotation ann : this.annotations) {
if (ann.getClass() == annotationClass) {
news[index++] = (T) ann;
}
}
if (index < 1) return (T[]) Array.newInstance(annotationClass, 0);
return Arrays.copyOf(news, index);
}
/** /**
* 获取客户端地址IP * 获取客户端地址IP
* *
@@ -443,6 +496,7 @@ public class HttpRequest extends Request<HttpContext> {
this.bodyparsed = false; this.bodyparsed = false;
this.moduleid = 0; this.moduleid = 0;
this.actionid = 0; this.actionid = 0;
this.annotations = null;
this.currentUser = null; this.currentUser = null;
this.attachment = null; this.attachment = null;

View File

@@ -148,8 +148,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle()); return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
} }
public HttpResponse(HttpContext context, HttpRequest request, HttpResponseConfig config) { public HttpResponse(HttpContext context, HttpRequest request, ObjectPool<Response> responsePool, HttpResponseConfig config) {
super(context, request); super(context, request, responsePool);
this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType; this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType;
this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType; this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType;
this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes(); this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes();
@@ -174,6 +174,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return channel; return channel;
} }
@Override
protected void prepare() {
super.prepare();
}
@Override @Override
protected boolean recycle() { protected boolean recycle() {
this.status = 200; this.status = 200;
@@ -1197,7 +1202,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
context.offerBuffer(attachment); bufferPool.accept(attachment);
finish(true); finish(true);
try { try {
filechannel.close(); filechannel.close();

View File

@@ -37,6 +37,8 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private byte[] currDateBytes; private byte[] currDateBytes;
private HttpResponseConfig respConfig;
public HttpServer() { public HttpServer() {
this(System.currentTimeMillis(), ResourceFactory.root()); this(System.currentTimeMillis(), ResourceFactory.root());
} }
@@ -304,16 +306,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected HttpContext createContext() { protected HttpContext createContext() {
final int port = this.address.getPort(); final int port = this.address.getPort();
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
this.bufferCapacity = Math.max(this.bufferCapacity, 16 * 1024 + 16); //兼容 HTTP 2.0; this.bufferCapacity = Math.max(this.bufferCapacity, 16 * 1024 + 16); //兼容 HTTP 2.0;
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
final List<String[]> defaultAddHeaders = new ArrayList<>(); final List<String[]> defaultAddHeaders = new ArrayList<>();
final List<String[]> defaultSetHeaders = new ArrayList<>(); final List<String[]> defaultSetHeaders = new ArrayList<>();
boolean autoOptions = false; boolean autoOptions = false;
@@ -423,7 +416,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
final String addrHeader = remoteAddrHeader; final String addrHeader = remoteAddrHeader;
final HttpResponseConfig respConfig = new HttpResponseConfig(); this.respConfig = new HttpResponseConfig();
respConfig.plainContentType = plainContentType; respConfig.plainContentType = plainContentType;
respConfig.jsonContentType = jsonContentType; respConfig.jsonContentType = jsonContentType;
respConfig.defaultAddHeaders = defaultAddHeaders.isEmpty() ? null : defaultAddHeaders.toArray(new String[defaultAddHeaders.size()][]); respConfig.defaultAddHeaders = defaultAddHeaders.isEmpty() ? null : defaultAddHeaders.toArray(new String[defaultAddHeaders.size()][]);
@@ -433,18 +426,12 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
respConfig.dateSupplier = dateSupplier; respConfig.dateSupplier = dateSupplier;
respConfig.renders = ((HttpPrepareServlet) prepare).renders; respConfig.renders = ((HttpPrepareServlet) prepare).renders;
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
final HttpContextConfig contextConfig = new HttpContextConfig(); final HttpContextConfig contextConfig = new HttpContextConfig();
contextConfig.serverStartTime = this.serverStartTime; contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger; contextConfig.logger = this.logger;
contextConfig.executor = this.executor; contextConfig.executor = this.executor;
contextConfig.sslContext = this.sslContext; contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = rcapacity; contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.bufferPool = bufferPool;
contextConfig.responsePool = responsePool;
contextConfig.maxconns = this.maxconns; contextConfig.maxconns = this.maxconns;
contextConfig.maxbody = this.maxbody; contextConfig.maxbody = this.maxbody;
contextConfig.charset = this.charset; contextConfig.charset = this.charset;
@@ -454,9 +441,32 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
contextConfig.aliveTimeoutSeconds = this.aliveTimeoutSeconds; contextConfig.aliveTimeoutSeconds = this.aliveTimeoutSeconds;
contextConfig.readTimeoutSeconds = this.readTimeoutSeconds; contextConfig.readTimeoutSeconds = this.readTimeoutSeconds;
contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds; contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds;
contextConfig.remoteAddrHeader = addrHeader;
HttpContext httpcontext = new HttpContext(contextConfig); return new HttpContext(contextConfig);
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), respConfig)); }
return httpcontext;
@Override
protected ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
return bufferPool;
}
@Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return HttpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
}
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context, bufferPool), responsePool, this.respConfig);
} }
} }

View File

@@ -6,6 +6,7 @@
package org.redkale.net.http; package org.redkale.net.http;
import java.io.IOException; import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
@@ -71,6 +72,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
request.attachment = entry; request.attachment = entry;
request.moduleid = entry.moduleid; request.moduleid = entry.moduleid;
request.actionid = entry.actionid; request.actionid = entry.actionid;
request.annotations = entry.annotations;
if (entry.auth) { if (entry.auth) {
response.thenEvent(authSuccessServlet); response.thenEvent(authSuccessServlet);
authenticate(request, response); authenticate(request, response);
@@ -210,6 +212,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
InnerActionEntry(int moduleid, int actionid, String name, String[] methods, Method method, HttpServlet servlet) { InnerActionEntry(int moduleid, int actionid, String name, String[] methods, Method method, HttpServlet servlet) {
this(moduleid, actionid, name, methods, method, auth(method), cacheseconds(method), servlet); this(moduleid, actionid, name, methods, method, auth(method), cacheseconds(method), servlet);
this.annotations = annotations(method);
} }
//供Rest类使用参数不能随便更改 //供Rest类使用参数不能随便更改
@@ -232,16 +235,21 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
} : null; } : null;
} }
private static boolean auth(Method method) { protected static boolean auth(Method method) {
HttpMapping mapping = method.getAnnotation(HttpMapping.class); HttpMapping mapping = method.getAnnotation(HttpMapping.class);
return mapping == null || mapping.auth(); return mapping == null || mapping.auth();
} }
private static int cacheseconds(Method method) { protected static int cacheseconds(Method method) {
HttpMapping mapping = method.getAnnotation(HttpMapping.class); HttpMapping mapping = method.getAnnotation(HttpMapping.class);
return mapping == null ? 0 : mapping.cacheseconds(); return mapping == null ? 0 : mapping.cacheseconds();
} }
//Rest.class会用到此方法
protected static Annotation[] annotations(Method method) {
return method.getAnnotations();
}
boolean isNeedCheck() { boolean isNeedCheck() {
return this.moduleid != 0 || this.actionid != 0; return this.moduleid != 0 || this.actionid != 0;
} }
@@ -270,9 +278,11 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
final String[] methods; final String[] methods;
final Method method;
final HttpServlet servlet; final HttpServlet servlet;
Method method;
Annotation[] annotations;
} }
private HttpServlet createActionServlet(final Method method) { private HttpServlet createActionServlet(final Method method) {

View File

@@ -355,6 +355,10 @@ public final class Rest {
fv.visitEnd(); fv.visitEnd();
} }
} }
{ //_redkale_annotations
fv = cw.visitField(ACC_PUBLIC + ACC_STATIC, "_redkale_annotations", "Ljava/util/Map;", "Ljava/util/Map<Ljava/lang/String;[Ljava/lang/annotation/Annotation;>;", null);
fv.visitEnd();
}
{ //_DynWebSocketServlet构造函数 { //_DynWebSocketServlet构造函数
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null)); mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
@@ -422,7 +426,7 @@ public final class Rest {
} }
RestClassLoader newLoader = new RestClassLoader(loader); RestClassLoader newLoader = new RestClassLoader(loader);
Map<String, Annotation[]> msgclassToAnnotations = new HashMap<>();
for (int i = 0; i < messageMethods.size(); i++) { // _DyncXXXWebSocketMessage 子消息List for (int i = 0; i < messageMethods.size(); i++) { // _DyncXXXWebSocketMessage 子消息List
Method method = messageMethods.get(i); Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i)); String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
@@ -504,6 +508,30 @@ public final class Rest {
mv.visitMaxs(2, 2); mv.visitMaxs(2, 2);
mv.visitEnd(); mv.visitEnd();
} }
{ //getAnnotations
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "getAnnotations", "()[Ljava/lang/annotation/Annotation;", null, null));
mv.visitFieldInsn(GETSTATIC, newDynName, "_redkale_annotations", "Ljava/util/Map;");
mv.visitLdcInsn(newDynMessageFullName + endfix);
mv.visitMethodInsn(INVOKEINTERFACE, "java/util/Map", "get", "(Ljava/lang/Object;)Ljava/lang/Object;", true);
mv.visitTypeInsn(CHECKCAST, "[Ljava/lang/annotation/Annotation;");
mv.visitVarInsn(ASTORE, 1);
mv.visitVarInsn(ALOAD, 1);
Label l2 = new Label();
mv.visitJumpInsn(IFNONNULL, l2);
mv.visitInsn(ICONST_0);
mv.visitTypeInsn(ANEWARRAY, "java/lang/annotation/Annotation");
mv.visitInsn(ARETURN);
mv.visitLabel(l2);
mv.visitFrame(Opcodes.F_APPEND, 1, new Object[]{"[Ljava/lang/annotation/Annotation;"}, 0, null);
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 1);
mv.visitInsn(ARRAYLENGTH);
mv.visitMethodInsn(INVOKESTATIC, "java/util/Arrays", "copyOf", "([Ljava/lang/Object;I)[Ljava/lang/Object;", false);
mv.visitTypeInsn(CHECKCAST, "[Ljava/lang/annotation/Annotation;");
mv.visitInsn(ARETURN);
mv.visitMaxs(2, 2);
mv.visitEnd();
}
{ //execute { //execute
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(L" + newDynWebSokcetFullName + ";)V", null, null)); mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(L" + newDynWebSokcetFullName + ";)V", null, null));
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
@@ -544,6 +572,7 @@ public final class Rest {
} }
cw2.visitEnd(); cw2.visitEnd();
newLoader.loadClass((newDynMessageFullName + endfix).replace('/', '.'), cw2.toByteArray()); newLoader.loadClass((newDynMessageFullName + endfix).replace('/', '.'), cw2.toByteArray());
msgclassToAnnotations.put(newDynMessageFullName + endfix, method.getAnnotations());
} }
{ //_DynXXXWebSocketMessage class { //_DynXXXWebSocketMessage class
@@ -676,6 +705,7 @@ public final class Rest {
Class<?> newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray()); Class<?> newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray());
try { try {
T servlet = (T) newClazz.getDeclaredConstructor().newInstance(); T servlet = (T) newClazz.getDeclaredConstructor().newInstance();
newClazz.getField("_redkale_annotations").set(null, msgclassToAnnotations);
if (rws.cryptor() != Cryptor.class) { if (rws.cryptor() != Cryptor.class) {
Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance(); Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance();
Field cryptorField = newClazz.getSuperclass().getDeclaredField("cryptor"); //WebSocketServlet Field cryptorField = newClazz.getSuperclass().getDeclaredField("cryptor"); //WebSocketServlet
@@ -1810,9 +1840,10 @@ public final class Rest {
// HashMap<String, InnerActionEntry> _createRestInnerActionEntry() { // HashMap<String, InnerActionEntry> _createRestInnerActionEntry() {
// HashMap<String, InnerActionEntry> map = new HashMap<>(); // HashMap<String, InnerActionEntry> map = new HashMap<>();
// map.put("asyncfind3", new InnerActionEntry(100000,200000,"asyncfind3", new String[]{},null,false,0, new _Dync_asyncfind3_HttpServlet())); // map.put("asyncfind3", new InnerActionEntry(100000,200000,"asyncfind3", new String[]{},null,false,0, new _Dync_asyncfind3_HttpServlet()));
// map.put("asyncfind3", new InnerActionEntry(1,2,"asyncfind2", new String[]{"GET", "POST"},null,true,0, new _Dync_asyncfind2_HttpServlet())); // map.put("asyncfind2", new InnerActionEntry(1,2,"asyncfind2", new String[]{"GET", "POST"},null,true,0, new _Dync_asyncfind2_HttpServlet()));
// return map; // return map;
// } // }
Map<String, Method> mappingurlToMethod = new HashMap<>();
{ //_createRestInnerActionEntry 方法 { //_createRestInnerActionEntry 方法
mv = new MethodDebugVisitor(cw.visitMethod(0, "_createRestInnerActionEntry", "()Ljava/util/HashMap;", "()Ljava/util/HashMap<Ljava/lang/String;L" + innerEntryName + ";>;", null)); mv = new MethodDebugVisitor(cw.visitMethod(0, "_createRestInnerActionEntry", "()Ljava/util/HashMap;", "()Ljava/util/HashMap<Ljava/lang/String;L" + innerEntryName + ";>;", null));
//mv.setDebug(true); //mv.setDebug(true);
@@ -1822,6 +1853,7 @@ public final class Rest {
mv.visitVarInsn(ASTORE, 1); mv.visitVarInsn(ASTORE, 1);
for (final MappingEntry entry : entrys) { for (final MappingEntry entry : entrys) {
mappingurlToMethod.put(entry.mappingurl, entry.mappingMethod);
mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 1);
mv.visitLdcInsn(entry.mappingurl); //name mv.visitLdcInsn(entry.mappingurl); //name
mv.visitTypeInsn(NEW, innerEntryName); //new InnerActionEntry mv.visitTypeInsn(NEW, innerEntryName); //new InnerActionEntry
@@ -1837,9 +1869,9 @@ public final class Rest {
mv.visitLdcInsn(entry.methods[i]); mv.visitLdcInsn(entry.methods[i]);
mv.visitInsn(AASTORE); mv.visitInsn(AASTORE);
} }
mv.visitInsn(ACONST_NULL); //method mv.visitInsn(ACONST_NULL); //method
mv.visitInsn(entry.auth ? ICONST_1 : ICONST_0); //auth mv.visitInsn(entry.auth ? ICONST_1 : ICONST_0); //auth
pushInt(mv, entry.cacheseconds); //cacheseconds pushInt(mv, entry.cacheseconds); //cacheseconds
mv.visitTypeInsn(NEW, newDynName + "$" + entry.newActionClassName); mv.visitTypeInsn(NEW, newDynName + "$" + entry.newActionClassName);
mv.visitInsn(DUP); mv.visitInsn(DUP);
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
@@ -1920,7 +1952,12 @@ public final class Rest {
restactMethod.setAccessible(true); restactMethod.setAccessible(true);
Field tmpentrysfield = HttpServlet.class.getDeclaredField("_tmpentrys"); Field tmpentrysfield = HttpServlet.class.getDeclaredField("_tmpentrys");
tmpentrysfield.setAccessible(true); tmpentrysfield.setAccessible(true);
tmpentrysfield.set(obj, restactMethod.invoke(obj)); HashMap<String, HttpServlet.InnerActionEntry> innerEntryMap = (HashMap) restactMethod.invoke(obj);
for (Map.Entry<String, HttpServlet.InnerActionEntry> en : innerEntryMap.entrySet()) {
Method m = mappingurlToMethod.get(en.getKey());
if (m != null) en.getValue().annotations = HttpServlet.InnerActionEntry.annotations(m);
}
tmpentrysfield.set(obj, innerEntryMap);
return obj; return obj;
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@@ -11,10 +11,11 @@ import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Supplier; import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.AsyncConnection;
import org.redkale.util.Comment; import org.redkale.util.Comment;
/** /**
@@ -82,6 +83,8 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketEngine _engine; //不可能为空 WebSocketEngine _engine; //不可能为空
AsyncConnection _channel;//不可能为空
String _sessionid; //不可能为空 String _sessionid; //不可能为空
G _userid; //不可能为空 G _userid; //不可能为空
@@ -674,12 +677,21 @@ public abstract class WebSocket<G extends Serializable, T> {
} }
/** /**
* 获取ByteBuffer资源池 * 获取ByteBuffer生成器
* *
* @return Supplier * @return Supplier
*/ */
protected Supplier<ByteBuffer> getByteBufferSupplier() { protected Supplier<ByteBuffer> getBufferSupplier() {
return this._runner.context.getBufferSupplier(); return this._channel.getBufferSupplier();
}
/**
* 获取ByteBuffer回收器
*
* @return Consumer
*/
protected Consumer<ByteBuffer> getBufferConsumer() {
return this._channel.getBufferConsumer();
} }
//------------------------------------------------------------------- //-------------------------------------------------------------------

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import static org.redkale.net.http.WebSocketServlet.DEFAILT_LIVEINTERVAL; import static org.redkale.net.http.WebSocketServlet.DEFAILT_LIVEINTERVAL;
import java.io.*; import java.io.*;
import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
@@ -229,26 +230,45 @@ public class WebSocketEngine {
} }
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null); final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
if (more) { if (more) {
Supplier<ByteBuffer> bufferSupplier = null;
Consumer<ByteBuffer> bufferConsumer = null;
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers //此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); //packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
if (single) { if (single) {
for (WebSocket websocket : websockets.values()) { for (WebSocket websocket : websockets.values()) {
if (predicate != null && !predicate.test(websocket)) continue; if (predicate != null && !predicate.test(websocket)) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} else { } else {
for (List<WebSocket> list : websockets2.values()) { for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) { for (WebSocket websocket : list) {
if (predicate != null && !predicate.test(websocket)) continue; if (predicate != null && !predicate.test(websocket)) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} }
} }
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); final Consumer<ByteBuffer> bufferConsumer0 = bufferConsumer;
if (future != null) future.whenComplete((rs, ex) -> {
if (packet.sendBuffers != null && bufferConsumer0 != null) {
for (ByteBuffer buffer : packet.sendBuffers) {
bufferConsumer0.accept(buffer);
}
}
});
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else { } else {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
@@ -286,16 +306,23 @@ public class WebSocketEngine {
} }
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1; final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1;
if (more) { if (more) {
Supplier<ByteBuffer> bufferSupplier = null;
Consumer<ByteBuffer> bufferConsumer = null;
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers //此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[]) : ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last)); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); //packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
if (single) { if (single) {
for (Serializable userid : userids) { for (Serializable userid : userids) {
WebSocket websocket = websockets.get(userid); WebSocket websocket = websockets.get(userid);
if (websocket == null) continue; if (websocket == null) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} else { } else {
@@ -303,11 +330,23 @@ public class WebSocketEngine {
List<WebSocket> list = websockets2.get(userid); List<WebSocket> list = websockets2.get(userid);
if (list == null) continue; if (list == null) continue;
for (WebSocket websocket : list) { for (WebSocket websocket : list) {
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} }
} }
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); final Consumer<ByteBuffer> bufferConsumer0 = bufferConsumer;
if (future != null) future.whenComplete((rs, ex) -> {
if (packet.sendBuffers != null && bufferConsumer0 != null) {
for (ByteBuffer buffer : packet.sendBuffers) {
bufferConsumer0.accept(buffer);
}
}
});
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else { } else {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;

View File

@@ -428,7 +428,7 @@ public abstract class WebSocketNode {
keyuser.put(keys[i], userids[i]); keyuser.put(keys[i], userids[i]);
} }
tryAcquireSemaphore(); tryAcquireSemaphore();
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys); CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> { rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
if (addrs == null || addrs.isEmpty()) { if (addrs == null || addrs.isEmpty()) {
@@ -692,7 +692,7 @@ public abstract class WebSocketNode {
keyuser.put(keys[i], userids[i]); keyuser.put(keys[i], userids[i]);
} }
tryAcquireSemaphore(); tryAcquireSemaphore();
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys); CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> { rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
if (addrs == null || addrs.isEmpty()) { if (addrs == null || addrs.isEmpty()) {

View File

@@ -492,7 +492,7 @@ public final class WebSocketPacket {
void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) { void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) {
if (webSocket._engine.cryptor != null) { if (webSocket._engine.cryptor != null) {
HttpContext context = webSocket._engine.context; HttpContext context = webSocket._engine.context;
buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer()); buffers = webSocket._engine.cryptor.decrypt(buffers, webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer());
} }
FrameType selfType = this.type; FrameType selfType = this.type;
final boolean series = selfType == FrameType.SERIES; final boolean series = selfType == FrameType.SERIES;

View File

@@ -5,6 +5,10 @@
*/ */
package org.redkale.net.http; package org.redkale.net.http;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.util.Arrays;
/** /**
* *
* 供WebSocket.preOnMessage 方法获取RestWebSocket里OnMessage方法的参数 <br> * 供WebSocket.preOnMessage 方法获取RestWebSocket里OnMessage方法的参数 <br>
@@ -16,6 +20,29 @@ package org.redkale.net.http;
public interface WebSocketParam { public interface WebSocketParam {
public <T> T getValue(String name); public <T> T getValue(String name);
public String[] getNames(); public String[] getNames();
public Annotation[] getAnnotations();
default <T extends Annotation> T getAnnotation(Class<T> annotationClass) {
for (Annotation ann : getAnnotations()) {
if (ann.getClass() == annotationClass) return (T) ann;
}
return null;
}
default <T extends Annotation> T[] getAnnotationsByType(Class<T> annotationClass) {
Annotation[] annotations = getAnnotations();
if (annotations == null) return (T[]) Array.newInstance(annotationClass, 0);
T[] news = (T[]) Array.newInstance(annotationClass, annotations.length);
int index = 0;
for (Annotation ann : annotations) {
if (ann.getClass() == annotationClass) {
news[index++] = (T) ann;
}
}
if (index < 1) return (T[]) Array.newInstance(annotationClass, 0);
return Arrays.copyOf(news, index);
}
} }

View File

@@ -5,7 +5,6 @@
*/ */
package org.redkale.net.http; package org.redkale.net.http;
import org.redkale.net.AsyncConnection;
import static org.redkale.net.http.WebSocket.*; import static org.redkale.net.http.WebSocket.*;
import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.net.http.WebSocketPacket.FrameType;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@@ -29,8 +28,6 @@ class WebSocketRunner implements Runnable {
private final WebSocketEngine engine; private final WebSocketEngine engine;
private final AsyncConnection channel;
private final WebSocket webSocket; private final WebSocket webSocket;
protected final HttpContext context; protected final HttpContext context;
@@ -49,13 +46,12 @@ class WebSocketRunner implements Runnable {
protected long lastReadTime; protected long lastReadTime;
WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer, AsyncConnection channel) { WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer) {
this.context = context; this.context = context;
this.engine = webSocket._engine; this.engine = webSocket._engine;
this.webSocket = webSocket; this.webSocket = webSocket;
this.mergemsg = webSocket._engine.mergemsg; this.mergemsg = webSocket._engine.mergemsg;
this.restMessageConsumer = messageConsumer; this.restMessageConsumer = messageConsumer;
this.channel = channel;
} }
@Override @Override
@@ -64,10 +60,10 @@ class WebSocketRunner implements Runnable {
final WebSocketRunner self = this; final WebSocketRunner self = this;
try { try {
webSocket.onConnected(); webSocket.onConnected();
channel.setReadTimeoutSeconds(300); //读取超时5分钟 webSocket._channel.setReadTimeoutSeconds(300); //读取超时5分钟
if (channel.isOpen()) { if (webSocket._channel.isOpen()) {
final int wsmaxbody = webSocket._engine.wsmaxbody; final int wsmaxbody = webSocket._engine.wsmaxbody;
channel.read(new CompletionHandler<Integer, ByteBuffer>() { webSocket._channel.read(new CompletionHandler<Integer, ByteBuffer>() {
//尚未解析完的数据包 //尚未解析完的数据包
private WebSocketPacket unfinishPacket; private WebSocketPacket unfinishPacket;
@@ -94,11 +90,11 @@ class WebSocketRunner implements Runnable {
onePacket = unfinishPacket; onePacket = unfinishPacket;
unfinishPacket = null; unfinishPacket = null;
for (ByteBuffer b : exBuffers) { for (ByteBuffer b : exBuffers) {
context.offerBuffer(b); webSocket._channel.offerBuffer(b);
} }
exBuffers.clear(); exBuffers.clear();
} else { //需要继续接收, 此处不能回收readBuffer } else { //需要继续接收, 此处不能回收readBuffer
channel.read(this); webSocket._channel.read(this);
return; return;
} }
} }
@@ -125,7 +121,7 @@ class WebSocketRunner implements Runnable {
} }
//继续监听消息 //继续监听消息
if (readBuffer.hasRemaining()) { //exBuffers缓存了 if (readBuffer.hasRemaining()) { //exBuffers缓存了
readBuffer = context.pollBuffer(); readBuffer = webSocket._channel.pollReadBuffer();
} else { } else {
readBuffer.clear(); readBuffer.clear();
} }
@@ -133,8 +129,8 @@ class WebSocketRunner implements Runnable {
readBuffer.put(halfBytes.getValue()); readBuffer.put(halfBytes.getValue());
halfBytes.setValue(null); halfBytes.setValue(null);
} }
channel.setReadBuffer(readBuffer); webSocket._channel.setReadBuffer(readBuffer);
channel.read(this); webSocket._channel.read(this);
//消息处理 //消息处理
for (final WebSocketPacket packet : packets) { for (final WebSocketPacket packet : packets) {
@@ -229,11 +225,11 @@ class WebSocketRunner implements Runnable {
//System.out.println("推送消息"); //System.out.println("推送消息");
final CompletableFuture<Integer> futureResult = new CompletableFuture<>(); final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
try { try {
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier(), this.context.getBufferConsumer(), webSocket._engine.cryptor); ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor);
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet);
this.lastSendTime = System.currentTimeMillis(); this.lastSendTime = System.currentTimeMillis();
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() { webSocket._channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
private CompletableFuture<Integer> future = futureResult; private CompletableFuture<Integer> future = futureResult;
@@ -245,7 +241,7 @@ class WebSocketRunner implements Runnable {
future = null; future = null;
if (attachments != null) { if (attachments != null) {
for (ByteBuffer buf : attachments) { for (ByteBuffer buf : attachments) {
context.offerBuffer(buf); webSocket._channel.offerBuffer(buf);
} }
} }
} }
@@ -260,7 +256,7 @@ class WebSocketRunner implements Runnable {
} }
} }
if (index >= 0) { //ByteBuffer[]统一回收的可以采用此写法 if (index >= 0) { //ByteBuffer[]统一回收的可以采用此写法
channel.write(attachments, index, attachments.length - index, attachments, this); webSocket._channel.write(attachments, index, attachments.length - index, attachments, this);
return; return;
} }
if (future != null) { if (future != null) {
@@ -268,7 +264,7 @@ class WebSocketRunner implements Runnable {
future = null; future = null;
if (attachments != null) { if (attachments != null) {
for (ByteBuffer buf : attachments) { for (ByteBuffer buf : attachments) {
context.offerBuffer(buf); webSocket._channel.offerBuffer(buf);
} }
} }
} }
@@ -310,7 +306,7 @@ class WebSocketRunner implements Runnable {
if (closed) return null; if (closed) return null;
closed = true; closed = true;
CompletableFuture<Void> future = engine.removeLocalThenClose(webSocket); CompletableFuture<Void> future = engine.removeLocalThenClose(webSocket);
channel.dispose(); webSocket._channel.dispose();
webSocket.onClose(code, reason); webSocket.onClose(code, reason);
return future; return future;
} }

View File

@@ -202,6 +202,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine; webSocket._engine = this.node.localEngine;
webSocket._channel = response.getChannel();
webSocket._messageTextType = this.messageTextType; webSocket._messageTextType = this.messageTextType;
webSocket._textConvert = textConvert; webSocket._textConvert = textConvert;
webSocket._binaryConvert = binaryConvert; webSocket._binaryConvert = binaryConvert;
@@ -262,7 +263,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Consumer<Boolean> task = (oldkilled) -> { Consumer<Boolean> task = (oldkilled) -> {
if (oldkilled) { if (oldkilled) {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
@@ -283,7 +285,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
} else { } else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
@@ -291,14 +294,15 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}); });
} else { } else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
} }
}; };
if (webSocket.delayPackets != null) { //存在待发送的消息 if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer);
List<WebSocketPacket> delayPackets = webSocket.delayPackets; List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null; webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null; CompletableFuture<Integer> cf = null;
@@ -323,7 +327,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}); });
}; };
if (webSocket.delayPackets != null) { //存在待发送的消息 if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer);
List<WebSocketPacket> delayPackets = webSocket.delayPackets; List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null; webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null; CompletableFuture<Integer> cf = null;

View File

@@ -112,7 +112,7 @@ public final class SncpDynServlet extends SncpServlet {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void execute(SncpRequest request, SncpResponse response) throws IOException { public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (bufferSupplier == null) { if (bufferSupplier == null) {
bufferSupplier = request.getContext().getBufferSupplier(); bufferSupplier = request.getBufferPool();
} }
final SncpServletAction action = actions.get(request.getActionid()); final SncpServletAction action = actions.get(request.getActionid());
//logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); //logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));

View File

@@ -45,11 +45,15 @@ public final class SncpRequest extends Request<SncpContext> {
private byte[] bufferbytes = new byte[6]; private byte[] bufferbytes = new byte[6];
protected SncpRequest(SncpContext context) { protected SncpRequest(SncpContext context, ObjectPool<ByteBuffer> bufferPool) {
super(context); super(context, bufferPool);
this.convert = context.getBsonConvert(); this.convert = context.getBsonConvert();
} }
protected ObjectPool<ByteBuffer> getBufferPool() {
return this.bufferPool;
}
@Override @Override
protected int readHeader(ByteBuffer buffer) { protected int readHeader(ByteBuffer buffer) {
if (buffer.remaining() < HEADER_SIZE) { if (buffer.remaining() < HEADER_SIZE) {

View File

@@ -45,8 +45,8 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
return null; return null;
} }
protected SncpResponse(SncpContext context, SncpRequest request) { protected SncpResponse(SncpContext context, SncpRequest request, ObjectPool<Response> responsePool) {
super(context, request); super(context, request, responsePool);
this.addrBytes = context.getServerAddress().getAddress().getAddress(); this.addrBytes = context.getServerAddress().getAddress().getAddress();
this.addrPort = context.getServerAddress().getPort(); this.addrPort = context.getServerAddress().getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4"); if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4");
@@ -56,7 +56,7 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
protected void offerBuffer(ByteBuffer... buffers) { protected void offerBuffer(ByteBuffer... buffers) {
super.offerBuffer(buffers); super.offerBuffer(buffers);
} }
public void finish(final int retcode, final BsonWriter out) { public void finish(final int retcode, final BsonWriter out) {
if (out == null) { if (out == null) {
final ByteBuffer buffer = pollWriteReadBuffer(); final ByteBuffer buffer = pollWriteReadBuffer();

View File

@@ -99,28 +99,14 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected SncpContext createContext() { protected SncpContext createContext() {
final int port = this.address.getPort(); this.bufferCapacity = Math.max(this.bufferCapacity, 8 * 1024);
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = Math.max(this.bufferCapacity, 8 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
final SncpContextConfig contextConfig = new SncpContextConfig(); final SncpContextConfig contextConfig = new SncpContextConfig();
contextConfig.serverStartTime = this.serverStartTime; contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger; contextConfig.logger = this.logger;
contextConfig.executor = this.executor; contextConfig.executor = this.executor;
contextConfig.sslContext = this.sslContext; contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = rcapacity; contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.bufferPool = bufferPool;
contextConfig.responsePool = responsePool;
contextConfig.maxconns = this.maxconns; contextConfig.maxconns = this.maxconns;
contextConfig.maxbody = this.maxbody; contextConfig.maxbody = this.maxbody;
contextConfig.charset = this.charset; contextConfig.charset = this.charset;
@@ -131,9 +117,31 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
contextConfig.readTimeoutSeconds = this.readTimeoutSeconds; contextConfig.readTimeoutSeconds = this.readTimeoutSeconds;
contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds; contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds;
SncpContext sncpcontext = new SncpContext(contextConfig); return new SncpContext(contextConfig);
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext))); }
return sncpcontext;
@Override
protected ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
return bufferPool;
}
@Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return SncpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
}
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context, bufferPool), responsePool);
} }
} }

View File

@@ -677,7 +677,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
public <T> Map<String, Collection<T>> getCollectionMap(final Type componentType, final String... keys) { public <T> Map<String, Collection<T>> getCollectionMap(final boolean set, final Type componentType, final String... keys) {
Map<String, Collection<T>> map = new HashMap<>(); Map<String, Collection<T>> map = new HashMap<>();
for (String key : keys) { for (String key : keys) {
Collection<T> s = (Collection<T>) get(key); Collection<T> s = (Collection<T>) get(key);
@@ -692,7 +692,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
public Map<String, Collection<String>> getStringCollectionMap(final String... keys) { public Map<String, Collection<String>> getStringCollectionMap(final boolean set, final String... keys) {
Map<String, Collection<String>> map = new HashMap<>(); Map<String, Collection<String>> map = new HashMap<>();
for (String key : keys) { for (String key : keys) {
Collection<String> s = (Collection<String>) get(key); Collection<String> s = (Collection<String>) get(key);
@@ -707,7 +707,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
public Map<String, Collection<Long>> getLongCollectionMap(final String... keys) { public Map<String, Collection<Long>> getLongCollectionMap(final boolean set, final String... keys) {
Map<String, Collection<Long>> map = new HashMap<>(); Map<String, Collection<Long>> map = new HashMap<>();
for (String key : keys) { for (String key : keys) {
Collection<Long> s = (Collection<Long>) get(key); Collection<Long> s = (Collection<Long>) get(key);
@@ -727,8 +727,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
public CompletableFuture<Map<String, Collection<V>>> getCollectionMapAsync(final Type componentType, final String... keys) { public CompletableFuture<Map<String, Collection<V>>> getCollectionMapAsync(final boolean set, final Type componentType, final String... keys) {
return CompletableFuture.supplyAsync(() -> getCollectionMap(componentType, keys), getExecutor()); return CompletableFuture.supplyAsync(() -> getCollectionMap(set, componentType, keys), getExecutor());
} }
@Override @Override
@@ -737,8 +737,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final String... keys) { public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final boolean set, final String... keys) {
return CompletableFuture.supplyAsync(() -> getStringCollectionMap(keys), getExecutor()); return CompletableFuture.supplyAsync(() -> getStringCollectionMap(set, keys), getExecutor());
} }
@Override @Override
@@ -747,8 +747,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final String... keys) { public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final boolean set, final String... keys) {
return CompletableFuture.supplyAsync(() -> getLongCollectionMap(keys), getExecutor()); return CompletableFuture.supplyAsync(() -> getLongCollectionMap(set, keys), getExecutor());
} }
@Override @Override

View File

@@ -92,7 +92,7 @@ public interface CacheSource<V extends Object> {
public <T> Collection<T> getCollection(final String key, final Type componentType); public <T> Collection<T> getCollection(final String key, final Type componentType);
public <T> Map<String, Collection<T>> getCollectionMap(final Type componentType, final String... keys); public <T> Map<String, Collection<T>> getCollectionMap(final boolean set, final Type componentType, final String... keys);
public int getCollectionSize(final String key); public int getCollectionSize(final String key);
@@ -140,7 +140,7 @@ public interface CacheSource<V extends Object> {
public Collection<String> getStringCollection(final String key); public Collection<String> getStringCollection(final String key);
public Map<String, Collection<String>> getStringCollectionMap(final String... keys); public Map<String, Collection<String>> getStringCollectionMap(final boolean set, final String... keys);
public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds); public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds);
@@ -164,7 +164,7 @@ public interface CacheSource<V extends Object> {
public Collection<Long> getLongCollection(final String key); public Collection<Long> getLongCollection(final String key);
public Map<String, Collection<Long>> getLongCollectionMap(final String... keys); public Map<String, Collection<Long>> getLongCollectionMap(final boolean set, final String... keys);
public Collection<Long> getLongCollectionAndRefresh(final String key, final int expireSeconds); public Collection<Long> getLongCollectionAndRefresh(final String key, final int expireSeconds);
@@ -241,7 +241,7 @@ public interface CacheSource<V extends Object> {
public <T> CompletableFuture<Collection<T>> getCollectionAsync(final String key, final Type componentType); public <T> CompletableFuture<Collection<T>> getCollectionAsync(final String key, final Type componentType);
public <T> CompletableFuture<Map<String, Collection<T>>> getCollectionMapAsync(final Type componentType, final String... keys); public <T> CompletableFuture<Map<String, Collection<T>>> getCollectionMapAsync(final boolean set, final Type componentType, final String... keys);
public CompletableFuture<Integer> getCollectionSizeAsync(final String key); public CompletableFuture<Integer> getCollectionSizeAsync(final String key);
@@ -289,7 +289,7 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Collection<String>> getStringCollectionAsync(final String key); public CompletableFuture<Collection<String>> getStringCollectionAsync(final String key);
public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final String... keys); public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final boolean set, final String... keys);
public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds); public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds);
@@ -313,7 +313,7 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Collection<Long>> getLongCollectionAsync(final String key); public CompletableFuture<Collection<Long>> getLongCollectionAsync(final String key);
public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final String... keys); public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final boolean set, final String... keys);
public CompletableFuture<Collection<Long>> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds); public CompletableFuture<Collection<Long>> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds);

View File

@@ -177,11 +177,13 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
protected <T> int batchStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException { protected <T> int batchStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
int i = 0; int i = 0;
for (Attribute<T, Serializable> attr : attrs) { for (Attribute<T, Serializable> attr : attrs) {
Serializable val = info.getSQLValue(attr, entity); Object val = info.getSQLValue(attr, entity);
if (val instanceof byte[]) { if (val instanceof byte[]) {
Blob blob = conn.createBlob(); Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val); blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob); prestmt.setObject(++i, blob);
} else if (val instanceof Boolean) {
prestmt.setObject(++i, ((Boolean) val) ? (byte) 1 : (byte) 0);
} else if (val instanceof AtomicInteger) { } else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get()); prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) { } else if (val instanceof AtomicLong) {
@@ -230,6 +232,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close(); stmt.close();
return CompletableFuture.completedFuture(c); return CompletableFuture.completedFuture(c);
} catch (SQLException e) { } catch (SQLException e) {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;
@@ -250,6 +253,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close(); stmt.close();
return CompletableFuture.completedFuture(c); return CompletableFuture.completedFuture(c);
} catch (SQLException e) { } catch (SQLException e) {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;

View File

@@ -216,7 +216,7 @@ public interface DataSource {
* @param <T> Entity泛型 * @param <T> Entity泛型
* @param clazz Entity类 * @param clazz Entity类
* *
* @return 影响的记录条数 * @return 影响的记录条数 -1表示表不存在
*/ */
public <T> int clearTable(final Class<T> clazz); public <T> int clearTable(final Class<T> clazz);
@@ -227,7 +227,7 @@ public interface DataSource {
* @param <T> Entity泛型 * @param <T> Entity泛型
* @param clazz Entity类 * @param clazz Entity类
* *
* @return 影响的记录条数CompletableFuture * @return 影响的记录条数CompletableFuture -1表示表不存在
*/ */
public <T> CompletableFuture<Integer> clearTableAsync(final Class<T> clazz); public <T> CompletableFuture<Integer> clearTableAsync(final Class<T> clazz);
@@ -239,7 +239,7 @@ public interface DataSource {
* @param clazz Entity类 * @param clazz Entity类
* @param node 过滤条件 * @param node 过滤条件
* *
* @return 影响的记录条数 * @return 影响的记录条数 -1表示表不存在
*/ */
public <T> int clearTable(final Class<T> clazz, final FilterNode node); public <T> int clearTable(final Class<T> clazz, final FilterNode node);
@@ -251,7 +251,7 @@ public interface DataSource {
* @param clazz Entity类 * @param clazz Entity类
* @param node 过滤条件 * @param node 过滤条件
* *
* @return 影响的记录条数CompletableFuture * @return 影响的记录条数CompletableFuture -1表示表不存在
*/ */
public <T> CompletableFuture<Integer> clearTableAsync(final Class<T> clazz, final FilterNode node); public <T> CompletableFuture<Integer> clearTableAsync(final Class<T> clazz, final FilterNode node);
@@ -263,7 +263,7 @@ public interface DataSource {
* @param <T> Entity泛型 * @param <T> Entity泛型
* @param clazz Entity类 * @param clazz Entity类
* *
* @return 影响的记录条数 * @return 影响的记录条数 -1表示表不存在
*/ */
public <T> int dropTable(final Class<T> clazz); public <T> int dropTable(final Class<T> clazz);
@@ -274,7 +274,7 @@ public interface DataSource {
* @param <T> Entity泛型 * @param <T> Entity泛型
* @param clazz Entity类 * @param clazz Entity类
* *
* @return 影响的记录条数CompletableFuture * @return 影响的记录条数CompletableFuture -1表示表不存在
*/ */
public <T> CompletableFuture<Integer> dropTableAsync(final Class<T> clazz); public <T> CompletableFuture<Integer> dropTableAsync(final Class<T> clazz);
@@ -286,7 +286,7 @@ public interface DataSource {
* @param clazz Entity类 * @param clazz Entity类
* @param node 过滤条件 * @param node 过滤条件
* *
* @return 影响的记录条数 * @return 影响的记录条数 -1表示表不存在
*/ */
public <T> int dropTable(final Class<T> clazz, final FilterNode node); public <T> int dropTable(final Class<T> clazz, final FilterNode node);
@@ -298,7 +298,7 @@ public interface DataSource {
* @param clazz Entity类 * @param clazz Entity类
* @param node 过滤条件 * @param node 过滤条件
* *
* @return 影响的记录条数CompletableFuture * @return 影响的记录条数CompletableFuture -1表示表不存在
*/ */
public <T> CompletableFuture<Integer> dropTableAsync(final Class<T> clazz, final FilterNode node); public <T> CompletableFuture<Integer> dropTableAsync(final Class<T> clazz, final FilterNode node);

View File

@@ -1177,7 +1177,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (blobs == null) return updateDB(info, null, sql, false); if (blobs == null) return updateDB(info, null, sql, false);
return updateDB(info, null, sql, true, blobs.toArray()); return updateDB(info, null, sql, true, blobs.toArray());
} else { } else {
final Serializable id = info.getSQLValue(info.getPrimary(), entity); final Serializable id = (Serializable) info.getSQLValue(info.getPrimary(), entity);
String sql = "UPDATE " + info.getTable(id) + " a SET " + setsql + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id); String sql = "UPDATE " + info.getTable(id) + " a SET " + setsql + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
if (blobs == null) return updateDB(info, null, sql, false); if (blobs == null) return updateDB(info, null, sql, false);
return updateDB(info, null, sql, true, blobs.toArray()); return updateDB(info, null, sql, true, blobs.toArray());

View File

@@ -902,10 +902,10 @@ public final class EntityInfo<T> {
* *
* @return Object * @return Object
*/ */
public Serializable getSQLValue(Attribute<T, Serializable> attr, T entity) { public <F> Object getSQLValue(Attribute<T, F> attr, T entity) {
Serializable val = attr.get(entity); Object val = attr.get(entity);
CryptHandler cryptHandler = attr.attach(); CryptHandler cryptHandler = attr.attach();
if (cryptHandler != null) val = (Serializable) cryptHandler.encrypt(val); if (cryptHandler != null) val = cryptHandler.encrypt(val);
return val; return val;
} }

View File

@@ -86,7 +86,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override @Override
public boolean test(Byte t) { public boolean test(Byte t) {
if (max < min) return t >= min; if (max < min && max <= 0) return t >= min;
return t >= min && t <= max; return t >= min && t <= max;
} }
@@ -131,7 +131,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override @Override
public boolean test(Short t) { public boolean test(Short t) {
if (max < min) return t >= min; if (max < min && max <= 0) return t >= min;
return t >= min && t <= max; return t >= min && t <= max;
} }
@@ -175,7 +175,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override @Override
public boolean test(Integer t) { public boolean test(Integer t) {
if (max < min) return t >= min; if (max < min && max <= 0) return t >= min;
return t >= min && t <= max; return t >= min && t <= max;
} }
@@ -229,7 +229,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override @Override
public boolean test(Long t) { public boolean test(Long t) {
if (max < min) return t >= min; if (max < min && max <= 0) return t >= min;
return t >= min && t <= max; return t >= min && t <= max;
} }
@@ -273,7 +273,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override @Override
public boolean test(Float t) { public boolean test(Float t) {
if (max < min) return t >= min; if (max < min && max <= 0) return t >= min;
return t >= min && t <= max; return t >= min && t <= max;
} }
@@ -317,7 +317,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override @Override
public boolean test(Double t) { public boolean test(Double t) {
if (max < min) return t >= min; if (max < min && max <= 0) return t >= min;
return t >= min && t <= max; return t >= min && t <= max;
} }

View File

@@ -1980,14 +1980,14 @@ public final class Utility {
} }
conn.setRequestMethod(method); conn.setRequestMethod(method);
if (headers != null) { if (headers != null) {
for (Map.Entry<String, String> en : headers.entrySet()) { //不用forEach是为了兼容JDK 6 for (Map.Entry<String, String> en : headers.entrySet()) {
conn.setRequestProperty(en.getKey(), en.getValue()); conn.setRequestProperty(en.getKey(), en.getValue());
} }
} }
if (body != null && !body.isEmpty()) { //conn.getOutputStream()会将GET强制变成POST if (body != null && !body.isEmpty()) { //conn.getOutputStream()会将GET强制变成POST
conn.setDoInput(true); conn.setDoInput(true);
conn.setDoOutput(true); conn.setDoOutput(true);
conn.getOutputStream().write(body == null ? new byte[0] : body.getBytes(UTF_8)); conn.getOutputStream().write(body.getBytes(UTF_8));
} }
conn.connect(); conn.connect();
int rs = conn.getResponseCode(); int rs = conn.getResponseCode();

View File

@@ -6,7 +6,8 @@
package org.redkale.test.wsdync; package org.redkale.test.wsdync;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.lang.annotation.Annotation;
import java.util.*;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.redkale.convert.ConvertDisabled; import org.redkale.convert.ConvertDisabled;
@@ -26,6 +27,8 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
@Resource @Resource
private ChatService _redkale_resource_0; private ChatService _redkale_resource_0;
public static Map<String, Annotation[]> _redkale_annotations;
public _DyncChatWebSocketServlet() { public _DyncChatWebSocketServlet() {
super(); super();
this.messageTextType = _DyncChatWebSocketMessage.class; this.messageTextType = _DyncChatWebSocketMessage.class;
@@ -82,6 +85,13 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
return null; return null;
} }
@Override
public Annotation[] getAnnotations() {
Annotation[] annotations = _redkale_annotations.get("org/redkale/test/wsdync/_DyncChatWebSocketServlet$_DyncChatWebSocketMessage_sendmessagee_00");
if (annotations == null) return new Annotation[0];
return Arrays.copyOf(annotations, annotations.length);
}
public void execute(_DyncChatWebSocket websocket) { public void execute(_DyncChatWebSocket websocket) {
this._redkale_websocket = websocket; this._redkale_websocket = websocket;
websocket.preOnMessage("sendmessage", this, this); websocket.preOnMessage("sendmessage", this, this);
@@ -116,6 +126,13 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
return null; return null;
} }
@Override
public Annotation[] getAnnotations() {
Annotation[] annotations = _redkale_annotations.get("org/redkale/test/wsdync/_DyncChatWebSocketServlet$_DyncChatWebSocketMessage_joinroom_01");
if (annotations == null) return new Annotation[0];
return Arrays.copyOf(annotations, annotations.length);
}
public void execute(_DyncChatWebSocket websocket) { public void execute(_DyncChatWebSocket websocket) {
this._redkale_websocket = websocket; this._redkale_websocket = websocket;
websocket.preOnMessage("joinroom", this, this); websocket.preOnMessage("joinroom", this, this);