15 Commits
1.8.8 ... 1.8.9

Author SHA1 Message Date
Redkale
6413161dc8 net包优化 2017-12-29 10:19:20 +08:00
Redkale
46a89d9847 @RestHeader增加支持Host、Content-Type、Connection选项 2017-12-29 08:31:19 +08:00
Redkale
d83d6b5671 2017-12-29 08:29:27 +08:00
Redkale
b1abe91bd2 2017-12-28 21:05:20 +08:00
Redkale
bf4138178b 2017-12-28 21:04:55 +08:00
Redkale
5b99084616 2017-12-28 19:59:45 +08:00
Redkale
b95064b87e 2017-12-28 19:10:11 +08:00
Redkale
e1eece34b7 增加restart脚本 2017-12-28 11:28:47 +08:00
Redkale
19d6a06bfa 还原 2017-12-28 10:53:34 +08:00
Redkale
2fd13d1481 2017-12-28 10:40:25 +08:00
Redkale
23a0a80e3a Transport增加readTimeoutSecond、writeTimeoutSecond配置项 2017-12-28 10:24:15 +08:00
Redkale
bb493d43b5 RetResult的attach属性由Map<String, String> 改成 Map<String, Serializable> 2017-12-28 10:19:49 +08:00
Redkale
ef9eaa0a66 DataSource.updateColumn系列方法屏蔽掉limit功能 2017-12-28 10:16:21 +08:00
Redkale
1d640f943a CacheSource增加existsSetItem系列方法 2017-12-28 10:14:10 +08:00
Redkale
84f5f065ad Redkale 1.8.9 开始 2017-12-28 10:13:04 +08:00
28 changed files with 303 additions and 85 deletions

9
bin/restart.bat Normal file
View File

@@ -0,0 +1,9 @@
@ECHO OFF
SET APP_HOME=%~dp0
IF NOT EXIST "%APP_HOME%\conf\application.xml" SET APP_HOME=%~dp0..
call %APP_HOME%\bin\shutdown.bat
call %APP_HOME%\bin\start.bat

20
bin/restart.sh Normal file
View File

@@ -0,0 +1,20 @@
#!/bin/sh
export LC_ALL="zh_CN.UTF-8"
APP_HOME=`dirname "$0"`
cd "$APP_HOME"/..
APP_HOME=`pwd`
if [ ! -f "$APP_HOME"/conf/application.xml ]; then
APP_HOME="$APP_HOME"/..
fi
cd "$APP_HOME"
./bin/shutdown.sh
./bin/start.sh

View File

@@ -37,9 +37,11 @@
threads 线程总数, 默认: <group>节点数*CPU核数*8
bufferCapacity: ByteBuffer的初始化大小 默认: 8K;
bufferPoolSize ByteBuffer池的大小默认: <group>节点数*CPU核数*8
readTimeoutSecond: TCP读取超时秒数, 默认为6秒 为0表示无超时限制
writeTimeoutSecond: TCP写入超时秒数, 默认为6秒 为0表示无超时限制
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
-->
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32" readTimeoutSecond="6" writeTimeoutSecond="6"/>
<!--
一个组包含多个node 同一Service服务可以由多个进程提供这些进程称为一个GROUP且同一GROUP内的进程必须在同一机房或局域网内
@@ -64,7 +66,7 @@
<!--
全局的数据源设置, 可以是CacheSource、DataSource JDBC的DataSource通常通过persistence.xml配置此处多用于CacheSource的配置
name: 资源名,用于依赖注入。
value类名必须是CacheSource或DataSource的子类且必须实现Service接口。
value类名必须是CacheSource或DataSource的子类且必须实现Service接口。如果是DataSource.class系统自动映射成DataJdbcSource.class
groups: 指定groups。
xxx: 其他属性与子节点通过Service.init方法传入的AnyValue获取。
-->
@@ -92,7 +94,8 @@
System.setProperty("convert.json.writer.buffer.defsize", "4096");
System.setProperty("convert.bson.writer.buffer.defsize", "4096");
<properties>节点下也可包含非<property>节点,其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[]
<properties>节点下也可包含非<property>节点.
非<property>其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[]
-->
<properties load="config.properties">
<property name="system.property.yyyy" value="YYYYYY"/>
@@ -147,7 +150,7 @@
<service value="com.xxx.XXX2Service" name="" groups="xxx;yyy"/>
<!-- 给Service增加配置属性 -->
<service value="com.xxx.XXX1Service">
<!-- property节点值在 public void init(AnyValue conf) 方法中可以通过 AnyValue properties = conf.getAnyValue("properties");获取 -->
<!-- property值在public void init(AnyValue conf)方法中可以通过AnyValue properties=conf.getAnyValue("properties")获取 -->
<property name="xxxxxx" value="XXXXXXXX"/>
<property name="xxxxxx" value="XXXXXXXX"/>
</service>
@@ -172,7 +175,7 @@
<!-- 给Filter增加配置属性 -->
<filter value="com.xxx.XXX12Filter">
<!-- property节点值在 public void init(AnyValue conf) 方法中可以通过 AnyValue properties = conf.getAnyValue("properties");获取 -->
<!-- property值在public void init(AnyValue conf)方法中可以通过AnyValue properties=conf.getAnyValue("properties")获取 -->
<property name="xxxxxx" value="XXXXXXXX"/>
<property name="xxxxxx" value="XXXXXXXX"/>
</filter>

