18 Commits

Author SHA1 Message Date
Redkale
7a5fbcdccd 2019-07-03 11:35:37 +08:00
Redkale
345e929712 [不兼容修改]CacheSource的getCollectionMap序列方法增加一个set参数 2019-06-27 18:21:19 +08:00
Redkale
358862fe59 修复Entity类带boolean字段调DataSource.insert出现异常的bug 2019-06-26 16:51:48 +08:00
Redkale
3dde9bb293 2019-06-21 17:00:40 +08:00
Redkale
99ae4ccadd 从Context中移除BufferPool和ResponsePool 2019-06-20 15:26:20 +08:00
Redkale
98e9ffe0ef 2019-06-20 10:02:24 +08:00
Redkale
6927bfe8ac 2019-06-19 16:52:00 +08:00
Redkale
340a3a8fa3 2019-06-19 16:46:12 +08:00
Redkale
4724763991 2019-06-19 16:45:45 +08:00
Redkale
03353ad08c 2019-06-19 16:21:05 +08:00
Redkale
95c3354fcd WebSocketParam增加getAnnotations系列方法 2019-06-19 15:47:46 +08:00
Redkale
1bda2f92b9 HttpRequest增加getAnnotation系列方法 2019-06-18 22:59:07 +08:00
Redkale
bd3c706934 修复DataSource中json字段不为Serializable时会异常的bug 2019-06-13 22:34:53 +08:00
Redkale
ef3663aa36 修复DataSource中json字段不为Serializable时会异常的bug 2019-06-13 22:23:30 +08:00
Redkale
427ff717d4 UDP协议下bufferCapacity默认值为1350字节 2019-05-30 12:07:09 +08:00
Redkale
b409300412 UDP协议下bufferCapacity默认值为1480字节 2019-05-30 12:04:13 +08:00
Redkale
ca1f974dbe 2019-05-28 17:43:48 +08:00
Redkale
6a8c86096b DataSource的clearTable、dropTable在表不存在的情况下由抛异常改为结果值返回-1 2019-05-28 15:51:59 +08:00
39 changed files with 466 additions and 307 deletions

View File

@@ -119,7 +119,7 @@
threads 线程数, 默认: CPU核数*32
maxconns最大连接数, 小于1表示无限制 默认: 0
maxbody: request.body最大值 默认: 64K
bufferCapacity: ByteBuffer的初始化大小 默认: 32K; (HTTP 2.0、WebSocket必须要16k以上)
bufferCapacity: ByteBuffer的初始化大小 TCP默认: 32K; (HTTP 2.0、WebSocket必须要16k以上); UDP默认: 1350B
bufferPoolSize ByteBuffer池的大小默认: 线程数*4
responsePoolSize Response池的大小默认: 线程数*2
aliveTimeoutSeconds: KeepAlive读操作超时秒数 默认30 0表示永久不超时; -1表示禁止KeepAlive

View File