View File

@@ -7,10 +7,11 @@
module org.redkale {
requires java.se;
requires jdk.unsupported;
requires jdk.unsupported; //sun.misc.Unsafe
exports javax.annotation;
exports javax.persistence;
exports org.redkale.asm;
exports org.redkale.boot;
exports org.redkale.boot.watch;
exports org.redkale.convert;

View File

@@ -245,6 +245,8 @@ public final class Application {
TransportStrategy strategy = null;
int bufferCapacity = 8 * 1024;
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 16;
int readTimeoutSecond = TransportFactory.DEFAULT_READTIMEOUTSECOND;
int writeTimeoutSecond = TransportFactory.DEFAULT_WRITETIMEOUTSECOND;
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
if (resources != null) {
@@ -255,6 +257,8 @@ public final class Application {
//--------------transportBufferPool-----------
bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 4 * 1024);
bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
readTimeoutSecond = transportConf.getIntValue("readTimeoutSecond", readTimeoutSecond);
writeTimeoutSecond = transportConf.getIntValue("writeTimeoutSecond", writeTimeoutSecond);
final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
final int capacity = bufferCapacity;
transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
@@ -306,7 +310,7 @@ public final class Application {
return true;
});
}
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy);
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, readTimeoutSecond, writeTimeoutSecond, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
Thread.currentThread().setContextClassLoader(this.classLoader);

View File

@@ -179,6 +179,7 @@ public abstract class NodeServer {
for (AnyValue sourceConf : resources.getAnyValues("source")) {
try {
Class type = serverClassLoader.loadClass(sourceConf.getValue("value"));
if (type == DataSource.class) type = DataJdbcSource.class;
if (!Service.class.isAssignableFrom(type)) {
logger.log(Level.SEVERE, "load application source resource, but not Service error: " + sourceConf);
} else if (CacheSource.class.isAssignableFrom(type)) {

View File

@@ -31,10 +31,10 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
protected volatile long writetime;
//关闭数
AtomicLong closedCounter;
protected AtomicLong closedCounter;
//在线数
AtomicLong livingCounter;
protected AtomicLong livingCounter;
public final long getLastReadTime() {
return readtime;

View File

@@ -48,14 +48,16 @@ public final class PrepareRunner implements Runnable {
}
return;
}
final ByteBuffer buffer = context.pollBuffer();
final Response response = responsePool.get();
final ByteBuffer buffer = response.request.pollReadBuffer();
try {
channel.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment1) {
if (count < 1 && buffer.remaining() == buffer.limit()) {
try {
context.offerBuffer(buffer);
response.request.offerReadBuffer(buffer);
response.finish(true);
channel.close();
} catch (Exception e) {
context.logger.log(Level.FINEST, "PrepareRunner close channel erroneous on no read bytes", e);
@@ -69,7 +71,6 @@ public final class PrepareRunner implements Runnable {
// System.println(new String(bs));
// }
buffer.flip();
final Response response = responsePool.get();
response.init(channel);
try {
prepare.prepare(buffer, response.request, response);
@@ -81,7 +82,8 @@ public final class PrepareRunner implements Runnable {
@Override
public void failed(Throwable exc, Void attachment2) {
context.offerBuffer(buffer);
response.request.offerReadBuffer(buffer);
response.finish(true);
try {
channel.close();
} catch (Exception e) {
@@ -90,7 +92,8 @@ public final class PrepareRunner implements Runnable {
}
});
} catch (Exception te) {
context.offerBuffer(buffer);
response.request.offerReadBuffer(buffer);
response.finish(true);
try {
channel.close();
} catch (Exception e) {

View File

@@ -214,11 +214,11 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
executeCounter.incrementAndGet();
final int rs = request.readHeader(buffer);
if (rs < 0) {
response.context.offerBuffer(buffer);
request.offerReadBuffer(buffer);
if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet();
response.finish(true);
} else if (rs == 0) {
response.context.offerBuffer(buffer);
request.offerReadBuffer(buffer);
request.prepare();
response.filter = this.headFilter;
response.servlet = this;
@@ -236,7 +236,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
buffer.clear();
request.channel.read(buffer, buffer, this);
} else {
response.context.offerBuffer(buffer);
request.offerReadBuffer(buffer);
request.prepare();
try {
response.filter = PrepareServlet.this.headFilter;
@@ -253,7 +253,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
illRequestCounter.incrementAndGet();
response.context.offerBuffer(buffer);
request.offerReadBuffer(buffer);
response.finish(true);
if (exc != null) request.context.logger.log(Level.FINER, "Servlet read channel erroneous, forece to close channel ", exc);
}

View File

@@ -217,7 +217,7 @@ public abstract class ProtocolServer {
AsyncConnection conn = AsyncConnection.create(channel, null, context.readTimeoutSecond, context.writeTimeoutSecond);
conn.livingCounter = livingCounter;
conn.closedCounter = closedCounter;
context.submitAsync(new PrepareRunner(context, conn, null));
context.runAsync(new PrepareRunner(context, conn, null));
}
@Override

View File

@@ -33,6 +33,8 @@ public abstract class Request<C extends Context> {
protected AsyncConnection channel;
protected ByteBuffer readBuffer;
/**
* properties 与 attributes 的区别在于调用recycle时 attributes会被清空而properties会保留;
* properties 通常存放需要永久绑定在request里的一些对象
@@ -43,10 +45,28 @@ public abstract class Request<C extends Context> {
protected Request(C context) {
this.context = context;
this.readBuffer = context.pollBuffer();
this.bsonConvert = context.getBsonConvert();
this.jsonConvert = context.getJsonConvert();
}
protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
if (buffer == null) buffer = context.pollBuffer();
return buffer;
}
protected void offerReadBuffer(ByteBuffer buffer) {
if (buffer == null) return;
if (this.readBuffer == null) {
buffer.clear();
this.readBuffer = buffer;
} else {
context.offerBuffer(buffer);
}
}
/**
* 返回值Integer.MIN_VALUE: 帧数据; -1数据不合法 0解析完毕 &gt;0: 需再读取的字节数。
*

View File

@@ -8,7 +8,7 @@ package org.redkale.net;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.function.BiConsumer;
import java.util.function.*;
import java.util.logging.Level;
/**
@@ -30,6 +30,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected AsyncConnection channel;
protected ByteBuffer writeHeadBuffer;
protected ByteBuffer writeBodyBuffer;
private boolean inited = true;
protected Object output; //输出的结果对象
@@ -40,6 +44,8 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected Servlet<C, R, ? extends Response<C, R>> servlet;
private Supplier<ByteBuffer> bodyBufferSupplier;
private final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
@@ -47,17 +53,31 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this);
} else {
context.offerBuffer(attachment);
offerResponseBuffer(attachment);
finish();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
context.offerBuffer(attachment);
offerResponseBuffer(attachment);
finish(true);
}
private void offerResponseBuffer(ByteBuffer attachment) {
if (writeHeadBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) {
writeHeadBuffer = attachment;
}
} else if (writeBodyBuffer == null) {
if (context.bufferPool.getRecyclerPredicate().test(attachment)) {
writeBodyBuffer = attachment;
}
} else {
context.offerBuffer(attachment);
}
}
};
private final CompletionHandler finishHandler2 = new CompletionHandler<Integer, ByteBuffer[]>() {
@@ -74,26 +94,66 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (index >= 0) {
channel.write(attachments, index, attachments.length - index, attachments, this);
} else {
for (ByteBuffer attachment : attachments) {
context.offerBuffer(attachment);
}
offerResponseBuffer(attachments);
finish();
}
}
@Override
public void failed(Throwable exc, final ByteBuffer[] attachments) {
for (ByteBuffer attachment : attachments) {
context.offerBuffer(attachment);
}
offerResponseBuffer(attachments);
finish(true);
}
private void offerResponseBuffer(ByteBuffer[] attachments) {
int start = 0;
if (writeHeadBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeHeadBuffer = attachments[start];
start++;
}
}
if (writeBodyBuffer == null && attachments.length > start) {
if (context.bufferPool.getRecyclerPredicate().test(attachments[start])) {
writeBodyBuffer = attachments[start];
start++;
}
}
for (int i = start; i < attachments.length; i++) {
context.offerBuffer(attachments[i]);
}
}
};
protected Response(C context, final R request) {
this.context = context;
this.request = request;
this.writeHeadBuffer = context.pollBuffer();
this.writeBodyBuffer = context.pollBuffer();
this.bodyBufferSupplier = () -> {
ByteBuffer buffer = writeBodyBuffer;
if (buffer == null) return context.pollBuffer();
writeBodyBuffer = null;
return buffer;
};
}
protected ByteBuffer pollWriteReadBuffer() {
ByteBuffer buffer = this.writeHeadBuffer;
this.writeHeadBuffer = null;
if (buffer == null) buffer = context.pollBuffer();
return buffer;
}
protected ByteBuffer pollWriteBodyBuffer() {
ByteBuffer buffer = this.writeBodyBuffer;
this.writeBodyBuffer = null;
if (buffer == null) buffer = context.pollBuffer();
return buffer;
}
protected Supplier<ByteBuffer> getBodyBufferSupplier() {
return bodyBufferSupplier;
}
protected AsyncConnection removeChannel() {

View File

@@ -29,6 +29,12 @@ import org.redkale.util.*;
*/
public class TransportFactory {
@Comment("默认TCP读取超时秒数")
public static int DEFAULT_READTIMEOUTSECOND = 6;
@Comment("默认TCP写入超时秒数")
public static int DEFAULT_WRITETIMEOUTSECOND = 6;
public static final String NAME_PINGINTERVAL = "pinginterval";
protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName());
@@ -55,6 +61,12 @@ public class TransportFactory {
//心跳周期, 单位:秒
protected int pinginterval;
//TCP读取超时秒数
protected int readTimeoutSecond;
//TCP写入超时秒数
protected int writeTimeoutSecond;
//ping的定时器
private ScheduledThreadPoolExecutor pingScheduler;
@@ -68,15 +80,18 @@ public class TransportFactory {
protected final TransportStrategy strategy;
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
final TransportStrategy strategy) {
int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) {
this.executor = executor;
this.bufferPool = bufferPool;
this.channelGroup = channelGroup;
this.readTimeoutSecond = readTimeoutSecond;
this.writeTimeoutSecond = writeTimeoutSecond;
this.strategy = strategy;
}
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
this(executor, bufferPool, channelGroup, null);
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
int readTimeoutSecond, int writeTimeoutSecond) {
this(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, null);
}
public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) {
@@ -100,10 +115,14 @@ public class TransportFactory {
}
public static TransportFactory create(int threads) {
return create(threads, threads * 2, 8 * 1024);
return create(threads, threads * 2, 8 * 1024, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND);
}
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity) {
return create(threads, bufferPoolSize, bufferCapacity, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND);
}
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity, int readTimeoutSecond, int writeTimeoutSecond) {
final ObjectPool<ByteBuffer> transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
@@ -123,16 +142,21 @@ public class TransportFactory {
} catch (IOException e) {
throw new RuntimeException(e);
}
return create(transportExec, transportPool, transportGroup);
return create(transportExec, transportPool, transportGroup, readTimeoutSecond, writeTimeoutSecond);
}
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
return new TransportFactory(executor, bufferPool, channelGroup, null);
return new TransportFactory(executor, bufferPool, channelGroup, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND, null);
}
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
final TransportStrategy strategy) {
return new TransportFactory(executor, bufferPool, channelGroup, strategy);
int readTimeoutSecond, int writeTimeoutSecond) {
return new TransportFactory(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, null);
}
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) {
return new TransportFactory(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, strategy);
}
public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {

View File

@@ -15,6 +15,7 @@ import java.text.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.logging.Level;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
@@ -34,15 +35,6 @@ import org.redkale.util.*;
*/
public class HttpResponse extends Response<HttpContext, HttpRequest> {
/**
* HttpResponse.finish 方法内调用
* 主要给@HttpCacheable使用
*/
protected static interface BufferHandler {
public ByteBuffer[] execute(final HttpResponse response, final ByteBuffer[] buffers);
}
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
private static final ByteBuffer buffer404 = ByteBuffer.wrap("HTTP/1.1 404 Not Found\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
@@ -117,7 +109,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private boolean headsended = false;
private BufferHandler bufferHandler;
private BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> bufferHandler;
//------------------------------------------------
private final DefaultAnyValue header = new DefaultAnyValue();
@@ -262,7 +254,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishJson(final Object obj) {
this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), obj));
finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), obj));
}
/**
@@ -274,7 +266,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishMapJson(final Object... objs) {
this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = objs;
finish(request.getJsonConvert().convertMapTo(context.getBufferSupplier(), objs));
finish(request.getJsonConvert().convertMapTo(getBodyBufferSupplier(), objs));
}
/**
@@ -286,7 +278,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishJson(final JsonConvert convert, final Object obj) {
this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
finish(convert.convertTo(context.getBufferSupplier(), obj));
finish(convert.convertTo(getBodyBufferSupplier(), obj));
}
/**
@@ -299,7 +291,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishMapJson(final JsonConvert convert, final Object... objs) {
this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = objs;
finish(convert.convertMapTo(context.getBufferSupplier(), objs));
finish(convert.convertMapTo(getBodyBufferSupplier(), objs));
}
/**
@@ -311,7 +303,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishJson(final Type type, final Object obj) {
this.contentType = "text/plain; charset=utf-8";
this.output = obj;
finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), type, obj));
finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), type, obj));
}
/**
@@ -324,7 +316,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishJson(final JsonConvert convert, final Type type, final Object obj) {
this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
finish(convert.convertTo(context.getBufferSupplier(), type, obj));
finish(convert.convertTo(getBodyBufferSupplier(), type, obj));
}
/**
@@ -335,7 +327,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishJson(final Object... objs) {
this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = objs;
finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), objs));
finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), objs));
}
/**
@@ -350,7 +342,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
this.header.addValue("retinfo", ret.getRetinfo());
}
finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), ret));
finish(request.getJsonConvert().convertTo(getBodyBufferSupplier(), ret));
}
/**
@@ -366,7 +358,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
this.header.addValue("retinfo", ret.getRetinfo());
}
finish(convert.convertTo(context.getBufferSupplier(), ret));
finish(convert.convertTo(getBodyBufferSupplier(), ret));
}
/**
@@ -475,8 +467,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.header.addValue("retcode", String.valueOf(ret.getRetcode())).addValue("retinfo", ret.getRetinfo());
}
}
ByteBuffer[] buffers = type == null ? convert.convertTo(context.getBufferSupplier(), obj)
: convert.convertTo(context.getBufferSupplier(), type, obj);
ByteBuffer[] buffers = type == null ? convert.convertTo(getBodyBufferSupplier(), obj)
: convert.convertTo(getBodyBufferSupplier(), type, obj);
finish(buffers);
}
}
@@ -498,7 +490,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
}
if (context.getCharset() == null) {
if (bufferHandler != null) {
bufferHandler.execute(this, new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(obj))});
bufferHandler.apply(this, new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(obj))});
}
final char[] chars = Utility.charArray(obj);
this.contentLength = Utility.encodeUTF8Length(chars);
@@ -513,7 +505,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else {
ByteBuffer buffer = context.getCharset().encode(obj);
if (bufferHandler != null) {
ByteBuffer[] bufs = bufferHandler.execute(this, new ByteBuffer[]{buffer});
ByteBuffer[] bufs = bufferHandler.apply(this, new ByteBuffer[]{buffer});
if (bufs != null) buffer = bufs[0];
}
this.contentLength = buffer.remaining();
@@ -558,7 +550,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
@Override
public void finish(final byte[] bs) {
if (isClosed()) return; //避免重复关闭
if (this.context.getBufferCapacity() == bs.length) {
if (this.context.getBufferCapacity() >= bs.length) {
ByteBuffer buffer = this.context.pollBuffer();
buffer.put(bs);
buffer.flip();
@@ -621,7 +613,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finish(boolean kill, ByteBuffer... buffers) {
if (isClosed()) return; //避免重复关闭
if (bufferHandler != null) {
ByteBuffer[] bufs = bufferHandler.execute(this, buffers);
ByteBuffer[] bufs = bufferHandler.apply(this, buffers);
if (bufs != null) buffers = bufs;
}
if (kill) refuseAlive();
@@ -801,9 +793,10 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(file, offset, length));
}
private ByteBuffer createHeader() {
//Header大小不能超过一个ByteBuffer的容量
protected ByteBuffer createHeader() {
this.headsended = true;
ByteBuffer buffer = this.context.pollBuffer();
ByteBuffer buffer = this.pollWriteReadBuffer();
buffer.put(("HTTP/1.1 " + this.status + " " + (this.status == 200 ? "OK" : httpCodes.get(this.status)) + "\r\n").getBytes());
buffer.put(("Content-Type: " + (this.contentType == null ? "text/plain; charset=utf-8" : this.contentType) + "\r\n").getBytes());
@@ -1015,7 +1008,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*
* @return 拦截器
*/
protected BufferHandler getBufferHandler() {
protected BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> getBufferHandler() {
return bufferHandler;
}
@@ -1024,7 +1017,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*
* @param bufferHandler 拦截器
*/
protected void setBufferHandler(BufferHandler bufferHandler) {
protected void setBufferHandler(BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> bufferHandler) {
this.bufferHandler = bufferHandler;
}

View File

@@ -10,6 +10,7 @@ import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
import static jdk.internal.org.objectweb.asm.Opcodes.*;
@@ -315,7 +316,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
return false;
}
public final HttpResponse.BufferHandler cacheHandler;
public final BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> cacheHandler;
public final ConcurrentHashMap<String, CacheEntry> cache;

View File

@@ -956,7 +956,7 @@ public final class Rest {
av1.visitEnd();
av0.visitEnd();
}
int uploadLocal = 0;
if (mupload != null) { //存在文件上传
if (muploadType == byte[].class) {
mv.visitVarInsn(ALOAD, 1);
@@ -966,6 +966,7 @@ public final class Rest {
mv.visitLdcInsn(mupload.contentTypeReg());
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/net/http/MultiContext", "partsFirstBytes", "(JLjava/lang/String;Ljava/lang/String;)[B", false);
mv.visitVarInsn(ASTORE, maxLocals);
uploadLocal = maxLocals;
} else if (muploadType == File.class) {
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()Lorg/redkale/net/http/MultiContext;", false);
@@ -976,6 +977,7 @@ public final class Rest {
mv.visitLdcInsn(mupload.contentTypeReg());
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/net/http/MultiContext", "partsFirstFile", "(Ljava/io/File;JLjava/lang/String;Ljava/lang/String;)Ljava/io/File;", false);
mv.visitVarInsn(ASTORE, maxLocals);
uploadLocal = maxLocals;
} else if (muploadType == File[].class) { //File[]
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMultiContext", "()Lorg/redkale/net/http/MultiContext;", false);
@@ -986,6 +988,7 @@ public final class Rest {
mv.visitLdcInsn(mupload.contentTypeReg());
mv.visitMethodInsn(INVOKEVIRTUAL, "org/redkale/net/http/MultiContext", "partsFiles", "(Ljava/io/File;JLjava/lang/String;Ljava/lang/String;)[Ljava/io/File;", false);
mv.visitVarInsn(ASTORE, maxLocals);
uploadLocal = maxLocals;
}
maxLocals++;
}
@@ -1402,7 +1405,7 @@ public final class Rest {
} while ((loop = loop.getSuperclass()) != Object.class);
if (!attrParaNames.isEmpty()) { //参数存在 RestHeader、RestCookie、RestSessionid、RestAddress、RestBody字段
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitVarInsn(ALOAD, maxLocals); //加载JsonBean
Label lif = new Label();
mv.visitJumpInsn(IFNULL, lif); //if(bean != null) {
for (Map.Entry<String, Object[]> en : attrParaNames.entrySet()) {
@@ -1410,12 +1413,22 @@ public final class Rest {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, en.getKey(), attrDesc);
mv.visitVarInsn(ALOAD, maxLocals);
boolean upload = en.getKey().contains("_upload");
mv.visitVarInsn(ALOAD, upload ? (maxLocals - 1) : 1);
mv.visitVarInsn(ALOAD, en.getKey().contains("_upload") ? uploadLocal : 1);
if (en.getKey().contains("_header_")) {
mv.visitLdcInsn(en.getValue()[0].toString());
mv.visitLdcInsn("");
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getHeader", "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;", false);
String headerkey = en.getValue()[0].toString();
if ("Host".equalsIgnoreCase(headerkey)) {
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getHost", "()Ljava/lang/String;", false);
} else if ("Content-Type".equalsIgnoreCase(headerkey)) {
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getContentType", "()Ljava/lang/String;", false);
} else if ("Connection".equalsIgnoreCase(headerkey)) {
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getConnection", "()Ljava/lang/String;", false);
} else if ("Method".equalsIgnoreCase(headerkey)) {
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getMethod", "()Ljava/lang/String;", false);
} else {
mv.visitLdcInsn(headerkey);
mv.visitLdcInsn("");
mv.visitMethodInsn(INVOKEVIRTUAL, reqInternalName, "getHeader", "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;", false);
}
} else if (en.getKey().contains("_cookie_")) {
mv.visitLdcInsn(en.getValue()[0].toString());
mv.visitLdcInsn("");

View File

@@ -349,6 +349,7 @@ public abstract class WebSocketNode {
*/
public final CompletableFuture<Integer> sendMessage(final Convert convert, final Object message0, final boolean last, final Serializable... userids) {
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendMessage(message, last, userids);
@@ -406,6 +407,7 @@ public abstract class WebSocketNode {
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final Convert convert, final Object message0, final boolean last) {
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(convert, msg, last));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastMessage(message, last);

View File

@@ -54,7 +54,7 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
public void finish(final int retcode, final BsonWriter out) {
if (out == null) {
final ByteBuffer buffer = context.pollBuffer();
final ByteBuffer buffer = pollWriteReadBuffer();
fillHeader(buffer, 0, retcode);
finish(buffer);
return;

View File

@@ -667,6 +667,39 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return (Collection<String>) getAndRefresh(key, expireSeconds);
}
@Override
public boolean existsSetItem(final String key, final V value) {
Collection<V> list = getCollection(key);
return list != null && list.contains(value);
}
@Override
public CompletableFuture<Boolean> existsSetItemAsync(final String key, final V value) {
return CompletableFuture.supplyAsync(() -> existsSetItem(key, value), getExecutor());
}
@Override
public boolean existsStringSetItem(final String key, final String value) {
Collection<String> list = getStringCollection(key);
return list != null && list.contains(value);
}
@Override
public CompletableFuture<Boolean> existsStringSetItemAsync(final String key, final String value) {
return CompletableFuture.supplyAsync(() -> existsStringSetItem(key, value), getExecutor());
}
@Override
public boolean existsLongSetItem(final String key, final long value) {
Collection<Long> list = getLongCollection(key);
return list != null && list.contains(value);
}
@Override
public CompletableFuture<Boolean> existsLongSetItemAsync(final String key, final long value) {
return CompletableFuture.supplyAsync(() -> existsLongSetItem(key, value), getExecutor());
}
@Override
@RpcMultiRun
public Collection<Long> getLongCollectionAndRefresh(final String key, final int expireSeconds) {

View File

@@ -66,6 +66,8 @@ public interface CacheSource<V extends Object> {
public void removeListItem(final String key, final V value);
public boolean existsSetItem(final String key, final V value);
public void appendSetItem(final String key, final V value);
public void removeSetItem(final String key, final V value);
@@ -92,6 +94,8 @@ public interface CacheSource<V extends Object> {
public void removeStringListItem(final String key, final String value);
public boolean existsStringSetItem(final String key, final String value);
public void appendStringSetItem(final String key, final String value);
public void removeStringSetItem(final String key, final String value);
@@ -112,6 +116,8 @@ public interface CacheSource<V extends Object> {
public void removeLongListItem(final String key, final long value);
public boolean existsLongSetItem(final String key, final long value);
public void appendLongSetItem(final String key, final long value);
public void removeLongSetItem(final String key, final long value);
@@ -151,6 +157,8 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Void> removeListItemAsync(final String key, final V value);
public CompletableFuture<Boolean> existsSetItemAsync(final String key, final V value);
public CompletableFuture<Void> appendSetItemAsync(final String key, final V value);
public CompletableFuture<Void> removeSetItemAsync(final String key, final V value);
@@ -177,6 +185,8 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Void> removeStringListItemAsync(final String key, final String value);
public CompletableFuture<Boolean> existsStringSetItemAsync(final String key, final String value);
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value);
public CompletableFuture<Void> removeStringSetItemAsync(final String key, final String value);
@@ -197,6 +207,8 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Void> removeLongListItemAsync(final String key, final long value);
public CompletableFuture<Boolean> existsLongSetItemAsync(final String key, final long value);
public CompletableFuture<Void> appendLongSetItemAsync(final String key, final long value);
public CompletableFuture<Void> removeLongSetItemAsync(final String key, final long value);

View File

@@ -109,27 +109,33 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
}
@Override
@Local
public void close() throws Exception {
readPool.close();
writePool.close();
}
@Local
public PoolJdbcSource getReadPoolJdbcSource() {
return readPool;
}
@Local
public PoolJdbcSource getWritePoolJdbcSource() {
return writePool;
}
@Local
public Connection createReadSQLConnection() {
return readPool.poll();
}
@Local
public <T> Connection createWriteSQLConnection() {
return writePool.poll();
}
@Local
public void closeSQLConnection(final Connection sqlconn) {
if (sqlconn == null) return;
try {
@@ -140,6 +146,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
}
@Override
@Local
public EntityInfo apply(Class t) {
return loadEntityInfo(t);
}
@@ -972,9 +979,8 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
}
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
//注LIMIT 仅支持MySQL 且在多表关联式会异常, 该BUG尚未解决
sql += info.createSQLOrderby(flipper) + ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))))
+ info.createSQLOrderby(flipper);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
conn.setReadOnly(false);
if (blobs != null) {
@@ -2369,7 +2375,8 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
*
* @return 结果数组
*/
public final int[] directExecute(String... sqls) {
@Local
public int[] directExecute(String... sqls) {
Connection conn = createWriteSQLConnection();
try {
return directExecute(conn, sqls);
@@ -2385,7 +2392,8 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
* @param sql SQL语句
* @param consumer 回调函数
*/
public final void directQuery(String sql, Consumer<ResultSet> consumer) {
@Local
public void directQuery(String sql, Consumer<ResultSet> consumer) {
final Connection conn = createReadSQLConnection();
try {
if (logger.isLoggable(Level.FINEST)) logger.finest("direct query sql=" + sql);

View File

@@ -300,7 +300,7 @@ public interface DataSource {
* 更新符合过滤条件的记录的指定字段 <br>
* Flipper中offset字段将被忽略 <br>
* <b>注意</b>Entity类中标记为&#064;Column(updatable=false)不会被更新 <br>
* 等价SQL: UPDATE {table} SET {column1} = {value1}, {column2} += {value2}, {column3} *= {value3}, &#183;&#183;&#183; WHERE {filter node} ORDER BY {flipper.sort} LIMIT {flipper.limit} <br>
* 等价SQL: UPDATE {table} SET {column1} = {value1}, {column2} += {value2}, {column3} *= {value3}, &#183;&#183;&#183; WHERE {filter node} ORDER BY {flipper.sort} <br>
*
* @param <T> Entity泛型
* @param clazz Entity类
@@ -316,7 +316,7 @@ public interface DataSource {
* 更新符合过滤条件的记录的指定字段 <br>
* Flipper中offset字段将被忽略 <br>
* <b>注意</b>Entity类中标记为&#064;Column(updatable=false)不会被更新 <br>
* 等价SQL: UPDATE {table} SET {column1} = {value1}, {column2} += {value2}, {column3} *= {value3}, &#183;&#183;&#183; WHERE {filter node} ORDER BY {flipper.sort} LIMIT {flipper.limit} <br>
* 等价SQL: UPDATE {table} SET {column1} = {value1}, {column2} += {value2}, {column3} *= {value3}, &#183;&#183;&#183; WHERE {filter node} ORDER BY {flipper.sort} <br>
*
* @param <T> Entity泛型
* @param clazz Entity类

View File

@@ -67,6 +67,10 @@ public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
this.creator = creator;
}
public Predicate<T> getRecyclerPredicate() {
return recycler;
}
@Override
public T get() {
T result = queue.poll();

View File

@@ -17,7 +17,7 @@ public final class Redkale {
}
public static String getDotedVersion() {
return "1.8.8";
return "1.8.9";
}
public static int getMajorVersion() {

View File

@@ -60,6 +60,11 @@ public class HelloService implements Service {
if (source != null) source.update(entity);
}
//修改记录
public void update2Hello(@RestAddress String clientAddr, @RestUploadFile byte[] fs) { //通过 /pipes/hello/update2?bean={...} 修改对象
System.out.println("修改记录2-" + nodeid + ": clientAddr = " + clientAddr + ", fs =" + fs);
}
//修改记录
@RestMapping(name = "partupdate")
public void updateHello(HelloEntity entity, @RestParam(name = "cols") String[] columns) { //通过 /pipes/hello/partupdate?bean={...}&cols=... 修改对象

View File

@@ -14,11 +14,11 @@ public class SimpleRestServlet extends HttpServlet {
protected static final RetResult RET_AUTHILLEGAL = RetCodes.retResult(RetCodes.RET_USER_AUTH_ILLEGAL);
@Resource
private UserService userService;
private UserService userService = new UserService();
@Override
public void preExecute(HttpRequest request, HttpResponse response) throws IOException {
final String sessionid = request.getSessionid(false);
final String sessionid = request.getSessionid(true);
if (sessionid != null) request.setCurrentUser(userService.current(sessionid));
response.nextEvent();
}

View File

@@ -16,6 +16,6 @@ public class UserService implements Service {
//根据登录态获取当前用户信息
public UserInfo current(String sessionid) {
return null;
return new UserInfo();
}
}

View File

@@ -38,6 +38,8 @@ public class _DynHelloRestServlet1 extends SimpleRestServlet {
//headers.put(Rest.REST_HEADER_RESOURCE_NAME, "my-res");
String url = "http://127.0.0.1:" + port + "/pipes/hello/update?entity={}&bean2={}";
System.out.println(Utility.postHttpContent(url, headers, null));
url = "http://127.0.0.1:" + port + "/pipes/hello/update2?entity={}&bean2={}";
System.out.println(Utility.postHttpContent(url, headers, null));
url = "http://127.0.0.1:" + port + "/pipes/hello/asyncfind/1234";
System.out.println("异步查找: " + Utility.postHttpContent(url, headers, null));