@@ -437,7 +437,7 @@ public abstract class NodeServer {
final ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try {
if (SncpClient.parseMethod(serviceImplClass).isEmpty() && serviceImplClass.getAnnotation(Priority.class) == null) { //class没有可用的方法且没有标记启动优先级的 通常为BaseService
logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method");
if (!serviceImplClass.getName().startsWith("org.redkale.")) logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method");
return;
}

View File

@@ -52,10 +52,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
//关联的事件数, 小于1表示没有事件
protected final AtomicInteger eventing = new AtomicInteger();
protected AsyncConnection(Context context) {
this(context.getBufferSupplier(), context.getBufferConsumer(), context.getSSLContext());
}
protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext) {
this(bufferPool, bufferPool, sslContext);
}
@@ -68,6 +64,14 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
this.sslContext = sslContext;
}
public Supplier<ByteBuffer> getBufferSupplier() {
return this.bufferSupplier;
}
public Consumer<ByteBuffer> getBufferConsumer() {
return this.bufferConsumer;
}
public final long getLastReadTime() {
return readtime;
}
@@ -114,7 +118,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
public abstract void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler);
@Override
public abstract int write(ByteBuffer src) throws IOException;
@@ -245,22 +248,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return createTCP(bufferPool, group, null, address, readTimeoutSeconds, writeTimeoutSeconds);
}
/**
* 创建TCP协议客户端连接
*
* @param context Context
* @param address 连接点子
* @param group 连接AsynchronousChannelGroup
* @param readTimeoutSeconds 读取超时秒数
* @param writeTimeoutSeconds 写入超时秒数
*
* @return 连接CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createTCP(final Context context, final AsynchronousChannelGroup group,
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return createTCP(context.getBufferSupplier(), context.getBufferConsumer(), group, context.getSSLContext(), address, readTimeoutSeconds, writeTimeoutSeconds);
}
/**
* 创建TCP协议客户端连接
*
@@ -371,35 +358,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return new UdpBioAsyncConnection(bufferPool, bufferPool, ch, sslContext, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch) {
return create(context, ch, (SocketAddress) null, 0, 0);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final Context context, final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
}
public static AsyncConnection create(final ObjectPool<ByteBuffer> bufferPool, final AsynchronousSocketChannel ch) {
return create(bufferPool, ch, null, 0, 0);
}

View File

@@ -6,11 +6,8 @@
package org.redkale.net;
import java.net.*;
import java.nio.*;
import java.nio.charset.*;
import java.util.Collection;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*;
import javax.net.ssl.SSLContext;
import org.redkale.convert.bson.*;
@@ -39,12 +36,6 @@ public class Context {
//ByteBuffer的容量默认8K
protected final int bufferCapacity;
//ByteBuffer对象池
protected final ObjectPool<ByteBuffer> bufferPool;
//Response对象池
protected final ObjectPool<Response> responsePool;
//服务的根Servlet
protected final PrepareServlet prepare;
@@ -83,22 +74,18 @@ public class Context {
public Context(ContextConfig config) {
this(config.serverStartTime, config.logger, config.executor, config.sslContext,
config.bufferCapacity, config.bufferPool, config.responsePool, config.maxconns, config.maxbody,
config.charset, config.address, config.resourceFactory, config.prepare,
config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
config.bufferCapacity, config.maxconns, config.maxbody, config.charset, config.address, config.resourceFactory,
config.prepare, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
}
public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool, final int maxconns,
final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
final PrepareServlet prepare, final int aliveTimeoutSeconds, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
int bufferCapacity, final int maxconns, final int maxbody, Charset charset, InetSocketAddress address,
ResourceFactory resourceFactory, PrepareServlet prepare, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) {
this.serverStartTime = serverStartTime;
this.logger = logger;
this.executor = executor;
this.sslContext = sslContext;
this.bufferCapacity = bufferCapacity;
this.bufferPool = bufferPool;
this.responsePool = responsePool;
this.maxconns = maxconns;
this.maxbody = maxbody;
this.charset = StandardCharsets.UTF_8.equals(charset) ? null : charset;
@@ -160,36 +147,6 @@ public class Context {
return bufferCapacity;
}
public Supplier<ByteBuffer> getBufferSupplier() {
return bufferPool;
}
public Consumer<ByteBuffer> getBufferConsumer() {
return bufferPool;
}
public ByteBuffer pollBuffer() {
return bufferPool.get();
}
public void offerBuffer(ByteBuffer buffer) {
bufferPool.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
}
public void offerBuffer(Collection<ByteBuffer> buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
}
public Logger getLogger() {
return logger;
}
@@ -228,12 +185,6 @@ public class Context {
//ByteBuffer的容量默认8K
public int bufferCapacity;
//ByteBuffer对象池
public ObjectPool<ByteBuffer> bufferPool;
//Response对象池
public ObjectPool<Response> responsePool;
//服务的根Servlet
public PrepareServlet prepare;

View File

@@ -8,7 +8,6 @@ package org.redkale.net;
import java.io.IOException;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.*;
import org.redkale.util.*;
@@ -28,12 +27,15 @@ public class PrepareRunner implements Runnable {
private final Context context;
private final ObjectPool<Response> responsePool;
private ByteBuffer data;
private Response response;
public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, Response response) {
public PrepareRunner(Context context, ObjectPool<Response> responsePool, AsyncConnection channel, ByteBuffer data, Response response) {
this.context = context;
this.responsePool = responsePool;
this.channel = channel;
this.data = data;
this.response = response;
@@ -41,8 +43,6 @@ public class PrepareRunner implements Runnable {
@Override
public void run() {
final boolean keepalive = response != null;
final ObjectPool<? extends Response> responsePool = context.responsePool;
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
if (response == null) response = responsePool.get();
try {
@@ -56,8 +56,7 @@ public class PrepareRunner implements Runnable {
}
if (response == null) response = responsePool.get();
try {
channel.read(keepalive ? context.getAliveTimeoutSeconds() : context.getReadTimeoutSeconds(), TimeUnit.SECONDS,
new CompletionHandler<Integer, ByteBuffer>() {
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer count, ByteBuffer buffer) {
if (count < 1) {
@@ -165,7 +164,7 @@ public class PrepareRunner implements Runnable {
}
protected Response pollResponse() {
return context.responsePool.get();
return responsePool.get();
}
protected Request pollRequest(Response response) {

View File

@@ -43,7 +43,7 @@ public abstract class ProtocolServer {
public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException;
public abstract void accept() throws IOException;
public abstract void accept(Server server) throws IOException;
public abstract void close() throws IOException;

View File

@@ -9,6 +9,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.redkale.convert.bson.BsonConvert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.ObjectPool;
/**
* 协议请求对象
@@ -23,6 +24,8 @@ public abstract class Request<C extends Context> {
protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final BsonConvert bsonConvert;
protected final JsonConvert jsonConvert;
@@ -47,9 +50,9 @@ public abstract class Request<C extends Context> {
protected final Map<String, Object> attributes = new HashMap<>();
protected Request(C context) {
protected Request(C context, ObjectPool<ByteBuffer> bufferPool) {
this.context = context;
this.readBuffer = context.pollBuffer();
this.bufferPool = bufferPool;
this.bsonConvert = context.getBsonConvert();
this.jsonConvert = context.getJsonConvert();
}
@@ -67,7 +70,7 @@ public abstract class Request<C extends Context> {
protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
if (buffer == null) buffer = context.pollBuffer();
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
@@ -77,7 +80,7 @@ public abstract class Request<C extends Context> {
buffer.clear();
this.readBuffer = buffer;
} else {
context.offerBuffer(buffer);
bufferPool.accept(buffer);
}
}

View File

@@ -10,6 +10,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.function.*;
import java.util.logging.Level;
import org.redkale.util.ObjectPool;
/**
* 协议响应对象
@@ -26,6 +27,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final C context;
protected final ObjectPool<ByteBuffer> bufferPool;
protected final ObjectPool<Response> responsePool;
protected final R request;
protected AsyncConnection channel;
@@ -66,15 +71,15 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private void offerResponseBuffer(ByteBuffer attachment) {
if (writeHeadBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) {
if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeHeadBuffer = attachment;
}
} else if (writeBodyBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) {
if (bufferPool.getRecyclerPredicate().test(attachment)) {
writeBodyBuffer = attachment;
}
} else {
context.offerBuffer(attachment);
bufferPool.accept(attachment);
}
}
@@ -108,31 +113,33 @@ public abstract class Response<C extends Context, R extends Request<C>> {
private void offerResponseBuffer(ByteBuffer[] attachments) {
int start = 0;
if (writeHeadBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) {
if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeHeadBuffer = attachments[start];
start++;
}
}
if (writeBodyBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) {
if (bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeBodyBuffer = attachments[start];
start++;
}
}
for (int i = start; i < attachments.length; i++) {
context.offerBuffer(attachments[i]);
bufferPool.accept(attachments[i]);
}
}
};
protected Response(C context, final R request) {
protected Response(C context, final R request, ObjectPool<Response> responsePool) {
this.context = context;
this.request = request;
this.writeHeadBuffer = context.pollBuffer();
this.writeBodyBuffer = context.pollBuffer();
this.bufferPool = request.bufferPool;
this.responsePool = responsePool;
this.writeHeadBuffer = bufferPool.get();
this.writeBodyBuffer = bufferPool.get();
this.bodyBufferSupplier = () -> {
ByteBuffer buffer = writeBodyBuffer;
if (buffer == null) return context.pollBuffer();
if (buffer == null) return bufferPool.get();
writeBodyBuffer = null;
return buffer;
};
@@ -141,14 +148,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected ByteBuffer pollWriteReadBuffer() {
ByteBuffer buffer = this.writeHeadBuffer;
this.writeHeadBuffer = null;
if (buffer == null) buffer = context.pollBuffer();
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
protected ByteBuffer pollWriteBodyBuffer() {
ByteBuffer buffer = this.writeBodyBuffer;
this.writeBodyBuffer = null;
if (buffer == null) buffer = context.pollBuffer();
if (buffer == null) buffer = bufferPool.get();
return buffer;
}
@@ -157,7 +164,9 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
protected void offerBuffer(ByteBuffer... buffers) {
context.offerBuffer(buffers);
for (ByteBuffer buffer : buffers) {
bufferPool.accept(buffer);
}
}
protected AsyncConnection removeChannel() {
@@ -257,19 +266,19 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel();
this.recycle();
this.prepare();
new PrepareRunner(context, conn, null, this).run();
new PrepareRunner(context, this.responsePool, conn, null, this).run();
} else {
channel.dispose();
}
} else {
this.context.responsePool.accept(this);
this.responsePool.accept(this);
}
}
public void finish(final byte[] bs) {
if (!this.inited) return; //避免重复关闭
if (this.context.bufferCapacity == bs.length) {
ByteBuffer buffer = this.context.pollBuffer();
ByteBuffer buffer = this.bufferPool.get();
buffer.put(bs);
buffer.flip();
this.finish(buffer);
@@ -285,7 +294,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive;
this.request.more = more;
conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, conn, data, null).run();
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
}
public void finish(boolean kill, ByteBuffer buffer) {
@@ -296,7 +305,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive;
this.request.more = more;
conn.write(buffer, buffer, finishHandler);
if (more) new PrepareRunner(this.context, conn, data, null).run();
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
}
public void finish(ByteBuffer... buffers) {
@@ -306,7 +315,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive;
this.request.more = more;
conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, conn, data, null).run();
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
}
public void finish(boolean kill, ByteBuffer... buffers) {
@@ -317,7 +326,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
final boolean more = data != null && this.request.keepAlive;
this.request.more = more;
conn.write(buffers, buffers, finishHandler2);
if (more) new PrepareRunner(this.context, conn, data, null).run();
if (more) new PrepareRunner(this.context, this.responsePool, conn, data, null).run();
}
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {
@@ -328,14 +337,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (buffer.hasRemaining()) {
channel.write(buffer, attachment, this);
} else {
context.offerBuffer(buffer);
bufferPool.accept(buffer);
if (handler != null) handler.completed(result, attachment);
}
}
@Override
public void failed(Throwable exc, A attachment) {
context.offerBuffer(buffer);
bufferPool.accept(buffer);
if (handler != null) handler.failed(exc, attachment);
}
@@ -353,7 +362,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
index = i;
break;
}
context.offerBuffer(buffers[i]);
bufferPool.accept(buffers[i]);
}
if (index == 0) {
channel.write(buffers, attachment, this);
@@ -367,7 +376,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void failed(Throwable exc, A attachment) {
for (ByteBuffer buffer : buffers) {
context.offerBuffer(buffer);
bufferPool.accept(buffer);
}
if (handler != null) handler.failed(exc, attachment);
}

View File

@@ -7,14 +7,14 @@ package org.redkale.net;
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.text.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;
import java.util.logging.*;
import javax.net.ssl.SSLContext;
import org.redkale.net.Filter;
import org.redkale.util.*;
/**
@@ -127,8 +127,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0);
this.backlog = parseLenth(config.getValue("backlog"), 8 * 1024);
this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024);
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), 32 * 1024);
this.bufferCapacity = bufCapacity < 8 * 1024 ? 8 * 1024 : bufCapacity;
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), "UDP".equalsIgnoreCase(protocol) ? 1350 : 32 * 1024);
this.bufferCapacity = "UDP".equalsIgnoreCase(protocol) ? bufCapacity : (bufCapacity < 8 * 1024 ? 8 * 1024 : bufCapacity);
this.threads = config.getIntValue("threads", Runtime.getRuntime().availableProcessors() * 32);
this.bufferPoolSize = config.getIntValue("bufferPoolSize", this.threads * 4);
this.responsePoolSize = config.getIntValue("responsePoolSize", this.threads * 2);
@@ -281,7 +281,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.serverChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl"));
this.serverChannel.open(config);
serverChannel.bind(address, backlog);
serverChannel.accept();
serverChannel.accept(this);
final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address
+ ", threads: " + threads + ", maxbody: " + formatLenth(context.maxbody) + ", bufferCapacity: " + formatLenth(bufferCapacity) + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
@@ -299,7 +299,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
newServerChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl"));
newServerChannel.open(config);
newServerChannel.bind(addr, backlog);
newServerChannel.accept();
newServerChannel.accept(this);
} catch (IOException e) {
context.address = oldAddress;
throw e;
@@ -358,6 +358,15 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
protected abstract C createContext();
//必须在 createContext()之后调用
protected abstract ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize);
//必须在 createContext()之后调用
protected abstract ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize);
//必须在 createResponsePool()之后调用
protected abstract Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool);
public void shutdown() throws IOException {
long s = System.currentTimeMillis();
logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning");

View File

@@ -103,13 +103,6 @@ public class TcpAioAsyncConnection extends AsyncConnection {
}
}
@Override
public void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler) {
this.readtime = System.currentTimeMillis();
ByteBuffer dst = pollReadBuffer();
channel.read(dst, timeout < 0 ? 0 : timeout, unit, dst, handler);
}
private <A> void nextWrite(Throwable exc, A attachment) {
BlockingQueue<WriteEntry> queue = this.writeQueue;
if (queue != null && exc != null && !isOpen()) {

View File

@@ -7,10 +7,12 @@ package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.redkale.util.AnyValue;
import org.redkale.util.*;
/**
* 协议底层Server
@@ -70,7 +72,14 @@ public class TcpAioProtocolServer extends ProtocolServer {
}
@Override
public void accept() throws IOException {
public void accept(Server server) throws IOException {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
ObjectPool<ByteBuffer> bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final AsynchronousServerSocketChannel serchannel = this.serverChannel;
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@@ -93,9 +102,9 @@ public class TcpAioProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
AsyncConnection conn = new TcpAioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), channel,
AsyncConnection conn = new TcpAioAsyncConnection(bufferPool, bufferPool, channel,
context.getSSLContext(), null, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
context.runAsync(new PrepareRunner(context, conn, null, null));
context.runAsync(new PrepareRunner(context, responsePool, conn, null, null));
} catch (Throwable e) {
context.logger.log(Level.INFO, channel + " accept error", e);
}

View File

@@ -10,7 +10,6 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import javax.net.ssl.SSLContext;
@@ -142,11 +141,6 @@ public class UdpBioAsyncConnection extends AsyncConnection {
}
}
@Override
public void read(long timeout, TimeUnit unit, CompletionHandler<Integer, ByteBuffer> handler) {
read(handler);
}
@Override
public int read(ByteBuffer dst) throws IOException {
int rs = channel.read(dst);

View File

@@ -11,7 +11,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.redkale.util.AnyValue;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.util.*;
/**
* 协议底层Server
@@ -70,7 +71,14 @@ public class UdpBioProtocolServer extends ProtocolServer {
}
@Override
public void accept() throws IOException {
public void accept(Server server) throws IOException {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
ObjectPool<ByteBuffer> bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool));
final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
@@ -81,15 +89,15 @@ public class UdpBioProtocolServer extends ProtocolServer {
public void run() {
cdl.countDown();
while (running) {
final ByteBuffer buffer = context.pollBuffer();
final ByteBuffer buffer = bufferPool.get();
try {
SocketAddress address = serchannel.receive(buffer);
buffer.flip();
AsyncConnection conn = new UdpBioAsyncConnection(context.getBufferSupplier(), context.getBufferConsumer(), serchannel,
AsyncConnection conn = new UdpBioAsyncConnection(bufferPool, bufferPool, serchannel,
context.getSSLContext(), address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, conn, buffer, null));
context.runAsync(new PrepareRunner(context, responsePool, conn, buffer, null));
} catch (Exception e) {
context.offerBuffer(buffer);
bufferPool.accept(buffer);
}
}
}

View File

@@ -28,6 +28,8 @@ public class HttpContext extends Context {
protected final ConcurrentHashMap<Class, Creator> asyncHandlerCreators = new ConcurrentHashMap<>();
protected String remoteAddrHeader;
public HttpContext(HttpContextConfig config) {
super(config);
random.setSeed(Math.abs(System.nanoTime()));
@@ -43,10 +45,6 @@ public class HttpContext extends Context {
return executor;
}
protected ObjectPool<Response> getResponsePool() {
return responsePool;
}
@SuppressWarnings("unchecked")
protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
Creator<H> creator = asyncHandlerCreators.get(handlerClass);
@@ -162,5 +160,6 @@ public class HttpContext extends Context {
public static class HttpContextConfig extends ContextConfig {
public String remoteAddrHeader;
}
}

View File

@@ -6,6 +6,8 @@
package org.redkale.net.http;
import java.io.*;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
@@ -69,15 +71,17 @@ public class HttpRequest extends Request<HttpContext> {
protected int actionid;
protected Annotation[] annotations;
protected Object currentUser;
private final String remoteAddrHeader;
Object attachment; //仅供HttpServlet传递Entry使用
public HttpRequest(HttpContext context, String remoteAddrHeader) {
super(context);
this.remoteAddrHeader = remoteAddrHeader;
public HttpRequest(HttpContext context, ObjectPool<ByteBuffer> bufferPool) {
super(context, bufferPool);
this.remoteAddrHeader = context.remoteAddrHeader;
}
protected boolean isWebSocket() {
@@ -298,6 +302,55 @@ public class HttpRequest extends Request<HttpContext> {
return this.actionid;
}
/**
* 获取当前操作Method上的注解集合
*
* @return Annotation[]
*/
public Annotation[] getAnnotations() {
if (this.annotations == null) return new Annotation[0];
Annotation[] newanns = new Annotation[this.annotations.length];
System.arraycopy(this.annotations, 0, newanns, 0, newanns.length);
return newanns;
}
/**
* 获取当前操作Method上的注解
*
* @param <T> 注解泛型
* @param annotationClass 注解类型
*
* @return Annotation
*/
public <T extends Annotation> T getAnnotation(Class<T> annotationClass) {
if (this.annotations == null) return null;
for (Annotation ann : this.annotations) {
if (ann.getClass() == annotationClass) return (T) ann;
}
return null;
}
/**
* 获取当前操作Method上的注解集合
*
* @param <T> 注解泛型
* @param annotationClass 注解类型
*
* @return Annotation[]
*/
public <T extends Annotation> T[] getAnnotationsByType(Class<T> annotationClass) {
if (this.annotations == null) return (T[]) Array.newInstance(annotationClass, 0);
T[] news = (T[]) Array.newInstance(annotationClass, this.annotations.length);
int index = 0;
for (Annotation ann : this.annotations) {
if (ann.getClass() == annotationClass) {
news[index++] = (T) ann;
}
}
if (index < 1) return (T[]) Array.newInstance(annotationClass, 0);
return Arrays.copyOf(news, index);
}
/**
* 获取客户端地址IP
*
@@ -443,6 +496,7 @@ public class HttpRequest extends Request<HttpContext> {
this.bodyparsed = false;
this.moduleid = 0;
this.actionid = 0;
this.annotations = null;
this.currentUser = null;
this.attachment = null;

View File

@@ -148,8 +148,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
}
public HttpResponse(HttpContext context, HttpRequest request, HttpResponseConfig config) {
super(context, request);
public HttpResponse(HttpContext context, HttpRequest request, ObjectPool<Response> responsePool, HttpResponseConfig config) {
super(context, request, responsePool);
this.plainContentType = config.plainContentType == null || config.plainContentType.isEmpty() ? "text/plain; charset=utf-8" : config.plainContentType;
this.jsonContentType = config.jsonContentType == null || config.jsonContentType.isEmpty() ? "application/json; charset=utf-8" : config.jsonContentType;
this.plainContentTypeBytes = ("Content-Type: " + this.plainContentType + "\r\n").getBytes();
@@ -174,6 +174,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return channel;
}
@Override
protected void prepare() {
super.prepare();
}
@Override
protected boolean recycle() {
this.status = 200;
@@ -1197,7 +1202,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
context.offerBuffer(attachment);
bufferPool.accept(attachment);
finish(true);
try {
filechannel.close();

View File

@@ -37,6 +37,8 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private byte[] currDateBytes;
private HttpResponseConfig respConfig;
public HttpServer() {
this(System.currentTimeMillis(), ResourceFactory.root());
}
@@ -304,16 +306,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
@SuppressWarnings("unchecked")
protected HttpContext createContext() {
final int port = this.address.getPort();
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
this.bufferCapacity = Math.max(this.bufferCapacity, 16 * 1024 + 16); //兼容 HTTP 2.0;
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
final List<String[]> defaultAddHeaders = new ArrayList<>();
final List<String[]> defaultSetHeaders = new ArrayList<>();
boolean autoOptions = false;
@@ -423,7 +416,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
final String addrHeader = remoteAddrHeader;
final HttpResponseConfig respConfig = new HttpResponseConfig();
this.respConfig = new HttpResponseConfig();
respConfig.plainContentType = plainContentType;
respConfig.jsonContentType = jsonContentType;
respConfig.defaultAddHeaders = defaultAddHeaders.isEmpty() ? null : defaultAddHeaders.toArray(new String[defaultAddHeaders.size()][]);
@@ -433,18 +426,12 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
respConfig.dateSupplier = dateSupplier;
respConfig.renders = ((HttpPrepareServlet) prepare).renders;
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
final HttpContextConfig contextConfig = new HttpContextConfig();
contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger;
contextConfig.executor = this.executor;
contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = rcapacity;
contextConfig.bufferPool = bufferPool;
contextConfig.responsePool = responsePool;
contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.maxconns = this.maxconns;
contextConfig.maxbody = this.maxbody;
contextConfig.charset = this.charset;
@@ -454,9 +441,32 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
contextConfig.aliveTimeoutSeconds = this.aliveTimeoutSeconds;
contextConfig.readTimeoutSeconds = this.readTimeoutSeconds;
contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds;
contextConfig.remoteAddrHeader = addrHeader;
HttpContext httpcontext = new HttpContext(contextConfig);
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), respConfig));
return httpcontext;
return new HttpContext(contextConfig);
}
@Override
protected ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
return bufferPool;
}
@Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return HttpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
}
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context, bufferPool), responsePool, this.respConfig);
}
}

View File

@@ -6,6 +6,7 @@
package org.redkale.net.http;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.*;
@@ -71,6 +72,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
request.attachment = entry;
request.moduleid = entry.moduleid;
request.actionid = entry.actionid;
request.annotations = entry.annotations;
if (entry.auth) {
response.thenEvent(authSuccessServlet);
authenticate(request, response);
@@ -210,6 +212,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
InnerActionEntry(int moduleid, int actionid, String name, String[] methods, Method method, HttpServlet servlet) {
this(moduleid, actionid, name, methods, method, auth(method), cacheseconds(method), servlet);
this.annotations = annotations(method);
}
//供Rest类使用参数不能随便更改
@@ -232,16 +235,21 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
} : null;
}
private static boolean auth(Method method) {
protected static boolean auth(Method method) {
HttpMapping mapping = method.getAnnotation(HttpMapping.class);
return mapping == null || mapping.auth();
}
private static int cacheseconds(Method method) {
protected static int cacheseconds(Method method) {
HttpMapping mapping = method.getAnnotation(HttpMapping.class);
return mapping == null ? 0 : mapping.cacheseconds();
}
//Rest.class会用到此方法
protected static Annotation[] annotations(Method method) {
return method.getAnnotations();
}
boolean isNeedCheck() {
return this.moduleid != 0 || this.actionid != 0;
}
@@ -270,9 +278,11 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
final String[] methods;
final Method method;
final HttpServlet servlet;
Method method;
Annotation[] annotations;
}
private HttpServlet createActionServlet(final Method method) {

View File

@@ -355,6 +355,10 @@ public final class Rest {
fv.visitEnd();
}
}
{ //_redkale_annotations
fv = cw.visitField(ACC_PUBLIC + ACC_STATIC, "_redkale_annotations", "Ljava/util/Map;", "Ljava/util/Map<Ljava/lang/String;[Ljava/lang/annotation/Annotation;>;", null);
fv.visitEnd();
}
{ //_DynWebSocketServlet构造函数
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
mv.visitVarInsn(ALOAD, 0);
@@ -422,7 +426,7 @@ public final class Rest {
}
RestClassLoader newLoader = new RestClassLoader(loader);
Map<String, Annotation[]> msgclassToAnnotations = new HashMap<>();
for (int i = 0; i < messageMethods.size(); i++) { // _DyncXXXWebSocketMessage 子消息List
Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
@@ -504,6 +508,30 @@ public final class Rest {
mv.visitMaxs(2, 2);
mv.visitEnd();
}
{ //getAnnotations
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "getAnnotations", "()[Ljava/lang/annotation/Annotation;", null, null));
mv.visitFieldInsn(GETSTATIC, newDynName, "_redkale_annotations", "Ljava/util/Map;");
mv.visitLdcInsn(newDynMessageFullName + endfix);
mv.visitMethodInsn(INVOKEINTERFACE, "java/util/Map", "get", "(Ljava/lang/Object;)Ljava/lang/Object;", true);
mv.visitTypeInsn(CHECKCAST, "[Ljava/lang/annotation/Annotation;");
mv.visitVarInsn(ASTORE, 1);
mv.visitVarInsn(ALOAD, 1);
Label l2 = new Label();
mv.visitJumpInsn(IFNONNULL, l2);
mv.visitInsn(ICONST_0);
mv.visitTypeInsn(ANEWARRAY, "java/lang/annotation/Annotation");
mv.visitInsn(ARETURN);
mv.visitLabel(l2);
mv.visitFrame(Opcodes.F_APPEND, 1, new Object[]{"[Ljava/lang/annotation/Annotation;"}, 0, null);
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 1);
mv.visitInsn(ARRAYLENGTH);
mv.visitMethodInsn(INVOKESTATIC, "java/util/Arrays", "copyOf", "([Ljava/lang/Object;I)[Ljava/lang/Object;", false);
mv.visitTypeInsn(CHECKCAST, "[Ljava/lang/annotation/Annotation;");
mv.visitInsn(ARETURN);
mv.visitMaxs(2, 2);
mv.visitEnd();
}
{ //execute
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(L" + newDynWebSokcetFullName + ";)V", null, null));
mv.visitVarInsn(ALOAD, 0);
@@ -544,6 +572,7 @@ public final class Rest {
}
cw2.visitEnd();
newLoader.loadClass((newDynMessageFullName + endfix).replace('/', '.'), cw2.toByteArray());
msgclassToAnnotations.put(newDynMessageFullName + endfix, method.getAnnotations());
}
{ //_DynXXXWebSocketMessage class
@@ -676,6 +705,7 @@ public final class Rest {
Class<?> newClazz = newLoader.loadClass(newDynName.replace('/', '.'), cw.toByteArray());
try {
T servlet = (T) newClazz.getDeclaredConstructor().newInstance();
newClazz.getField("_redkale_annotations").set(null, msgclassToAnnotations);
if (rws.cryptor() != Cryptor.class) {
Cryptor cryptor = rws.cryptor().getDeclaredConstructor().newInstance();
Field cryptorField = newClazz.getSuperclass().getDeclaredField("cryptor"); //WebSocketServlet
@@ -1810,9 +1840,10 @@ public final class Rest {
// HashMap<String, InnerActionEntry> _createRestInnerActionEntry() {
// HashMap<String, InnerActionEntry> map = new HashMap<>();
// map.put("asyncfind3", new InnerActionEntry(100000,200000,"asyncfind3", new String[]{},null,false,0, new _Dync_asyncfind3_HttpServlet()));
// map.put("asyncfind3", new InnerActionEntry(1,2,"asyncfind2", new String[]{"GET", "POST"},null,true,0, new _Dync_asyncfind2_HttpServlet()));
// map.put("asyncfind2", new InnerActionEntry(1,2,"asyncfind2", new String[]{"GET", "POST"},null,true,0, new _Dync_asyncfind2_HttpServlet()));
// return map;
// }
Map<String, Method> mappingurlToMethod = new HashMap<>();
{ //_createRestInnerActionEntry 方法
mv = new MethodDebugVisitor(cw.visitMethod(0, "_createRestInnerActionEntry", "()Ljava/util/HashMap;", "()Ljava/util/HashMap<Ljava/lang/String;L" + innerEntryName + ";>;", null));
//mv.setDebug(true);
@@ -1822,6 +1853,7 @@ public final class Rest {
mv.visitVarInsn(ASTORE, 1);
for (final MappingEntry entry : entrys) {
mappingurlToMethod.put(entry.mappingurl, entry.mappingMethod);
mv.visitVarInsn(ALOAD, 1);
mv.visitLdcInsn(entry.mappingurl); //name
mv.visitTypeInsn(NEW, innerEntryName); //new InnerActionEntry
@@ -1837,9 +1869,9 @@ public final class Rest {
mv.visitLdcInsn(entry.methods[i]);
mv.visitInsn(AASTORE);
}
mv.visitInsn(ACONST_NULL); //method
mv.visitInsn(ACONST_NULL); //method
mv.visitInsn(entry.auth ? ICONST_1 : ICONST_0); //auth
pushInt(mv, entry.cacheseconds); //cacheseconds
pushInt(mv, entry.cacheseconds); //cacheseconds
mv.visitTypeInsn(NEW, newDynName + "$" + entry.newActionClassName);
mv.visitInsn(DUP);
mv.visitVarInsn(ALOAD, 0);
@@ -1920,7 +1952,12 @@ public final class Rest {
restactMethod.setAccessible(true);
Field tmpentrysfield = HttpServlet.class.getDeclaredField("_tmpentrys");
tmpentrysfield.setAccessible(true);
tmpentrysfield.set(obj, restactMethod.invoke(obj));
HashMap<String, HttpServlet.InnerActionEntry> innerEntryMap = (HashMap) restactMethod.invoke(obj);
for (Map.Entry<String, HttpServlet.InnerActionEntry> en : innerEntryMap.entrySet()) {
Method m = mappingurlToMethod.get(en.getKey());
if (m != null) en.getValue().annotations = HttpServlet.InnerActionEntry.annotations(m);
}
tmpentrysfield.set(obj, innerEntryMap);
return obj;
} catch (Throwable e) {
throw new RuntimeException(e);

View File

@@ -11,10 +11,11 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Stream;
import org.redkale.convert.Convert;
import org.redkale.net.AsyncConnection;
import org.redkale.util.Comment;
/**
@@ -82,6 +83,8 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketEngine _engine; //不可能为空
AsyncConnection _channel;//不可能为空
String _sessionid; //不可能为空
G _userid; //不可能为空
@@ -674,12 +677,21 @@ public abstract class WebSocket<G extends Serializable, T> {
}
/**
* 获取ByteBuffer资源池
* 获取ByteBuffer生成器
*
* @return Supplier
*/
protected Supplier<ByteBuffer> getByteBufferSupplier() {
return this._runner.context.getBufferSupplier();
protected Supplier<ByteBuffer> getBufferSupplier() {
return this._channel.getBufferSupplier();
}
/**
* 获取ByteBuffer回收器
*
* @return Consumer
*/
protected Consumer<ByteBuffer> getBufferConsumer() {
return this._channel.getBufferConsumer();
}
//-------------------------------------------------------------------

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import static org.redkale.net.http.WebSocketServlet.DEFAILT_LIVEINTERVAL;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -229,26 +230,45 @@ public class WebSocketEngine {
}
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
if (more) {
Supplier<ByteBuffer> bufferSupplier = null;
Consumer<ByteBuffer> bufferConsumer = null;
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
//packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null;
if (single) {
for (WebSocket websocket : websockets.values()) {
if (predicate != null && !predicate.test(websocket)) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
} else {
for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) {
if (predicate != null && !predicate.test(websocket)) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
}
}
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
final Consumer<ByteBuffer> bufferConsumer0 = bufferConsumer;
if (future != null) future.whenComplete((rs, ex) -> {
if (packet.sendBuffers != null && bufferConsumer0 != null) {
for (ByteBuffer buffer : packet.sendBuffers) {
bufferConsumer0.accept(buffer);
}
}
});
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else {
CompletableFuture<Integer> future = null;
@@ -286,16 +306,23 @@ public class WebSocketEngine {
}
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1;
if (more) {
Supplier<ByteBuffer> bufferSupplier = null;
Consumer<ByteBuffer> bufferConsumer = null;
//此处的WebSocketPacket只能是包含payload或bytes内容的不能包含sendConvert、sendJson、sendBuffers
final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
: ((message == null || message instanceof CharSequence || message instanceof byte[])
? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
//packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor));
CompletableFuture<Integer> future = null;
if (single) {
for (Serializable userid : userids) {
WebSocket websocket = websockets.get(userid);
if (websocket == null) continue;
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
} else {
@@ -303,11 +330,23 @@ public class WebSocketEngine {
List<WebSocket> list = websockets2.get(userid);
if (list == null) continue;
for (WebSocket websocket : list) {
if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor));
}
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
}
}
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
final Consumer<ByteBuffer> bufferConsumer0 = bufferConsumer;
if (future != null) future.whenComplete((rs, ex) -> {
if (packet.sendBuffers != null && bufferConsumer0 != null) {
for (ByteBuffer buffer : packet.sendBuffers) {
bufferConsumer0.accept(buffer);
}
}
});
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else {
CompletableFuture<Integer> future = null;

View File

@@ -428,7 +428,7 @@ public abstract class WebSocketNode {
keyuser.put(keys[i], userids[i]);
}
tryAcquireSemaphore();
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys);
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
@@ -692,7 +692,7 @@ public abstract class WebSocketNode {
keyuser.put(keys[i], userids[i]);
}
tryAcquireSemaphore();
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys);
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
if (addrs == null || addrs.isEmpty()) {

View File

@@ -492,7 +492,7 @@ public final class WebSocketPacket {
void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) {
if (webSocket._engine.cryptor != null) {
HttpContext context = webSocket._engine.context;
buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer());
buffers = webSocket._engine.cryptor.decrypt(buffers, webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer());
}
FrameType selfType = this.type;
final boolean series = selfType == FrameType.SERIES;

View File

@@ -5,6 +5,10 @@
*/
package org.redkale.net.http;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.util.Arrays;
/**
*
* 供WebSocket.preOnMessage 方法获取RestWebSocket里OnMessage方法的参数 <br>
@@ -16,6 +20,29 @@ package org.redkale.net.http;
public interface WebSocketParam {
public <T> T getValue(String name);
public String[] getNames();
public Annotation[] getAnnotations();
default <T extends Annotation> T getAnnotation(Class<T> annotationClass) {
for (Annotation ann : getAnnotations()) {
if (ann.getClass() == annotationClass) return (T) ann;
}
return null;
}
default <T extends Annotation> T[] getAnnotationsByType(Class<T> annotationClass) {
Annotation[] annotations = getAnnotations();
if (annotations == null) return (T[]) Array.newInstance(annotationClass, 0);
T[] news = (T[]) Array.newInstance(annotationClass, annotations.length);
int index = 0;
for (Annotation ann : annotations) {
if (ann.getClass() == annotationClass) {
news[index++] = (T) ann;
}
}
if (index < 1) return (T[]) Array.newInstance(annotationClass, 0);
return Arrays.copyOf(news, index);
}
}

View File

@@ -5,7 +5,6 @@
*/
package org.redkale.net.http;
import org.redkale.net.AsyncConnection;
import static org.redkale.net.http.WebSocket.*;
import org.redkale.net.http.WebSocketPacket.FrameType;
import java.nio.ByteBuffer;
@@ -29,8 +28,6 @@ class WebSocketRunner implements Runnable {
private final WebSocketEngine engine;
private final AsyncConnection channel;
private final WebSocket webSocket;
protected final HttpContext context;
@@ -49,13 +46,12 @@ class WebSocketRunner implements Runnable {
protected long lastReadTime;
WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer, AsyncConnection channel) {
WebSocketRunner(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer) {
this.context = context;
this.engine = webSocket._engine;
this.webSocket = webSocket;
this.mergemsg = webSocket._engine.mergemsg;
this.restMessageConsumer = messageConsumer;
this.channel = channel;
}
@Override
@@ -64,10 +60,10 @@ class WebSocketRunner implements Runnable {
final WebSocketRunner self = this;
try {
webSocket.onConnected();
channel.setReadTimeoutSeconds(300); //读取超时5分钟
if (channel.isOpen()) {
webSocket._channel.setReadTimeoutSeconds(300); //读取超时5分钟
if (webSocket._channel.isOpen()) {
final int wsmaxbody = webSocket._engine.wsmaxbody;
channel.read(new CompletionHandler<Integer, ByteBuffer>() {
webSocket._channel.read(new CompletionHandler<Integer, ByteBuffer>() {
//尚未解析完的数据包
private WebSocketPacket unfinishPacket;
@@ -94,11 +90,11 @@ class WebSocketRunner implements Runnable {
onePacket = unfinishPacket;
unfinishPacket = null;
for (ByteBuffer b : exBuffers) {
context.offerBuffer(b);
webSocket._channel.offerBuffer(b);
}
exBuffers.clear();
} else { //需要继续接收, 此处不能回收readBuffer
channel.read(this);
webSocket._channel.read(this);
return;
}
}
@@ -125,7 +121,7 @@ class WebSocketRunner implements Runnable {
}
//继续监听消息
if (readBuffer.hasRemaining()) { //exBuffers缓存了
readBuffer = context.pollBuffer();
readBuffer = webSocket._channel.pollReadBuffer();
} else {
readBuffer.clear();
}
@@ -133,8 +129,8 @@ class WebSocketRunner implements Runnable {
readBuffer.put(halfBytes.getValue());
halfBytes.setValue(null);
}
channel.setReadBuffer(readBuffer);
channel.read(this);
webSocket._channel.setReadBuffer(readBuffer);
webSocket._channel.read(this);
//消息处理
for (final WebSocketPacket packet : packets) {
@@ -229,11 +225,11 @@ class WebSocketRunner implements Runnable {
//System.out.println("推送消息");
final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
try {
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier(), this.context.getBufferConsumer(), webSocket._engine.cryptor);
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor);
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet);
this.lastSendTime = System.currentTimeMillis();
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
webSocket._channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
private CompletableFuture<Integer> future = futureResult;
@@ -245,7 +241,7 @@ class WebSocketRunner implements Runnable {
future = null;
if (attachments != null) {
for (ByteBuffer buf : attachments) {
context.offerBuffer(buf);
webSocket._channel.offerBuffer(buf);
}
}
}
@@ -260,7 +256,7 @@ class WebSocketRunner implements Runnable {
}
}
if (index >= 0) { //ByteBuffer[]统一回收的可以采用此写法
channel.write(attachments, index, attachments.length - index, attachments, this);
webSocket._channel.write(attachments, index, attachments.length - index, attachments, this);
return;
}
if (future != null) {
@@ -268,7 +264,7 @@ class WebSocketRunner implements Runnable {
future = null;
if (attachments != null) {
for (ByteBuffer buf : attachments) {
context.offerBuffer(buf);
webSocket._channel.offerBuffer(buf);
}
}
}
@@ -310,7 +306,7 @@ class WebSocketRunner implements Runnable {
if (closed) return null;
closed = true;
CompletableFuture<Void> future = engine.removeLocalThenClose(webSocket);
channel.dispose();
webSocket._channel.dispose();
webSocket.onClose(code, reason);
return future;
}

View File

@@ -202,6 +202,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine;
webSocket._channel = response.getChannel();
webSocket._messageTextType = this.messageTextType;
webSocket._textConvert = textConvert;
webSocket._binaryConvert = binaryConvert;
@@ -262,7 +263,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Consumer<Boolean> task = (oldkilled) -> {
if (oldkilled) {
WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
@@ -283,7 +285,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
} else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
@@ -291,14 +294,15 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
});
} else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
response.removeChannel();
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
}
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer);
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
@@ -323,7 +327,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
});
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer);
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;

View File

@@ -112,7 +112,7 @@ public final class SncpDynServlet extends SncpServlet {
@SuppressWarnings("unchecked")
public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (bufferSupplier == null) {
bufferSupplier = request.getContext().getBufferSupplier();
bufferSupplier = request.getBufferPool();
}
final SncpServletAction action = actions.get(request.getActionid());
//logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));

View File

@@ -45,11 +45,15 @@ public final class SncpRequest extends Request<SncpContext> {
private byte[] bufferbytes = new byte[6];
protected SncpRequest(SncpContext context) {
super(context);
protected SncpRequest(SncpContext context, ObjectPool<ByteBuffer> bufferPool) {
super(context, bufferPool);
this.convert = context.getBsonConvert();
}
protected ObjectPool<ByteBuffer> getBufferPool() {
return this.bufferPool;
}
@Override
protected int readHeader(ByteBuffer buffer) {
if (buffer.remaining() < HEADER_SIZE) {

View File

@@ -45,8 +45,8 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
return null;
}
protected SncpResponse(SncpContext context, SncpRequest request) {
super(context, request);
protected SncpResponse(SncpContext context, SncpRequest request, ObjectPool<Response> responsePool) {
super(context, request, responsePool);
this.addrBytes = context.getServerAddress().getAddress().getAddress();
this.addrPort = context.getServerAddress().getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4");
@@ -56,7 +56,7 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
protected void offerBuffer(ByteBuffer... buffers) {
super.offerBuffer(buffers);
}
public void finish(final int retcode, final BsonWriter out) {
if (out == null) {
final ByteBuffer buffer = pollWriteReadBuffer();

View File

@@ -99,28 +99,14 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
@Override
@SuppressWarnings("unchecked")
protected SncpContext createContext() {
final int port = this.address.getPort();
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = Math.max(this.bufferCapacity, 8 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
this.bufferCapacity = Math.max(this.bufferCapacity, 8 * 1024);
final SncpContextConfig contextConfig = new SncpContextConfig();
contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger;
contextConfig.executor = this.executor;
contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = rcapacity;
contextConfig.bufferPool = bufferPool;
contextConfig.responsePool = responsePool;
contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.maxconns = this.maxconns;
contextConfig.maxbody = this.maxbody;
contextConfig.charset = this.charset;
@@ -131,9 +117,31 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
contextConfig.readTimeoutSeconds = this.readTimeoutSeconds;
contextConfig.writeTimeoutSeconds = this.writeTimeoutSeconds;
SncpContext sncpcontext = new SncpContext(contextConfig);
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext)));
return sncpcontext;
return new SncpContext(contextConfig);
}
@Override
protected ObjectPool<ByteBuffer> createBufferPool(AtomicLong createCounter, AtomicLong cycleCounter, int bufferPoolSize) {
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
return bufferPool;
}
@Override
protected ObjectPool<Response> createResponsePool(AtomicLong createCounter, AtomicLong cycleCounter, int responsePoolSize) {
return SncpResponse.createPool(createCounter, cycleCounter, responsePoolSize, null);
}
@Override
protected Creator<Response> createResponseCreator(ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool) {
return (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context, bufferPool), responsePool);
}
}

View File

@@ -677,7 +677,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public <T> Map<String, Collection<T>> getCollectionMap(final Type componentType, final String... keys) {
public <T> Map<String, Collection<T>> getCollectionMap(final boolean set, final Type componentType, final String... keys) {
Map<String, Collection<T>> map = new HashMap<>();
for (String key : keys) {
Collection<T> s = (Collection<T>) get(key);
@@ -692,7 +692,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public Map<String, Collection<String>> getStringCollectionMap(final String... keys) {
public Map<String, Collection<String>> getStringCollectionMap(final boolean set, final String... keys) {
Map<String, Collection<String>> map = new HashMap<>();
for (String key : keys) {
Collection<String> s = (Collection<String>) get(key);
@@ -707,7 +707,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public Map<String, Collection<Long>> getLongCollectionMap(final String... keys) {
public Map<String, Collection<Long>> getLongCollectionMap(final boolean set, final String... keys) {
Map<String, Collection<Long>> map = new HashMap<>();
for (String key : keys) {
Collection<Long> s = (Collection<Long>) get(key);
@@ -727,8 +727,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Map<String, Collection<V>>> getCollectionMapAsync(final Type componentType, final String... keys) {
return CompletableFuture.supplyAsync(() -> getCollectionMap(componentType, keys), getExecutor());
public CompletableFuture<Map<String, Collection<V>>> getCollectionMapAsync(final boolean set, final Type componentType, final String... keys) {
return CompletableFuture.supplyAsync(() -> getCollectionMap(set, componentType, keys), getExecutor());
}
@Override
@@ -737,8 +737,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final String... keys) {
return CompletableFuture.supplyAsync(() -> getStringCollectionMap(keys), getExecutor());
public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final boolean set, final String... keys) {
return CompletableFuture.supplyAsync(() -> getStringCollectionMap(set, keys), getExecutor());
}
@Override
@@ -747,8 +747,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
@Override
public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final String... keys) {
return CompletableFuture.supplyAsync(() -> getLongCollectionMap(keys), getExecutor());
public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final boolean set, final String... keys) {
return CompletableFuture.supplyAsync(() -> getLongCollectionMap(set, keys), getExecutor());
}
@Override

View File

@@ -92,7 +92,7 @@ public interface CacheSource<V extends Object> {
public <T> Collection<T> getCollection(final String key, final Type componentType);
public <T> Map<String, Collection<T>> getCollectionMap(final Type componentType, final String... keys);
public <T> Map<String, Collection<T>> getCollectionMap(final boolean set, final Type componentType, final String... keys);
public int getCollectionSize(final String key);
@@ -140,7 +140,7 @@ public interface CacheSource<V extends Object> {
public Collection<String> getStringCollection(final String key);
public Map<String, Collection<String>> getStringCollectionMap(final String... keys);
public Map<String, Collection<String>> getStringCollectionMap(final boolean set, final String... keys);
public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds);
@@ -164,7 +164,7 @@ public interface CacheSource<V extends Object> {
public Collection<Long> getLongCollection(final String key);
public Map<String, Collection<Long>> getLongCollectionMap(final String... keys);
public Map<String, Collection<Long>> getLongCollectionMap(final boolean set, final String... keys);
public Collection<Long> getLongCollectionAndRefresh(final String key, final int expireSeconds);
@@ -241,7 +241,7 @@ public interface CacheSource<V extends Object> {
public <T> CompletableFuture<Collection<T>> getCollectionAsync(final String key, final Type componentType);
public <T> CompletableFuture<Map<String, Collection<T>>> getCollectionMapAsync(final Type componentType, final String... keys);
public <T> CompletableFuture<Map<String, Collection<T>>> getCollectionMapAsync(final boolean set, final Type componentType, final String... keys);
public CompletableFuture<Integer> getCollectionSizeAsync(final String key);
@@ -289,7 +289,7 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Collection<String>> getStringCollectionAsync(final String key);
public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final String... keys);
public CompletableFuture<Map<String, Collection<String>>> getStringCollectionMapAsync(final boolean set, final String... keys);
public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds);
@@ -313,7 +313,7 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Collection<Long>> getLongCollectionAsync(final String key);
public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final String... keys);
public CompletableFuture<Map<String, Collection<Long>>> getLongCollectionMapAsync(final boolean set, final String... keys);
public CompletableFuture<Collection<Long>> getLongCollectionAndRefreshAsync(final String key, final int expireSeconds);

View File

@@ -177,11 +177,13 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
protected <T> int batchStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Serializable val = info.getSQLValue(attr, entity);
Object val = info.getSQLValue(attr, entity);
if (val instanceof byte[]) {
Blob blob = conn.createBlob();
blob.setBytes(1, (byte[]) val);
prestmt.setObject(++i, blob);
} else if (val instanceof Boolean) {
prestmt.setObject(++i, ((Boolean) val) ? (byte) 1 : (byte) 0);
} else if (val instanceof AtomicInteger) {
prestmt.setObject(++i, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
@@ -230,6 +232,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;
@@ -250,6 +253,7 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
stmt.close();
return CompletableFuture.completedFuture(c);
} catch (SQLException e) {
if (info.isTableNotExist(e)) return CompletableFuture.completedFuture(-1);
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);
return future;

View File

@@ -216,7 +216,7 @@ public interface DataSource {
* @param <T> Entity泛型
* @param clazz Entity类
*
* @return 影响的记录条数
* @return 影响的记录条数 -1表示表不存在
*/
public <T> int clearTable(final Class<T> clazz);
@@ -227,7 +227,7 @@ public interface DataSource {
* @param <T> Entity泛型
* @param clazz Entity类
*
* @return 影响的记录条数CompletableFuture
* @return 影响的记录条数CompletableFuture -1表示表不存在
*/
public <T> CompletableFuture<Integer> clearTableAsync(final Class<T> clazz);
@@ -239,7 +239,7 @@ public interface DataSource {
* @param clazz Entity类
* @param node 过滤条件
*
* @return 影响的记录条数
* @return 影响的记录条数 -1表示表不存在
*/
public <T> int clearTable(final Class<T> clazz, final FilterNode node);
@@ -251,7 +251,7 @@ public interface DataSource {
* @param clazz Entity类
* @param node 过滤条件
*
* @return 影响的记录条数CompletableFuture
* @return 影响的记录条数CompletableFuture -1表示表不存在
*/
public <T> CompletableFuture<Integer> clearTableAsync(final Class<T> clazz, final FilterNode node);
@@ -263,7 +263,7 @@ public interface DataSource {
* @param <T> Entity泛型
* @param clazz Entity类
*
* @return 影响的记录条数
* @return 影响的记录条数 -1表示表不存在
*/
public <T> int dropTable(final Class<T> clazz);
@@ -274,7 +274,7 @@ public interface DataSource {
* @param <T> Entity泛型
* @param clazz Entity类
*
* @return 影响的记录条数CompletableFuture
* @return 影响的记录条数CompletableFuture -1表示表不存在
*/
public <T> CompletableFuture<Integer> dropTableAsync(final Class<T> clazz);
@@ -286,7 +286,7 @@ public interface DataSource {
* @param clazz Entity类
* @param node 过滤条件
*
* @return 影响的记录条数
* @return 影响的记录条数 -1表示表不存在
*/
public <T> int dropTable(final Class<T> clazz, final FilterNode node);
@@ -298,7 +298,7 @@ public interface DataSource {
* @param clazz Entity类
* @param node 过滤条件
*
* @return 影响的记录条数CompletableFuture
* @return 影响的记录条数CompletableFuture -1表示表不存在
*/
public <T> CompletableFuture<Integer> dropTableAsync(final Class<T> clazz, final FilterNode node);

View File

@@ -1177,7 +1177,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
if (blobs == null) return updateDB(info, null, sql, false);
return updateDB(info, null, sql, true, blobs.toArray());
} else {
final Serializable id = info.getSQLValue(info.getPrimary(), entity);
final Serializable id = (Serializable) info.getSQLValue(info.getPrimary(), entity);
String sql = "UPDATE " + info.getTable(id) + " a SET " + setsql + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(id);
if (blobs == null) return updateDB(info, null, sql, false);
return updateDB(info, null, sql, true, blobs.toArray());

View File

@@ -902,10 +902,10 @@ public final class EntityInfo<T> {
*
* @return Object
*/
public Serializable getSQLValue(Attribute<T, Serializable> attr, T entity) {
Serializable val = attr.get(entity);
public <F> Object getSQLValue(Attribute<T, F> attr, T entity) {
Object val = attr.get(entity);
CryptHandler cryptHandler = attr.attach();
if (cryptHandler != null) val = (Serializable) cryptHandler.encrypt(val);
if (cryptHandler != null) val = cryptHandler.encrypt(val);
return val;
}

View File

@@ -86,7 +86,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override
public boolean test(Byte t) {
if (max < min) return t >= min;
if (max < min && max <= 0) return t >= min;
return t >= min && t <= max;
}
@@ -131,7 +131,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override
public boolean test(Short t) {
if (max < min) return t >= min;
if (max < min && max <= 0) return t >= min;
return t >= min && t <= max;
}
@@ -175,7 +175,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override
public boolean test(Integer t) {
if (max < min) return t >= min;
if (max < min && max <= 0) return t >= min;
return t >= min && t <= max;
}
@@ -229,7 +229,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override
public boolean test(Long t) {
if (max < min) return t >= min;
if (max < min && max <= 0) return t >= min;
return t >= min && t <= max;
}
@@ -273,7 +273,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override
public boolean test(Float t) {
if (max < min) return t >= min;
if (max < min && max <= 0) return t >= min;
return t >= min && t <= max;
}
@@ -317,7 +317,7 @@ public interface Range<E extends Comparable> extends java.io.Serializable, Predi
@Override
public boolean test(Double t) {
if (max < min) return t >= min;
if (max < min && max <= 0) return t >= min;
return t >= min && t <= max;
}

View File

@@ -1980,14 +1980,14 @@ public final class Utility {
}
conn.setRequestMethod(method);
if (headers != null) {
for (Map.Entry<String, String> en : headers.entrySet()) { //不用forEach是为了兼容JDK 6
for (Map.Entry<String, String> en : headers.entrySet()) {
conn.setRequestProperty(en.getKey(), en.getValue());
}
}
if (body != null && !body.isEmpty()) { //conn.getOutputStream()会将GET强制变成POST
conn.setDoInput(true);
conn.setDoOutput(true);
conn.getOutputStream().write(body == null ? new byte[0] : body.getBytes(UTF_8));
conn.getOutputStream().write(body.getBytes(UTF_8));
}
conn.connect();
int rs = conn.getResponseCode();

View File

@@ -6,7 +6,8 @@
package org.redkale.test.wsdync;
import java.io.Serializable;
import java.util.Map;
import java.lang.annotation.Annotation;
import java.util.*;
import java.util.function.BiConsumer;
import javax.annotation.Resource;
import org.redkale.convert.ConvertDisabled;
@@ -26,6 +27,8 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
@Resource
private ChatService _redkale_resource_0;
public static Map<String, Annotation[]> _redkale_annotations;
public _DyncChatWebSocketServlet() {
super();
this.messageTextType = _DyncChatWebSocketMessage.class;
@@ -82,6 +85,13 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
return null;
}
@Override
public Annotation[] getAnnotations() {
Annotation[] annotations = _redkale_annotations.get("org/redkale/test/wsdync/_DyncChatWebSocketServlet$_DyncChatWebSocketMessage_sendmessagee_00");
if (annotations == null) return new Annotation[0];
return Arrays.copyOf(annotations, annotations.length);
}
public void execute(_DyncChatWebSocket websocket) {
this._redkale_websocket = websocket;
websocket.preOnMessage("sendmessage", this, this);
@@ -116,6 +126,13 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
return null;
}
@Override
public Annotation[] getAnnotations() {
Annotation[] annotations = _redkale_annotations.get("org/redkale/test/wsdync/_DyncChatWebSocketServlet$_DyncChatWebSocketMessage_joinroom_01");
if (annotations == null) return new Annotation[0];
return Arrays.copyOf(annotations, annotations.length);
}
public void execute(_DyncChatWebSocket websocket) {
this._redkale_websocket = websocket;
websocket.preOnMessage("joinroom", this, this);