Compare commits
62 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8b3658143a | ||
|
|
6d69ff546b | ||
|
|
9555e3c9b9 | ||
|
|
744634dbdd | ||
|
|
de5ee844c4 | ||
|
|
ae73cee357 | ||
|
|
d1cf9be8d7 | ||
|
|
43ae77ab33 | ||
|
|
182a75cfad | ||
|
|
222dc0edce | ||
|
|
c7a81513fe | ||
|
|
4931c66868 | ||
|
|
fcff1c3a4b | ||
|
|
2005bf7e3b | ||
|
|
cb07a38f04 | ||
|
|
6085cd5eef | ||
|
|
086275c135 | ||
|
|
a449a96ef9 | ||
|
|
bc3209a09c | ||
|
|
63d1ef985d | ||
|
|
24505564c8 | ||
|
|
93a7bd63cf | ||
|
|
0b9b5baa49 | ||
|
|
5c4100e762 | ||
|
|
eb861014c4 | ||
|
|
e62f7ea63d | ||
|
|
d6df2055b2 | ||
|
|
570aac947a | ||
|
|
1fdc33b565 | ||
|
|
af8d0e978e | ||
|
|
c58022a81e | ||
|
|
f4cf828993 | ||
|
|
c4dc0de5fe | ||
|
|
44507a97a6 | ||
|
|
f4a7f1cff6 | ||
|
|
a5fcb45a88 | ||
|
|
bc8b68526d | ||
|
|
180f201dc0 | ||
|
|
9ab315a405 | ||
|
|
27b4742b6d | ||
|
|
702220d18e | ||
|
|
414489da8e | ||
|
|
77057df25d | ||
|
|
2f98cd1ab5 | ||
|
|
8809fe8ec9 | ||
|
|
f9702a9517 | ||
|
|
29e46b9b68 | ||
|
|
f838e35413 | ||
|
|
f3bb77c49b | ||
|
|
12fa033e15 | ||
|
|
f4abfafea2 | ||
|
|
0918af71d2 | ||
|
|
275befa330 | ||
|
|
ab4cd8bcb6 | ||
|
|
36c109b32f | ||
|
|
73a915665d | ||
|
|
bd6d71c94a | ||
|
|
842e93507c | ||
|
|
76df1108d7 | ||
|
|
941d09cde2 | ||
|
|
9dd3e1da07 | ||
|
|
2bf73245ec |
@@ -22,3 +22,7 @@
|
||||
由于RedKale使用了JDK 8 内置的ASM包,所以需要在源码工程中的编译器选项中加入: <b>-XDignore.symbol.file=true</b>
|
||||
|
||||
<h5>详情请访问: <a href='https://redkale.org' target='_blank'>https://redkale.org</a></h5>
|
||||
|
||||
<h5>基本文档: <a href='https://redkale.org/articles.html' target='_blank'>https://redkale.org/articles.html</a></h5>
|
||||
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ export LC_ALL="zh_CN.UTF-8"
|
||||
|
||||
APP_HOME=`dirname "$0"`
|
||||
|
||||
if [ ! -a "$APP_HOME"/conf/application.xml ]; then
|
||||
if [ ! -f "$APP_HOME"/conf/application.xml ]; then
|
||||
APP_HOME="$APP_HOME"/..
|
||||
fi
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ export LC_ALL="zh_CN.UTF-8"
|
||||
|
||||
APP_HOME=`dirname "$0"`
|
||||
|
||||
if [ ! -a "$APP_HOME"/conf/application.xml ]; then
|
||||
if [ ! -f "$APP_HOME"/conf/application.xml ]; then
|
||||
APP_HOME="$APP_HOME"/..
|
||||
fi
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ export LC_ALL="zh_CN.UTF-8"
|
||||
|
||||
APP_HOME=`dirname "$0"`
|
||||
|
||||
if [ ! -a "$APP_HOME"/conf/application.xml ]; then
|
||||
if [ ! -f "$APP_HOME"/conf/application.xml ]; then
|
||||
APP_HOME="$APP_HOME"/..
|
||||
fi
|
||||
|
||||
|
||||
@@ -10,6 +10,16 @@
|
||||
|
||||
<server protocol="HTTP" host="0.0.0.0" port="6060" root="root">
|
||||
|
||||
<request>
|
||||
<remoteaddr value="request.headers.X-RemoteAddress"/>
|
||||
</request>
|
||||
|
||||
<response>
|
||||
<defcookie domain="" path=""/>
|
||||
<addheader name="Access-Control-Allow-Origin" value="request.headers.Origin" />
|
||||
<setheader name="Access-Control-Allow-Credentials" value="true"/>
|
||||
</response>
|
||||
|
||||
<services autoload="true"/>
|
||||
|
||||
<filters autoload="true"/>
|
||||
|
||||
@@ -18,7 +18,7 @@ java.util.logging.FileHandler.limit = 10485760
|
||||
java.util.logging.FileHandler.count = 10000
|
||||
java.util.logging.FileHandler.encoding = UTF-8
|
||||
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
|
||||
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%u.log
|
||||
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%d.log
|
||||
java.util.logging.FileHandler.append = true
|
||||
|
||||
java.util.logging.ConsoleHandler.level = FINER
|
||||
|
||||
@@ -16,9 +16,9 @@
|
||||
<persistence-unit name="user.read" transaction-type="RESOURCE_LOCAL">
|
||||
<shared-cache-mode>ALL</shared-cache-mode>
|
||||
<properties>
|
||||
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://localhost:3306/user?autoReconnect=true&characterEncoding=utf8"/>
|
||||
<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver"/>
|
||||
<property name="javax.persistence.jdbc.user" value="root"/>
|
||||
<property name="javax.persistence.jdbc.url" value="jdbc:oracle:thin:@localhost:1521:orcl"/>
|
||||
<property name="javax.persistence.jdbc.driver" value="oracle.jdbc.driver.OracleDriver"/>
|
||||
<property name="javax.persistence.jdbc.user" value="system"/>
|
||||
<property name="javax.persistence.jdbc.password" value="1234"/>
|
||||
</properties>
|
||||
</persistence-unit>
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
threads: 线程总数, 默认: <group>节点数*CPU核数*8
|
||||
bufferCapacity: ByteBuffer的初始化大小, 默认: 8K;
|
||||
bufferPoolSize: ByteBuffer池的大小,默认: <group>节点数*CPU核数*8
|
||||
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
|
||||
-->
|
||||
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
|
||||
|
||||
@@ -210,11 +211,14 @@
|
||||
如果addheader、setheader 的value值以request.parameters.开头则表示从request.parameters中获取对应的parameter值
|
||||
如果addheader、setheader 的value值以request.headers.开头则表示从request.headers中获取对应的header值
|
||||
例如下面例子是在Response输出header时添加两个header(一个addHeader, 一个setHeader)。
|
||||
options 节点: 设置了该节点却auto=true,当request的method=OPTIONS自动设置addheader、setheader并返回200状态码
|
||||
-->
|
||||
<response>
|
||||
<defcookie domain="" path=""/>
|
||||
<defcookie domain="" path=""/>
|
||||
<addheader name="Access-Control-Allow-Origin" value="request.headers.Origin" />
|
||||
<setheader name="Access-Control-Allow-Credentials" value="true"/>
|
||||
<setheader name="Access-Control-Allow-Headers" value="request.headers.Access-Control-Request-Headers"/>
|
||||
<setheader name="Access-Control-Allow-Credentials" value="true"/>
|
||||
<options auto="true" />
|
||||
</response>
|
||||
|
||||
<!--
|
||||
|
||||
@@ -15,9 +15,9 @@ com.sun.level = INFO
|
||||
java.util.logging.FileHandler.limit = 10485760
|
||||
java.util.logging.FileHandler.count = 100
|
||||
java.util.logging.FileHandler.encoding = UTF-8
|
||||
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%u.log
|
||||
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
|
||||
#java.util.logging.FileHandler.unusual \u5c5e\u6027\u8868\u793a\u5c06 WARNING\u3001SEVERE \u7ea7\u522b\u7684\u65e5\u5fd7\u590d\u5236\u5199\u5165\u5355\u72ec\u7684\u6587\u4ef6\u4e2d
|
||||
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%u.log
|
||||
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%d.log
|
||||
java.util.logging.FileHandler.append = true
|
||||
|
||||
#java.util.logging.ConsoleHandler.level = FINE
|
||||
|
||||
@@ -3,13 +3,15 @@
|
||||
<persistence>
|
||||
<!-- 系统基本库 -->
|
||||
<persistence-unit name="demouser">
|
||||
<!-- 为NONE表示不启动缓存,@Cacheable 失效; 非NONE值(通常用ALL)表示开启缓存。 -->
|
||||
<shared-cache-mode>NONE</shared-cache-mode>
|
||||
<properties>
|
||||
<!--
|
||||
DataSource的实现类,没有设置默认为org.redkale.source.DataJdbcSource的实现,使用常规基于JDBC的数据库驱动一般无需设置
|
||||
-->
|
||||
<property name="javax.persistence.datasource" value="org.redkale.source.DataJdbcSource"/>
|
||||
<!--
|
||||
是否开启缓存(标记为@Cacheable的Entity类),值目前只支持两种: ALL: 所有开启缓存。 NONE: 关闭所有缓存
|
||||
-->
|
||||
<property name="javax.persistence.cachemode" value="ALL"/>
|
||||
|
||||
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbuser?characterEncoding=utf8"/>
|
||||
<!--
|
||||
@@ -42,7 +44,6 @@
|
||||
</persistence-unit>
|
||||
<!-- IM消息库 -->
|
||||
<persistence-unit name="demoim">
|
||||
<shared-cache-mode>NONE</shared-cache-mode>
|
||||
<properties>
|
||||
<!-- jdbc:mysql://127.0.0.1:3306/dbim?autoReconnect=true&autoReconnectForPools=true&characterEncoding=utf8 -->
|
||||
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbim?characterEncoding=utf8"/>
|
||||
|
||||
@@ -232,12 +232,14 @@ public final class Application {
|
||||
}
|
||||
this.logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
||||
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
|
||||
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------");
|
||||
//------------------配置 <transport> 节点 ------------------
|
||||
ObjectPool<ByteBuffer> transportPool = null;
|
||||
ExecutorService transportExec = null;
|
||||
AsynchronousChannelGroup transportGroup = null;
|
||||
final AnyValue resources = config.getAnyValue("resources");
|
||||
TransportStrategy strategy = null;
|
||||
if (resources != null) {
|
||||
AnyValue transportConf = resources.getAnyValue("transport");
|
||||
int groupsize = resources.getAnyValues("group").length;
|
||||
@@ -246,9 +248,9 @@ public final class Application {
|
||||
//--------------transportBufferPool-----------
|
||||
AtomicLong createBufferCounter = new AtomicLong();
|
||||
AtomicLong cycleBufferCounter = new AtomicLong();
|
||||
final int bufferCapacity = transportConf.getIntValue("bufferCapacity", 8 * 1024);
|
||||
final int bufferPoolSize = transportConf.getIntValue("bufferPoolSize", groupsize * Runtime.getRuntime().availableProcessors() * 8);
|
||||
final int threads = transportConf.getIntValue("threads", groupsize * Runtime.getRuntime().availableProcessors() * 8);
|
||||
final int bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), 8 * 1024), 4 * 1024);
|
||||
final int bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
|
||||
final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
|
||||
transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
|
||||
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
|
||||
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
|
||||
@@ -257,6 +259,10 @@ public final class Application {
|
||||
});
|
||||
//-----------transportChannelGroup--------------
|
||||
try {
|
||||
final String strategyClass = transportConf.getValue("strategy");
|
||||
if (strategyClass != null && !strategyClass.isEmpty()) {
|
||||
strategy = (TransportStrategy) classLoader.loadClass(strategyClass).newInstance();
|
||||
}
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
|
||||
Thread t = new Thread(r);
|
||||
@@ -271,8 +277,7 @@ public final class Application {
|
||||
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
||||
}
|
||||
}
|
||||
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup);
|
||||
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
|
||||
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup, strategy);
|
||||
Thread.currentThread().setContextClassLoader(this.classLoader);
|
||||
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
||||
}
|
||||
@@ -379,7 +384,7 @@ public final class Application {
|
||||
try {
|
||||
Resource res = field.getAnnotation(Resource.class);
|
||||
if (res == null) return;
|
||||
if (!(src instanceof WatchService) || Sncp.isRemote((Service) src)) return; //远程模式不得注入
|
||||
if (Sncp.isRemote((Service) src)) return; //远程模式不得注入
|
||||
Class type = field.getType();
|
||||
if (type == Application.class) {
|
||||
field.set(src, application);
|
||||
@@ -431,7 +436,7 @@ public final class Application {
|
||||
return false;
|
||||
}
|
||||
|
||||
}, Application.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
|
||||
}, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
|
||||
//--------------------------------------------------------------------------
|
||||
initResources();
|
||||
}
|
||||
@@ -758,6 +763,15 @@ public final class Application {
|
||||
this.transportFactory.shutdownNow();
|
||||
}
|
||||
|
||||
private static int parseLenth(String value, int defValue) {
|
||||
if (value == null) return defValue;
|
||||
value = value.toUpperCase().replace("B", "");
|
||||
if (value.endsWith("G")) return Integer.decode(value.replace("G", "")) * 1024 * 1024 * 1024;
|
||||
if (value.endsWith("M")) return Integer.decode(value.replace("M", "")) * 1024 * 1024;
|
||||
if (value.endsWith("K")) return Integer.decode(value.replace("K", "")) * 1024;
|
||||
return Integer.decode(value);
|
||||
}
|
||||
|
||||
private static AnyValue load(final InputStream in0) {
|
||||
final DefaultAnyValue any = new DefaultAnyValue();
|
||||
try (final InputStream in = in0) {
|
||||
|
||||
@@ -99,7 +99,11 @@ public final class ClassFilter<T> {
|
||||
* @return Set<FilterEntry<T>>
|
||||
*/
|
||||
public final Set<FilterEntry<T>> getFilterEntrys() {
|
||||
return entrys;
|
||||
HashSet<FilterEntry<T>> set = new HashSet<>();
|
||||
set.addAll(entrys);
|
||||
if (ors != null) ors.forEach(f -> set.addAll(f.getFilterEntrys()));
|
||||
if (ands != null) ands.forEach(f -> set.addAll(f.getFilterEntrys()));
|
||||
return set;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -108,7 +112,11 @@ public final class ClassFilter<T> {
|
||||
* @return Set<FilterEntry<T>>
|
||||
*/
|
||||
public final Set<FilterEntry<T>> getFilterExpectEntrys() {
|
||||
return expectEntrys;
|
||||
HashSet<FilterEntry<T>> set = new HashSet<>();
|
||||
set.addAll(expectEntrys);
|
||||
if (ors != null) ors.forEach(f -> set.addAll(f.getFilterExpectEntrys()));
|
||||
if (ands != null) ands.forEach(f -> set.addAll(f.getFilterExpectEntrys()));
|
||||
return set;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -118,8 +126,8 @@ public final class ClassFilter<T> {
|
||||
*/
|
||||
public final Set<FilterEntry<T>> getAllFilterEntrys() {
|
||||
HashSet<FilterEntry<T>> rs = new HashSet<>();
|
||||
rs.addAll(entrys);
|
||||
rs.addAll(expectEntrys);
|
||||
rs.addAll(getFilterEntrys());
|
||||
rs.addAll(getFilterExpectEntrys());
|
||||
return rs;
|
||||
}
|
||||
|
||||
@@ -183,7 +191,7 @@ public final class ClassFilter<T> {
|
||||
} catch (Throwable cfe) {
|
||||
if (finer && !clazzname.startsWith("sun.") && !clazzname.startsWith("javax.")
|
||||
&& !clazzname.startsWith("com.sun.") && !clazzname.startsWith("jdk.")) {
|
||||
//logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error", cfe);
|
||||
logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error", cfe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import java.util.logging.Formatter;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class LogFileHandler extends Handler {
|
||||
|
||||
/**
|
||||
|
||||
@@ -122,6 +122,7 @@ public class NodeHttpServer extends NodeServer {
|
||||
}, WebSocketNode.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void loadHttpFilter(final AnyValue filtersConf, final ClassFilter<? extends Filter> classFilter) throws Exception {
|
||||
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
|
||||
final String threadName = "[" + Thread.currentThread().getName() + "] ";
|
||||
@@ -138,6 +139,7 @@ public class NodeHttpServer extends NodeServer {
|
||||
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void loadHttpServlet(final ClassFilter<? extends Servlet> servletFilter, ClassFilter<? extends WebSocket> webSocketFilter) throws Exception {
|
||||
final AnyValue servletsConf = this.serverConf.getAnyValue("servlets");
|
||||
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
|
||||
@@ -195,6 +197,7 @@ public class NodeHttpServer extends NodeServer {
|
||||
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString().trim());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void loadRestServlet(final ClassFilter<? extends WebSocket> webSocketFilter, final AnyValue restConf, final List<Object> restedObjects, final StringBuilder sb) throws Exception {
|
||||
if (!rest) return;
|
||||
if (restConf == null) return; //不存在REST服务
|
||||
|
||||
@@ -56,7 +56,7 @@ public abstract class NodeServer {
|
||||
|
||||
//ClassLoader
|
||||
protected RedkaleClassLoader serverClassLoader;
|
||||
|
||||
|
||||
protected final Thread serverThread;
|
||||
|
||||
//当前Server的SNCP协议的组
|
||||
@@ -173,7 +173,7 @@ public abstract class NodeServer {
|
||||
final TransportFactory appTranFactory = application.getTransportFactory();
|
||||
final AnyValue resources = application.config.getAnyValue("resources");
|
||||
final Map<String, AnyValue> cacheResource = new HashMap<>();
|
||||
//final Map<String, AnyValue> dataResources = new HashMap<>();
|
||||
final Map<String, AnyValue> dataResources = new HashMap<>();
|
||||
if (resources != null) {
|
||||
for (AnyValue sourceConf : resources.getAnyValues("source")) {
|
||||
try {
|
||||
@@ -183,9 +183,7 @@ public abstract class NodeServer {
|
||||
} else if (CacheSource.class.isAssignableFrom(type)) {
|
||||
cacheResource.put(sourceConf.getValue("name", ""), sourceConf);
|
||||
} else if (DataSource.class.isAssignableFrom(type)) {
|
||||
//dataResources.put(sourceConf.getValue("name", ""), sourceConf);
|
||||
//暂时不支持DataSource通过<resources>设置
|
||||
logger.log(Level.SEVERE, "load application source resource, but not CacheSource error: " + sourceConf);
|
||||
dataResources.put(sourceConf.getValue("name", ""), sourceConf);
|
||||
} else {
|
||||
logger.log(Level.SEVERE, "load application source resource, but not CacheSource error: " + sourceConf);
|
||||
}
|
||||
@@ -258,23 +256,32 @@ public abstract class NodeServer {
|
||||
final Service srcService = (Service) src;
|
||||
SncpClient client = Sncp.getSncpClient(srcService);
|
||||
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
|
||||
final AnyValue sourceConf = cacheResource.get(resourceName);
|
||||
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("type"));
|
||||
final Set<String> groups = new HashSet<>();
|
||||
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
|
||||
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
|
||||
final CacheSource source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
|
||||
Type genericType = field.getGenericType();
|
||||
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
|
||||
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
|
||||
if (sourceType == CacheMemorySource.class) {
|
||||
CacheMemorySource memorySource = (CacheMemorySource) source;
|
||||
memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
|
||||
if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后
|
||||
|
||||
AnyValue sourceConf = cacheResource.get(resourceName);
|
||||
if (sourceConf == null) sourceConf = dataResources.get(resourceName);
|
||||
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value"));
|
||||
Object source;
|
||||
if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource
|
||||
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
|
||||
application.dataSources.add((DataSource) source);
|
||||
appResFactory.register(resourceName, DataSource.class, source);
|
||||
} else { // CacheSource
|
||||
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
|
||||
Type genericType = field.getGenericType();
|
||||
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
|
||||
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
|
||||
if (sourceType == CacheMemorySource.class) {
|
||||
CacheMemorySource memorySource = (CacheMemorySource) source;
|
||||
memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
|
||||
if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后
|
||||
}
|
||||
application.cacheSources.add((CacheSource) source);
|
||||
appResFactory.register(resourceName, genericType, source);
|
||||
appResFactory.register(resourceName, CacheSource.class, source);
|
||||
}
|
||||
application.cacheSources.add(source);
|
||||
appResFactory.register(resourceName, genericType, source);
|
||||
appResFactory.register(resourceName, CacheSource.class, source);
|
||||
field.set(src, source);
|
||||
rf.inject(source, self); //
|
||||
if (source instanceof Service) ((Service) source).init(sourceConf);
|
||||
@@ -435,7 +442,7 @@ public abstract class NodeServer {
|
||||
}
|
||||
|
||||
protected ClassFilter<Service> createServiceClassFilter() {
|
||||
return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() || application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
|
||||
return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() && application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
|
||||
}
|
||||
|
||||
protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref,
|
||||
@@ -531,7 +538,7 @@ public abstract class NodeServer {
|
||||
public void setServerClassLoader(RedkaleClassLoader serverClassLoader) {
|
||||
Objects.requireNonNull(this.serverClassLoader);
|
||||
this.serverClassLoader = serverClassLoader;
|
||||
this.serverThread.setContextClassLoader(serverClassLoader);
|
||||
this.serverThread.setContextClassLoader(serverClassLoader);
|
||||
}
|
||||
|
||||
public InetSocketAddress getSncpAddress() {
|
||||
|
||||
@@ -85,6 +85,7 @@ public class NodeSncpServer extends NodeServer {
|
||||
if (sncpServer != null) loadSncpFilter(this.serverConf.getAnyValue("fliters"), filterFilter);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void loadSncpFilter(final AnyValue servletsConf, final ClassFilter<? extends Filter> classFilter) throws Exception {
|
||||
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
|
||||
final String threadName = "[" + Thread.currentThread().getName() + "] ";
|
||||
@@ -106,8 +107,9 @@ public class NodeSncpServer extends NodeServer {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected ClassFilter<Filter> createFilterClassFilter() {
|
||||
return createClassFilter(null, null, SncpFilter.class, null, null, "filters", "filter");
|
||||
return createClassFilter(null, null, SncpFilter.class, new Class[]{org.redkale.watch.WatchFilter.class}, null, "filters", "filter");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -35,6 +35,23 @@ public final class AnyEncoder<T> implements Encodeable<Writer, T> {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void convertMapTo(final Writer out, final Object... values) {
|
||||
if (values == null) {
|
||||
out.writeNull();
|
||||
} else {
|
||||
int count = values.length - values.length % 2;
|
||||
out.writeMapB(count / 2);
|
||||
for (int i = 0; i < count; i += 2) {
|
||||
if (i > 0) out.writeArrayMark();
|
||||
this.convertTo(out, (T) values[i]);
|
||||
out.writeMapMark();
|
||||
this.convertTo(out, (T) values[i + 1]);
|
||||
}
|
||||
out.writeMapE();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getType() {
|
||||
return Object.class;
|
||||
|
||||
@@ -40,4 +40,7 @@ public abstract class Convert<R extends Reader, W extends Writer> {
|
||||
public abstract ByteBuffer[] convertTo(final Supplier<ByteBuffer> supplier, final Object value);
|
||||
|
||||
public abstract ByteBuffer[] convertTo(final Supplier<ByteBuffer> supplier, final Type type, final Object value);
|
||||
|
||||
public abstract ByteBuffer[] convertMapTo(final Supplier<ByteBuffer> supplier, final Object... values);
|
||||
|
||||
}
|
||||
|
||||
@@ -104,6 +104,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
|
||||
return convertFrom(type, bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T convertFrom(final Type type, final byte[] bytes, final int start, final int len) {
|
||||
if (type == null) return null;
|
||||
final BsonReader in = readerPool.get();
|
||||
@@ -114,23 +115,27 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
|
||||
return rs;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T convertFrom(final Type type, final InputStream in) {
|
||||
if (type == null || in == null) return null;
|
||||
return (T) factory.loadDecoder(type).convertFrom(new BsonStreamReader(in));
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T convertFrom(final Type type, final ByteBuffer... buffers) {
|
||||
if (type == null || buffers.length < 1) return null;
|
||||
return (T) factory.loadDecoder(type).convertFrom(new BsonByteBufferReader((ConvertMask) null, buffers));
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T convertFrom(final Type type, final ConvertMask mask, final ByteBuffer... buffers) {
|
||||
if (type == null || buffers.length < 1) return null;
|
||||
return (T) factory.loadDecoder(type).convertFrom(new BsonByteBufferReader(mask, buffers));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T convertFrom(final Type type, final BsonReader reader) {
|
||||
if (type == null) return null;
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -159,6 +164,15 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
|
||||
return result;
|
||||
}
|
||||
|
||||
public byte[] convertMapTo(final Object... values) {
|
||||
if (values == null) return null;
|
||||
final BsonWriter out = writerPool.get().tiny(tiny);
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
|
||||
byte[] result = out.toArray();
|
||||
writerPool.offer(out);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void convertTo(final OutputStream out, final Object value) {
|
||||
if (value == null) {
|
||||
new BsonStreamWriter(tiny, out).writeNull();
|
||||
@@ -176,6 +190,14 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
|
||||
}
|
||||
}
|
||||
|
||||
public void convertMapTo(final OutputStream out, final Object... values) {
|
||||
if (values == null) {
|
||||
new BsonStreamWriter(tiny, out).writeNull();
|
||||
} else {
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(new BsonStreamWriter(tiny, out), values);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] convertTo(final Supplier<ByteBuffer> supplier, final Object value) {
|
||||
if (supplier == null) return null;
|
||||
@@ -200,6 +222,18 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
|
||||
return out.toBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] convertMapTo(final Supplier<ByteBuffer> supplier, final Object... values) {
|
||||
if (supplier == null) return null;
|
||||
BsonByteBufferWriter out = new BsonByteBufferWriter(tiny, supplier);
|
||||
if (values == null) {
|
||||
out.writeNull();
|
||||
} else {
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
|
||||
}
|
||||
return out.toBuffers();
|
||||
}
|
||||
|
||||
public void convertTo(final BsonWriter writer, final Object value) {
|
||||
if (value == null) {
|
||||
writer.writeNull();
|
||||
@@ -213,6 +247,14 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
|
||||
factory.loadEncoder(type).convertTo(writer, value);
|
||||
}
|
||||
|
||||
public void convertMapTo(final BsonWriter writer, final Object... values) {
|
||||
if (values == null) {
|
||||
writer.writeNull();
|
||||
} else {
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(writer, values);
|
||||
}
|
||||
}
|
||||
|
||||
public BsonWriter convertToWriter(final Object value) {
|
||||
if (value == null) return null;
|
||||
return convertToWriter(value.getClass(), value);
|
||||
@@ -225,4 +267,9 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
|
||||
return out;
|
||||
}
|
||||
|
||||
public BsonWriter convertMapToWriter(final Object... values) {
|
||||
final BsonWriter out = writerPool.get().tiny(tiny);
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
|
||||
return out;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +87,7 @@ public class BsonReader extends Reader {
|
||||
* 跳过属性的值
|
||||
*/
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public final void skipValue() {
|
||||
if (typeval == 0) return;
|
||||
final byte val = this.typeval;
|
||||
|
||||
@@ -32,7 +32,7 @@ public final class DateSimpledCoder<R extends Reader, W extends Writer> extends
|
||||
@Override
|
||||
public Date convertFrom(R in) {
|
||||
long t = in.readLong();
|
||||
return t == 0 ? null : new Date();
|
||||
return t == 0 ? null : new Date(t);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import java.net.*;
|
||||
* @param <R> Reader输入的子类型
|
||||
* @param <W> Writer输出的子类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final class InetAddressSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, InetAddress> {
|
||||
|
||||
public static final InetAddressSimpledCoder instance = new InetAddressSimpledCoder();
|
||||
@@ -50,6 +51,7 @@ public final class InetAddressSimpledCoder<R extends Reader, W extends Writer> e
|
||||
* @param <R> Reader输入的子类型
|
||||
* @param <W> Writer输出的子类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final static class InetSocketAddressSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, InetSocketAddress> {
|
||||
|
||||
public static final InetSocketAddressSimpledCoder instance = new InetSocketAddressSimpledCoder();
|
||||
|
||||
@@ -149,6 +149,15 @@ public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
|
||||
return result;
|
||||
}
|
||||
|
||||
public String convertMapTo(final Object... values) {
|
||||
if (values == null) return "null";
|
||||
final JsonWriter out = writerPool.get().tiny(tiny);
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
|
||||
String result = out.toString();
|
||||
writerPool.offer(out);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void convertTo(final OutputStream out, final Object value) {
|
||||
if (value == null) {
|
||||
new JsonStreamWriter(tiny, out).writeNull();
|
||||
@@ -166,6 +175,14 @@ public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
|
||||
}
|
||||
}
|
||||
|
||||
public void convertMapTo(final OutputStream out, final Object... values) {
|
||||
if (values == null) {
|
||||
new JsonStreamWriter(tiny, out).writeNull();
|
||||
} else {
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(new JsonStreamWriter(tiny, out), values);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] convertTo(final Supplier<ByteBuffer> supplier, final Object value) {
|
||||
if (supplier == null) return null;
|
||||
@@ -190,6 +207,18 @@ public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
|
||||
return out.toBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] convertMapTo(final Supplier<ByteBuffer> supplier, final Object... values) {
|
||||
if (supplier == null) return null;
|
||||
JsonByteBufferWriter out = new JsonByteBufferWriter(tiny, null, supplier);
|
||||
if (values == null) {
|
||||
out.writeNull();
|
||||
} else {
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
|
||||
}
|
||||
return out.toBuffers();
|
||||
}
|
||||
|
||||
public void convertTo(final JsonWriter writer, final Object value) {
|
||||
if (value == null) {
|
||||
writer.writeNull();
|
||||
@@ -207,6 +236,14 @@ public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
|
||||
}
|
||||
}
|
||||
|
||||
public void convertMapTo(final JsonWriter writer, final Object... values) {
|
||||
if (values == null) {
|
||||
writer.writeNull();
|
||||
} else {
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(writer, values);
|
||||
}
|
||||
}
|
||||
|
||||
public JsonWriter convertToWriter(final Object value) {
|
||||
if (value == null) return null;
|
||||
return convertToWriter(value.getClass(), value);
|
||||
@@ -218,4 +255,10 @@ public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
|
||||
factory.loadEncoder(type).convertTo(out, value);
|
||||
return out;
|
||||
}
|
||||
|
||||
public JsonWriter convertMapToWriter(final Object... values) {
|
||||
final JsonWriter out = writerPool.get().tiny(tiny);
|
||||
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
|
||||
return out;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import org.redkale.util.*;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final class JsonFactory extends ConvertFactory<JsonReader, JsonWriter> {
|
||||
|
||||
private static final JsonFactory instance = new JsonFactory(null, Boolean.getBoolean("convert.json.tiny"));
|
||||
|
||||
@@ -118,6 +118,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void init(C context, AnyValue config) {
|
||||
synchronized (filters) {
|
||||
if (!filters.isEmpty()) {
|
||||
@@ -136,6 +137,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void destroy(C context, AnyValue config) {
|
||||
synchronized (filters) {
|
||||
if (!filters.isEmpty()) {
|
||||
@@ -146,6 +148,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void addFilter(Filter<C, R, P> filter, AnyValue conf) {
|
||||
filter._conf = conf;
|
||||
synchronized (filters) {
|
||||
@@ -176,6 +179,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
|
||||
return false;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends Filter<C, R, P>> T removeFilter(Predicate<T> predicate) {
|
||||
if (this.headFilter == null || predicate == null) return null;
|
||||
synchronized (filters) {
|
||||
@@ -198,10 +202,12 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends Filter<C, R, P>> List<T> getFilters() {
|
||||
return (List) new ArrayList<>(filters);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public abstract void addServlet(S servlet, Object attachment, AnyValue conf, K... mappings);
|
||||
|
||||
public final void prepare(final ByteBuffer buffer, final R request, final P response) throws IOException {
|
||||
|
||||
@@ -51,6 +51,7 @@ public abstract class Request<C extends Context> {
|
||||
* 返回值:Integer.MIN_VALUE: 帧数据; -1:数据不合法; 0:解析完毕; >0: 需再读取的字节数。
|
||||
*
|
||||
* @param buffer ByteBuffer对象
|
||||
*
|
||||
* @return 缺少的字节数
|
||||
*/
|
||||
protected abstract int readHeader(ByteBuffer buffer);
|
||||
@@ -59,6 +60,7 @@ public abstract class Request<C extends Context> {
|
||||
* 读取buffer,并返回读取的有效数据长度
|
||||
*
|
||||
* @param buffer ByteBuffer对象
|
||||
*
|
||||
* @return 有效数据长度
|
||||
*/
|
||||
protected abstract int readBody(ByteBuffer buffer);
|
||||
@@ -82,8 +84,9 @@ public abstract class Request<C extends Context> {
|
||||
return (T) properties.get(name);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T removeProperty(String name) {
|
||||
return (T)properties.remove(name);
|
||||
return (T) properties.remove(name);
|
||||
}
|
||||
|
||||
protected Map<String, Object> getProperties() {
|
||||
@@ -100,8 +103,9 @@ public abstract class Request<C extends Context> {
|
||||
return (T) attributes.get(name);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T removeAttribute(String name) {
|
||||
return (T)attributes.remove(name);
|
||||
return (T) attributes.remove(name);
|
||||
}
|
||||
|
||||
public Map<String, Object> getAttributes() {
|
||||
|
||||
@@ -9,6 +9,7 @@ import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.logging.Level;
|
||||
|
||||
/**
|
||||
* 协议响应对象
|
||||
@@ -113,8 +114,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
try {
|
||||
recycleListener.accept(request, this);
|
||||
} catch (Exception e) {
|
||||
System.err.println(request);
|
||||
e.printStackTrace();
|
||||
context.logger.log(Level.WARNING, "Response.recycleListener error, request = " + request, e);
|
||||
}
|
||||
recycleListener = null;
|
||||
}
|
||||
|
||||
@@ -99,11 +99,11 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
|
||||
this.config = config;
|
||||
this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80));
|
||||
this.charset = Charset.forName(config.getValue("charset", "UTF-8"));
|
||||
this.backlog = config.getIntValue("backlog", 8 * 1024);
|
||||
this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0);
|
||||
this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0);
|
||||
this.maxbody = config.getIntValue("maxbody", 64 * 1024);
|
||||
int bufCapacity = config.getIntValue("bufferCapacity", 8 * 1024);
|
||||
this.backlog = parseLenth(config.getValue("backlog"), 8 * 1024);
|
||||
this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024);
|
||||
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), 8 * 1024);
|
||||
this.bufferCapacity = bufCapacity < 256 ? 256 : bufCapacity;
|
||||
this.threads = config.getIntValue("threads", Runtime.getRuntime().availableProcessors() * 16);
|
||||
this.bufferPoolSize = config.getIntValue("bufferPoolSize", Runtime.getRuntime().availableProcessors() * 512);
|
||||
@@ -120,6 +120,24 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
|
||||
});
|
||||
}
|
||||
|
||||
protected static int parseLenth(String value, int defValue) {
|
||||
if (value == null) return defValue;
|
||||
value = value.toUpperCase().replace("B", "");
|
||||
if (value.endsWith("G")) return Integer.decode(value.replace("G", "")) * 1024 * 1024 * 1024;
|
||||
if (value.endsWith("M")) return Integer.decode(value.replace("M", "")) * 1024 * 1024;
|
||||
if (value.endsWith("K")) return Integer.decode(value.replace("K", "")) * 1024;
|
||||
return Integer.decode(value);
|
||||
}
|
||||
|
||||
protected static long parseLenth(String value, long defValue) {
|
||||
if (value == null) return defValue;
|
||||
value = value.toUpperCase().replace("B", "");
|
||||
if (value.endsWith("G")) return Long.decode(value.replace("G", "")) * 1024 * 1024 * 1024;
|
||||
if (value.endsWith("M")) return Long.decode(value.replace("M", "")) * 1024 * 1024;
|
||||
if (value.endsWith("K")) return Long.decode(value.replace("K", "")) * 1024;
|
||||
return Long.decode(value);
|
||||
}
|
||||
|
||||
public void destroy(final AnyValue config) throws Exception {
|
||||
this.prepare.destroy(context, config);
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ import java.nio.channels.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Supplier;
|
||||
import org.redkale.convert.*;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
@@ -52,19 +54,24 @@ public final class Transport {
|
||||
|
||||
protected final InetSocketAddress clientAddress;
|
||||
|
||||
protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0];
|
||||
protected TransportAddress[] transportAddres = new TransportAddress[0];
|
||||
|
||||
protected final ObjectPool<ByteBuffer> bufferPool;
|
||||
|
||||
//负载均衡策略
|
||||
protected final TransportStrategy strategy;
|
||||
|
||||
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
|
||||
|
||||
public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
||||
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses);
|
||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
|
||||
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
|
||||
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy);
|
||||
}
|
||||
|
||||
public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
|
||||
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
|
||||
this.name = name;
|
||||
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
|
||||
this.protocol = protocol;
|
||||
@@ -72,32 +79,38 @@ public final class Transport {
|
||||
this.group = transportChannelGroup;
|
||||
this.bufferPool = transportBufferPool;
|
||||
this.clientAddress = clientAddress;
|
||||
this.strategy = strategy;
|
||||
updateRemoteAddresses(addresses);
|
||||
}
|
||||
|
||||
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
|
||||
InetSocketAddress[] oldAddresses = this.remoteAddres;
|
||||
List<InetSocketAddress> list = new ArrayList<>();
|
||||
TransportAddress[] oldAddresses = this.transportAddres;
|
||||
List<TransportAddress> list = new ArrayList<>();
|
||||
if (addresses != null) {
|
||||
for (InetSocketAddress addr : addresses) {
|
||||
if (clientAddress != null && clientAddress.equals(addr)) continue;
|
||||
list.add(addr);
|
||||
list.add(new TransportAddress(addr));
|
||||
}
|
||||
}
|
||||
this.remoteAddres = list.toArray(new InetSocketAddress[list.size()]);
|
||||
return oldAddresses;
|
||||
this.transportAddres = list.toArray(new TransportAddress[list.size()]);
|
||||
|
||||
InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length];
|
||||
for (int i = 0; i < rs.length; i++) {
|
||||
rs[i] = oldAddresses[i].getAddress();
|
||||
}
|
||||
return rs;
|
||||
}
|
||||
|
||||
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
|
||||
if (addr == null) return false;
|
||||
synchronized (this) {
|
||||
if (this.remoteAddres == null) {
|
||||
this.remoteAddres = new InetSocketAddress[]{addr};
|
||||
if (this.transportAddres == null) {
|
||||
this.transportAddres = new TransportAddress[]{new TransportAddress(addr)};
|
||||
} else {
|
||||
for (InetSocketAddress i : this.remoteAddres) {
|
||||
if (addr.equals(i)) return false;
|
||||
for (TransportAddress i : this.transportAddres) {
|
||||
if (addr.equals(i.address)) return false;
|
||||
}
|
||||
this.remoteAddres = Utility.append(remoteAddres, addr);
|
||||
this.transportAddres = Utility.append(transportAddres, new TransportAddress(addr));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -105,9 +118,9 @@ public final class Transport {
|
||||
|
||||
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
|
||||
if (addr == null) return false;
|
||||
if (this.remoteAddres == null) return false;
|
||||
if (this.transportAddres == null) return false;
|
||||
synchronized (this) {
|
||||
this.remoteAddres = Utility.remove(remoteAddres, addr);
|
||||
this.transportAddres = Utility.remove(transportAddres, new TransportAddress(addr));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -128,13 +141,25 @@ public final class Transport {
|
||||
return clientAddress;
|
||||
}
|
||||
|
||||
public TransportAddress[] getTransportAddresses() {
|
||||
return transportAddres;
|
||||
}
|
||||
|
||||
public InetSocketAddress[] getRemoteAddresses() {
|
||||
return remoteAddres;
|
||||
InetSocketAddress[] rs = new InetSocketAddress[transportAddres.length];
|
||||
for (int i = 0; i < rs.length; i++) {
|
||||
rs[i] = transportAddres[i].getAddress();
|
||||
}
|
||||
return rs;
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> getAsyncConnectionPool() {
|
||||
return connPool;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}";
|
||||
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddres) + "}";
|
||||
}
|
||||
|
||||
public ByteBuffer pollBuffer() {
|
||||
@@ -158,32 +183,57 @@ public final class Transport {
|
||||
}
|
||||
|
||||
public AsyncConnection pollConnection(SocketAddress addr) {
|
||||
if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0];
|
||||
if (this.strategy != null) return strategy.pollConnection(addr, this);
|
||||
if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address;
|
||||
final boolean rand = addr == null;
|
||||
if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
|
||||
if (rand && this.transportAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
|
||||
try {
|
||||
if (tcp) {
|
||||
AsynchronousSocketChannel channel = null;
|
||||
if (rand) { //取地址
|
||||
for (int i = 0; i < remoteAddres.length; i++) {
|
||||
addr = remoteAddres[i];
|
||||
BlockingQueue<AsyncConnection> queue = connPool.get(addr);
|
||||
if (queue != null && !queue.isEmpty()) {
|
||||
TransportAddress transportAddr;
|
||||
boolean tryed = false;
|
||||
for (int i = 0; i < transportAddres.length; i++) {
|
||||
transportAddr = transportAddres[i];
|
||||
addr = transportAddr.address;
|
||||
if (!transportAddr.enable) continue;
|
||||
final BlockingQueue<AsyncConnection> queue = transportAddr.conns;
|
||||
if (!queue.isEmpty()) {
|
||||
AsyncConnection conn;
|
||||
while ((conn = queue.poll()) != null) {
|
||||
if (conn.isOpen()) return conn;
|
||||
}
|
||||
}
|
||||
tryed = true;
|
||||
if (channel == null) {
|
||||
channel = AsynchronousSocketChannel.open(group);
|
||||
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
}
|
||||
try {
|
||||
channel.connect(addr).get(2, TimeUnit.SECONDS);
|
||||
transportAddr.enable = true;
|
||||
break;
|
||||
} catch (Exception iex) {
|
||||
iex.printStackTrace();
|
||||
if (i == remoteAddres.length - 1) channel = null;
|
||||
transportAddr.enable = false;
|
||||
channel = null;
|
||||
}
|
||||
}
|
||||
if (channel == null && !tryed) {
|
||||
for (int i = 0; i < transportAddres.length; i++) {
|
||||
transportAddr = transportAddres[i];
|
||||
addr = transportAddr.address;
|
||||
if (channel == null) {
|
||||
channel = AsynchronousSocketChannel.open(group);
|
||||
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
}
|
||||
try {
|
||||
channel.connect(addr).get(2, TimeUnit.SECONDS);
|
||||
transportAddr.enable = true;
|
||||
break;
|
||||
} catch (Exception iex) {
|
||||
transportAddr.enable = false;
|
||||
channel = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -194,7 +244,7 @@ public final class Transport {
|
||||
if (channel == null) return null;
|
||||
return AsyncConnection.create(channel, addr, 3000, 3000);
|
||||
} else { // UDP
|
||||
if (rand) addr = remoteAddres[0];
|
||||
if (rand) addr = this.transportAddres[0].address;
|
||||
DatagramChannel channel = DatagramChannel.open();
|
||||
channel.configureBlocking(true);
|
||||
channel.connect(addr);
|
||||
@@ -256,4 +306,54 @@ public final class Transport {
|
||||
});
|
||||
}
|
||||
|
||||
public static class TransportAddress {
|
||||
|
||||
protected InetSocketAddress address;
|
||||
|
||||
protected volatile boolean enable;
|
||||
|
||||
protected final BlockingQueue<AsyncConnection> conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
|
||||
|
||||
public TransportAddress(InetSocketAddress address) {
|
||||
this.address = address;
|
||||
this.enable = true;
|
||||
}
|
||||
|
||||
@java.beans.ConstructorProperties({"address", "enable"})
|
||||
public TransportAddress(InetSocketAddress address, boolean enable) {
|
||||
this.address = address;
|
||||
this.enable = enable;
|
||||
}
|
||||
|
||||
public InetSocketAddress getAddress() {
|
||||
return address;
|
||||
}
|
||||
|
||||
public boolean isEnable() {
|
||||
return enable;
|
||||
}
|
||||
|
||||
@ConvertColumn(ignore = true)
|
||||
public BlockingQueue<AsyncConnection> getConns() {
|
||||
return conns;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return this.address.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) return true;
|
||||
if (obj == null) return false;
|
||||
if (getClass() != obj.getClass()) return false;
|
||||
final TransportAddress other = (TransportAddress) obj;
|
||||
return this.address.equals(other.address);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return JsonConvert.root().convertTo(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,10 +43,19 @@ public class TransportFactory {
|
||||
|
||||
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
|
||||
|
||||
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
||||
//负载均衡策略
|
||||
protected final TransportStrategy strategy;
|
||||
|
||||
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||
final TransportStrategy strategy) {
|
||||
this.executor = executor;
|
||||
this.bufferPool = bufferPool;
|
||||
this.channelGroup = channelGroup;
|
||||
this.strategy = strategy;
|
||||
}
|
||||
|
||||
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
||||
this(executor, bufferPool, channelGroup, null);
|
||||
}
|
||||
|
||||
public String findGroupName(InetSocketAddress addr) {
|
||||
@@ -127,14 +136,14 @@ public class TransportFactory {
|
||||
}
|
||||
if (info == null) return null;
|
||||
if (sncpAddress != null) addresses.remove(sncpAddress);
|
||||
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses);
|
||||
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy);
|
||||
}
|
||||
|
||||
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
|
||||
if (groupName == null) return null;
|
||||
TransportGroupInfo info = groupInfos.get(groupName);
|
||||
if (info == null) return null;
|
||||
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses);
|
||||
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy);
|
||||
}
|
||||
|
||||
public ExecutorService getExecutor() {
|
||||
|
||||
21
src/org/redkale/net/TransportStrategy.java
Normal file
21
src/org/redkale/net/TransportStrategy.java
Normal file
@@ -0,0 +1,21 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.net;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
/**
|
||||
* 远程请求的负载均衡策略
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public interface TransportStrategy {
|
||||
|
||||
public AsyncConnection pollConnection(SocketAddress addr, Transport transport);
|
||||
}
|
||||
@@ -53,6 +53,7 @@ public class HttpContext extends Context {
|
||||
return responsePool;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <H extends AsyncHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
|
||||
Creator<H> creator = asyncHandlerCreators.get(handlerClass);
|
||||
if (creator == null) {
|
||||
@@ -62,6 +63,7 @@ public class HttpContext extends Context {
|
||||
return creator;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <H extends AsyncHandler> Creator<H> createAsyncHandlerCreator(Class<H> handlerClass) {
|
||||
//生成规则与SncpAsyncHandler.Factory 很类似
|
||||
//-------------------------------------------------------------
|
||||
|
||||
@@ -235,6 +235,10 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
|
||||
try {
|
||||
final String uri = request.getRequestURI();
|
||||
HttpServlet servlet;
|
||||
if (response.isAutoOptions() && "OPTIONS".equals(request.getMethod())) {
|
||||
response.finish(200, null);
|
||||
return;
|
||||
}
|
||||
if (request.isWebSocket()) {
|
||||
servlet = wsmappings.get(uri);
|
||||
if (servlet == null && this.regWsArray != null) {
|
||||
|
||||
@@ -32,7 +32,7 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
|
||||
protected static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
|
||||
protected static final String SESSIONID_NAME = "JSESSIONID";
|
||||
public static final String SESSIONID_NAME = "JSESSIONID";
|
||||
|
||||
@Comment("Method GET/POST/...")
|
||||
private String method;
|
||||
@@ -121,7 +121,6 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
} else {
|
||||
this.requestURI = array.toDecodeString(index, offset - index, charset).trim();
|
||||
}
|
||||
if (this.requestURI.contains("../")) return -1;
|
||||
index = ++offset;
|
||||
this.protocol = array.toString(index, array.size() - index, charset).trim();
|
||||
while (readLine(buffer, array)) {
|
||||
@@ -889,6 +888,15 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
/**
|
||||
* 获取请求Header总对象
|
||||
*
|
||||
* @return AnyValue
|
||||
*/
|
||||
public AnyValue getHeaders() {
|
||||
return header;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有的header名
|
||||
*
|
||||
@@ -1087,6 +1095,16 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
/**
|
||||
* 获取请求参数总对象
|
||||
*
|
||||
* @return AnyValue
|
||||
*/
|
||||
public AnyValue getParameters() {
|
||||
parseBody();
|
||||
return params;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有参数名
|
||||
*
|
||||
|
||||
@@ -192,6 +192,11 @@ public class HttpResourceServlet extends HttpServlet {
|
||||
@Override
|
||||
public void execute(HttpRequest request, HttpResponse response) throws IOException {
|
||||
String uri = request.getRequestURI();
|
||||
if (uri.contains("../")) {
|
||||
if (finest) logger.log(Level.FINEST, "Not found resource (404) be " + uri + ", request = " + request);
|
||||
response.finish404();
|
||||
return;
|
||||
}
|
||||
if (locationRewrites != null) {
|
||||
for (SimpleEntry<Pattern, String> entry : locationRewrites) {
|
||||
Matcher matcher = entry.getKey().matcher(uri);
|
||||
|
||||
@@ -42,7 +42,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
public ByteBuffer[] execute(final HttpResponse response, final ByteBuffer[] buffers);
|
||||
}
|
||||
|
||||
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\n\r\n".getBytes()).asReadOnlyBuffer();
|
||||
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
|
||||
|
||||
private static final ByteBuffer buffer404 = ByteBuffer.wrap("HTTP/1.1 404 Not Found\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
|
||||
|
||||
@@ -125,17 +125,21 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
|
||||
private final String[][] defaultSetHeaders;
|
||||
|
||||
private final boolean autoOptions;
|
||||
|
||||
private final HttpCookie defcookie;
|
||||
|
||||
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
|
||||
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
|
||||
}
|
||||
|
||||
public HttpResponse(HttpContext context, HttpRequest request, String[][] defaultAddHeaders, String[][] defaultSetHeaders, HttpCookie defcookie) {
|
||||
public HttpResponse(HttpContext context, HttpRequest request, String[][] defaultAddHeaders, String[][] defaultSetHeaders,
|
||||
HttpCookie defcookie, boolean autoOptions) {
|
||||
super(context, request);
|
||||
this.defaultAddHeaders = defaultAddHeaders;
|
||||
this.defaultSetHeaders = defaultSetHeaders;
|
||||
this.defcookie = defcookie;
|
||||
this.autoOptions = autoOptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -145,6 +149,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
|
||||
@Override
|
||||
protected boolean recycle() {
|
||||
boolean rs = super.recycle();
|
||||
this.status = 200;
|
||||
this.contentLength = -1;
|
||||
this.contentType = null;
|
||||
@@ -152,7 +157,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
this.headsended = false;
|
||||
this.header.clear();
|
||||
this.bufferHandler = null;
|
||||
return super.recycle();
|
||||
return rs;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -181,10 +186,15 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void thenEvent(Servlet servlet) {
|
||||
this.servlet = servlet;
|
||||
}
|
||||
|
||||
protected boolean isAutoOptions() {
|
||||
return this.autoOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* 增加Cookie值
|
||||
*
|
||||
@@ -239,6 +249,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
*
|
||||
* @return AsyncHandler AsyncHandler
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <H extends AsyncHandler> H createAsyncHandler(Class<H> handlerClass) {
|
||||
if (handlerClass == null || handlerClass == AsyncHandler.class) return (H) createAsyncHandler();
|
||||
return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler());
|
||||
@@ -255,6 +266,18 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
finish(request.getJsonConvert().convertTo(context.getBufferSupplier(), obj));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将对象数组用Map的形式以JSON格式输出 <br>
|
||||
* 例如: finishMap("a",2,"b",3) 输出结果为 {"a":2,"b":3}
|
||||
*
|
||||
* @param objs 输出对象
|
||||
*/
|
||||
public void finishMapJson(final Object... objs) {
|
||||
this.contentType = "text/plain; charset=utf-8";
|
||||
if (this.recycleListener != null) this.output = objs;
|
||||
finish(request.getJsonConvert().convertMapTo(context.getBufferSupplier(), objs));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将对象以JSON格式输出
|
||||
*
|
||||
@@ -267,6 +290,19 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
finish(convert.convertTo(context.getBufferSupplier(), obj));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将对象数组用Map的形式以JSON格式输出 <br>
|
||||
* 例如: finishMap("a",2,"b",3) 输出结果为 {"a":2,"b":3}
|
||||
*
|
||||
* @param convert 指定的JsonConvert
|
||||
* @param objs 输出对象
|
||||
*/
|
||||
public void finishMapJson(final JsonConvert convert, final Object... objs) {
|
||||
this.contentType = "text/plain; charset=utf-8";
|
||||
if (this.recycleListener != null) this.output = objs;
|
||||
finish(convert.convertMapTo(context.getBufferSupplier(), objs));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将对象以JSON格式输出
|
||||
*
|
||||
@@ -349,6 +385,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
* @param convert 指定的JsonConvert
|
||||
* @param future 输出对象的句柄
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void finishJson(final JsonConvert convert, final CompletableFuture future) {
|
||||
future.whenComplete((v, e) -> {
|
||||
if (e != null) {
|
||||
@@ -373,6 +410,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
* @param type 指定的类型
|
||||
* @param future 输出对象的句柄
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void finishJson(final JsonConvert convert, final Type type, final CompletableFuture future) {
|
||||
future.whenComplete((v, e) -> {
|
||||
if (e != null) {
|
||||
@@ -407,6 +445,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
* @param convert 指定的JsonConvert
|
||||
* @param result HttpResult对象
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void finishJson(final JsonConvert convert, final HttpResult result) {
|
||||
if (output == null) {
|
||||
finish("");
|
||||
@@ -439,6 +478,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
if (isClosed()) return;
|
||||
if (this.recycleListener != null) this.output = obj;
|
||||
if (obj == null || obj.isEmpty()) {
|
||||
this.contentLength = 0;
|
||||
final ByteBuffer headbuf = createHeader();
|
||||
headbuf.flip();
|
||||
super.finish(headbuf);
|
||||
@@ -704,8 +744,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
final String match = request.getHeader("If-None-Match");
|
||||
final String etag = (file == null ? 0L : file.lastModified()) + "-" + length;
|
||||
if (match != null && etag.equals(match)) {
|
||||
finish304();
|
||||
return;
|
||||
//finish304();
|
||||
//return;
|
||||
}
|
||||
this.contentLength = length;
|
||||
if (filename != null && !filename.isEmpty() && file != null) {
|
||||
@@ -733,18 +773,20 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
ByteBuffer hbuffer = createHeader();
|
||||
hbuffer.flip();
|
||||
if (fileBody == null) {
|
||||
if (this.recycleListener != null) this.output = file;
|
||||
finishFile(hbuffer, file, start, len);
|
||||
} else {
|
||||
if (start >= 0) {
|
||||
fileBody.position((int) start);
|
||||
if (len > 0) fileBody.limit((int) (fileBody.position() + len));
|
||||
}
|
||||
if (this.recycleListener != null) this.output = fileBody;
|
||||
super.finish(hbuffer, fileBody);
|
||||
}
|
||||
}
|
||||
|
||||
private void finishFile(ByteBuffer hbuffer, File file, long offset, long length) throws IOException {
|
||||
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length));
|
||||
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(file, offset, length));
|
||||
}
|
||||
|
||||
private ByteBuffer createHeader() {
|
||||
@@ -754,7 +796,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
|
||||
buffer.put(("Content-Type: " + (this.contentType == null ? "text/plain; charset=utf-8" : this.contentType) + "\r\n").getBytes());
|
||||
|
||||
if (this.contentLength > 0) {
|
||||
if (this.contentLength >= 0) {
|
||||
buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes());
|
||||
}
|
||||
if (!this.request.isKeepAlive()) {
|
||||
@@ -976,54 +1018,76 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
||||
|
||||
protected final class TransferFileHandler implements AsyncHandler<Integer, ByteBuffer> {
|
||||
|
||||
private final File file;
|
||||
|
||||
private final AsynchronousFileChannel filechannel;
|
||||
|
||||
private final long max; //需要读取的字节数, -1表示读到文件结尾
|
||||
|
||||
private long count;//读取文件的字节数
|
||||
|
||||
private long position = 0;
|
||||
private long readpos = 0;
|
||||
|
||||
private boolean next = false;
|
||||
private boolean hdwrite = true; //写入Header
|
||||
|
||||
private boolean read = true;
|
||||
private boolean read = false;
|
||||
|
||||
public TransferFileHandler(AsynchronousFileChannel channel) {
|
||||
this.filechannel = channel;
|
||||
this.max = -1;
|
||||
public TransferFileHandler(File file) throws IOException {
|
||||
this.file = file;
|
||||
this.filechannel = AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor());
|
||||
this.readpos = 0;
|
||||
this.max = file.length();
|
||||
}
|
||||
|
||||
public TransferFileHandler(AsynchronousFileChannel channel, long offset, long len) {
|
||||
this.filechannel = channel;
|
||||
this.position = offset <= 0 ? 0 : offset;
|
||||
this.max = len;
|
||||
public TransferFileHandler(File file, long offset, long len) throws IOException {
|
||||
this.file = file;
|
||||
this.filechannel = AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor());
|
||||
this.readpos = offset <= 0 ? 0 : offset;
|
||||
this.max = len <= 0 ? file.length() : len;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completed(Integer result, ByteBuffer attachment) {
|
||||
if (result < 0 || (max > 0 && count >= max)) {
|
||||
//(Utility.now() + "---" + Thread.currentThread().getName() + "-----------" + file + "-------------------result: " + result + ", max = " + max + ", readpos = " + readpos + ", count = " + count + ", " + (hdwrite ? "正在写Header" : (read ? "准备读" : "准备写")));
|
||||
if (result < 0 || count >= max) {
|
||||
failed(null, attachment);
|
||||
return;
|
||||
}
|
||||
if (hdwrite && attachment.hasRemaining()) { //Header还没写完
|
||||
channel.write(attachment, attachment, this);
|
||||
return;
|
||||
}
|
||||
if (hdwrite) {
|
||||
//(Utility.now() + "---" + Thread.currentThread().getName() + "-----------" + file + "-------------------Header写入完毕, 准备读取文件.");
|
||||
hdwrite = false;
|
||||
read = true;
|
||||
result = 0;
|
||||
}
|
||||
if (read) {
|
||||
count += result;
|
||||
} else {
|
||||
readpos += result;
|
||||
}
|
||||
if (read && attachment.hasRemaining()) { //Buffer还没写完
|
||||
channel.write(attachment, attachment, this);
|
||||
return;
|
||||
}
|
||||
|
||||
if (read) {
|
||||
read = false;
|
||||
if (next) {
|
||||
position += result;
|
||||
} else {
|
||||
next = true;
|
||||
}
|
||||
attachment.clear();
|
||||
filechannel.read(attachment, position, attachment, this);
|
||||
filechannel.read(attachment, readpos, attachment, this);
|
||||
} else {
|
||||
read = true;
|
||||
if (max > 0) {
|
||||
count += result;
|
||||
if (count > max) {
|
||||
attachment.limit((int) (attachment.position() + max - count));
|
||||
}
|
||||
if (count > max) {
|
||||
attachment.limit((int) (attachment.position() + max - count));
|
||||
}
|
||||
attachment.flip();
|
||||
channel.write(attachment, attachment, this);
|
||||
if (attachment.hasRemaining()) {
|
||||
channel.write(attachment, attachment, this);
|
||||
} else {
|
||||
failed(null, attachment);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import java.net.HttpCookie;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Level;
|
||||
import org.redkale.net.*;
|
||||
import org.redkale.net.sncp.Sncp;
|
||||
import org.redkale.service.Service;
|
||||
@@ -216,6 +217,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
||||
*
|
||||
* @return RestServlet
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <S extends Service, T extends HttpServlet> T addRestServlet(final ClassLoader classLoader, final String name, final S service, final Class userType, final Class<T> baseServletType, final String prefix) {
|
||||
T servlet = null;
|
||||
final boolean sncp = Sncp.isSncpDyn(service);
|
||||
@@ -231,8 +233,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
||||
break;
|
||||
}
|
||||
} catch (NoSuchFieldException | SecurityException e) {
|
||||
System.err.println("serviceType = " + serviceType + ", servletClass = " + item.getClass());
|
||||
e.printStackTrace();
|
||||
logger.log(Level.SEVERE, "serviceType = " + serviceType + ", servletClass = " + item.getClass(), e);
|
||||
}
|
||||
}
|
||||
final boolean first = servlet == null;
|
||||
@@ -280,6 +281,8 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
||||
});
|
||||
final List<String[]> defaultAddHeaders = new ArrayList<>();
|
||||
final List<String[]> defaultSetHeaders = new ArrayList<>();
|
||||
boolean autoOptions = false;
|
||||
|
||||
HttpCookie defaultCookie = null;
|
||||
String remoteAddrHeader = null;
|
||||
if (config != null) {
|
||||
@@ -342,10 +345,15 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
||||
defaultCookie.setPath(path);
|
||||
}
|
||||
}
|
||||
AnyValue options = resps == null ? null : resps.getAnyValue("options");
|
||||
autoOptions = options != null && options.getBoolValue("auto", false);
|
||||
}
|
||||
|
||||
}
|
||||
final String[][] addHeaders = defaultAddHeaders.isEmpty() ? null : defaultAddHeaders.toArray(new String[defaultAddHeaders.size()][]);
|
||||
final String[][] setHeaders = defaultSetHeaders.isEmpty() ? null : defaultSetHeaders.toArray(new String[defaultSetHeaders.size()][]);
|
||||
final boolean options = autoOptions;
|
||||
|
||||
final HttpCookie defCookie = defaultCookie;
|
||||
final String addrHeader = remoteAddrHeader;
|
||||
AtomicLong createResponseCounter = new AtomicLong();
|
||||
@@ -353,7 +361,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
||||
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
|
||||
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
|
||||
this.maxbody, this.charset, this.address, this.prepare, this.readTimeoutSecond, this.writeTimeoutSecond);
|
||||
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), addHeaders, setHeaders, defCookie));
|
||||
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), addHeaders, setHeaders, defCookie, options));
|
||||
return httpcontext;
|
||||
}
|
||||
|
||||
|
||||
@@ -81,6 +81,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
|
||||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
void preInit(HttpContext context, AnyValue config) {
|
||||
String path = _prefix == null ? "" : _prefix;
|
||||
WebServlet ws = this.getClass().getAnnotation(WebServlet.class);
|
||||
|
||||
@@ -92,7 +92,16 @@ public final class Rest {
|
||||
return new MethodVisitor(Opcodes.ASM5) {
|
||||
@Override
|
||||
public void visitLocalVariable(String name, String description, String signature, Label start, Label end, int index) {
|
||||
if (index > 0) fieldnames.add(name);
|
||||
if (index < 1) return;
|
||||
int size = fieldnames.size();
|
||||
//index并不会按顺序执行的
|
||||
if (index > size) {
|
||||
for (int i = size; i < index; i++) {
|
||||
fieldnames.add(" ");
|
||||
}
|
||||
fieldnames.set(index - 1, name);
|
||||
}
|
||||
fieldnames.set(index - 1, name);
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -121,6 +130,7 @@ public final class Rest {
|
||||
childFactory.register(rc.type(), true, rc.ignoreColumns());
|
||||
childFactory.reloadCoder(rc.type());
|
||||
types.add(rc.type());
|
||||
childFactory.tiny(rc.tiny());
|
||||
}
|
||||
return childFactory.getConvert();
|
||||
}
|
||||
@@ -328,6 +338,13 @@ public final class Rest {
|
||||
mv.visitMaxs(2, 1);
|
||||
mv.visitEnd();
|
||||
}
|
||||
{ //resourceName
|
||||
mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "resourceName", "()Ljava/lang/String;", null, null));
|
||||
mv.visitLdcInsn(rws.name());
|
||||
mv.visitInsn(ARETURN);
|
||||
mv.visitMaxs(1, 1);
|
||||
mv.visitEnd();
|
||||
}
|
||||
|
||||
RestClassLoader newLoader = new RestClassLoader(loader);
|
||||
|
||||
|
||||
@@ -24,6 +24,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
@Repeatable(RestConvert.RestConverts.class)
|
||||
public @interface RestConvert {
|
||||
|
||||
boolean tiny() default true;
|
||||
|
||||
Class type();
|
||||
|
||||
String[] ignoreColumns() default {};
|
||||
|
||||
@@ -12,6 +12,7 @@ import java.nio.ByteBuffer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.logging.*;
|
||||
import java.util.stream.Stream;
|
||||
import org.redkale.convert.Convert;
|
||||
import org.redkale.util.Comment;
|
||||
@@ -82,6 +83,8 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
|
||||
private long createtime = System.currentTimeMillis();
|
||||
|
||||
private long pingtime;
|
||||
|
||||
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
|
||||
|
||||
protected WebSocket() {
|
||||
@@ -89,11 +92,13 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
|
||||
//----------------------------------------------------------------
|
||||
public final CompletableFuture<Integer> sendPing() {
|
||||
this.pingtime = System.currentTimeMillis();
|
||||
//if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping...");
|
||||
return sendPacket(WebSocketPacket.DEFAULT_PING_PACKET);
|
||||
}
|
||||
|
||||
public final CompletableFuture<Integer> sendPing(byte[] data) {
|
||||
this.pingtime = System.currentTimeMillis();
|
||||
return sendPacket(new WebSocketPacket(FrameType.PING, data));
|
||||
}
|
||||
|
||||
@@ -478,6 +483,25 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
public void onClose(int code, String reason) {
|
||||
}
|
||||
|
||||
/**
|
||||
* 发生异常时调用
|
||||
*
|
||||
* @param t 异常
|
||||
* @param buffers ByteBuffer[]
|
||||
*/
|
||||
public void onOccurException(Throwable t, ByteBuffer[] buffers) {
|
||||
this.getLogger().log(Level.SEVERE, "WebSocket receive or send Message error", t);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取Logger
|
||||
*
|
||||
* @return Logger Logger
|
||||
*/
|
||||
public Logger getLogger() {
|
||||
return this._engine.logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取最后一次发送消息的时间
|
||||
*
|
||||
@@ -487,6 +511,15 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
return this._runner == null ? 0 : this._runner.lastSendTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取最后一次发送PING消息的时间
|
||||
*
|
||||
* @return long
|
||||
*/
|
||||
public long getLastPingTime() {
|
||||
return this.pingtime;
|
||||
}
|
||||
|
||||
/**
|
||||
* 显式地关闭WebSocket
|
||||
*/
|
||||
|
||||
@@ -10,6 +10,7 @@ import java.io.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.logging.*;
|
||||
import java.util.stream.*;
|
||||
import org.redkale.convert.Convert;
|
||||
@@ -23,18 +24,18 @@ import org.redkale.util.*;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public final class WebSocketEngine {
|
||||
public class WebSocketEngine {
|
||||
|
||||
//全局自增长ID
|
||||
@Comment("全局自增长ID, 为了确保在一个进程里多个WebSocketEngine定时发送ping时不会同时进行")
|
||||
private static final AtomicInteger sequence = new AtomicInteger();
|
||||
|
||||
//Engine自增长序号ID
|
||||
@Comment("Engine自增长序号ID")
|
||||
private final int index;
|
||||
|
||||
//当前WebSocket对应的Engine
|
||||
@Comment("当前WebSocket对应的Engine")
|
||||
private final String engineid;
|
||||
|
||||
//当前WebSocket对应的Node
|
||||
@Comment("当前WebSocket对应的Node")
|
||||
protected final WebSocketNode node;
|
||||
|
||||
//HttpContext
|
||||
@@ -43,23 +44,25 @@ public final class WebSocketEngine {
|
||||
//Convert
|
||||
protected final Convert sendConvert;
|
||||
|
||||
protected final boolean single; //是否单用户单连接
|
||||
@Comment("是否单用户单连接")
|
||||
protected final boolean single;
|
||||
|
||||
//在线用户ID对应的WebSocket组,用于单用户单连接模式
|
||||
@Comment("在线用户ID对应的WebSocket组,用于单用户单连接模式")
|
||||
private final Map<Serializable, WebSocket> websockets = new ConcurrentHashMap<>();
|
||||
|
||||
//在线用户ID对应的WebSocket组,用于单用户多连接模式
|
||||
@Comment("在线用户ID对应的WebSocket组,用于单用户多连接模式")
|
||||
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
|
||||
|
||||
//用于PING的定时器
|
||||
@Comment("用于PING的定时器")
|
||||
private ScheduledThreadPoolExecutor scheduler;
|
||||
|
||||
//日志
|
||||
@Comment("日志")
|
||||
protected final Logger logger;
|
||||
|
||||
//FINEST日志级别
|
||||
@Comment("日志级别")
|
||||
protected final boolean finest;
|
||||
|
||||
@Comment("PING的间隔秒数")
|
||||
private int liveinterval;
|
||||
|
||||
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) {
|
||||
@@ -70,8 +73,8 @@ public final class WebSocketEngine {
|
||||
this.node = node;
|
||||
this.liveinterval = liveinterval;
|
||||
this.logger = logger;
|
||||
this.index = sequence.getAndIncrement();
|
||||
this.finest = logger.isLoggable(Level.FINEST);
|
||||
this.index = sequence.getAndIncrement();
|
||||
}
|
||||
|
||||
void init(AnyValue conf) {
|
||||
@@ -96,6 +99,7 @@ public final class WebSocketEngine {
|
||||
if (scheduler != null) scheduler.shutdownNow();
|
||||
}
|
||||
|
||||
@Comment("添加WebSocket")
|
||||
void add(WebSocket socket) {
|
||||
if (single) {
|
||||
websockets.put(socket._userid, socket);
|
||||
@@ -110,6 +114,7 @@ public final class WebSocketEngine {
|
||||
if (node != null) node.connect(socket._userid);
|
||||
}
|
||||
|
||||
@Comment("从WebSocketEngine删除指定WebSocket")
|
||||
void remove(WebSocket socket) {
|
||||
Serializable userid = socket._userid;
|
||||
if (single) {
|
||||
@@ -127,9 +132,15 @@ public final class WebSocketEngine {
|
||||
}
|
||||
}
|
||||
|
||||
@Comment("给所有连接用户发送消息")
|
||||
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
|
||||
return broadcastMessage(null, message, last);
|
||||
}
|
||||
|
||||
@Comment("给指定WebSocket连接用户发送消息")
|
||||
public CompletableFuture<Integer> broadcastMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) {
|
||||
if (message instanceof CompletableFuture) {
|
||||
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(json, last));
|
||||
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last));
|
||||
}
|
||||
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
|
||||
if (more) {
|
||||
@@ -140,11 +151,13 @@ public final class WebSocketEngine {
|
||||
CompletableFuture<Integer> future = null;
|
||||
if (single) {
|
||||
for (WebSocket websocket : websockets.values()) {
|
||||
if (predicate != null && !predicate.test(websocket)) continue;
|
||||
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
} else {
|
||||
for (List<WebSocket> list : websockets2.values()) {
|
||||
for (WebSocket websocket : list) {
|
||||
if (predicate != null && !predicate.test(websocket)) continue;
|
||||
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
}
|
||||
@@ -155,11 +168,13 @@ public final class WebSocketEngine {
|
||||
CompletableFuture<Integer> future = null;
|
||||
if (single) {
|
||||
for (WebSocket websocket : websockets.values()) {
|
||||
if (predicate != null && !predicate.test(websocket)) continue;
|
||||
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
} else {
|
||||
for (List<WebSocket> list : websockets2.values()) {
|
||||
for (WebSocket websocket : list) {
|
||||
if (predicate != null && !predicate.test(websocket)) continue;
|
||||
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
|
||||
}
|
||||
}
|
||||
@@ -168,6 +183,7 @@ public final class WebSocketEngine {
|
||||
}
|
||||
}
|
||||
|
||||
@Comment("给指定用户组发送消息")
|
||||
public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
||||
if (message instanceof CompletableFuture) {
|
||||
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids));
|
||||
@@ -217,21 +233,33 @@ public final class WebSocketEngine {
|
||||
}
|
||||
}
|
||||
|
||||
Collection<WebSocket> getLocalWebSockets() {
|
||||
@Comment("获取所有连接")
|
||||
public Collection<WebSocket> getLocalWebSockets() {
|
||||
if (single) return websockets.values();
|
||||
List<WebSocket> list = new ArrayList<>();
|
||||
websockets2.values().forEach(x -> list.addAll(x));
|
||||
return list;
|
||||
}
|
||||
|
||||
//适用于单用户单连接模式
|
||||
@Comment("获取当前连接总数")
|
||||
public int getLocalWebSocketSize() {
|
||||
if (single) return websockets.size();
|
||||
return (int) websockets2.values().stream().mapToInt(sublist -> sublist.size()).count();
|
||||
}
|
||||
|
||||
@Comment("获取当前用户总数")
|
||||
public int getLocalUserSize() {
|
||||
return single ? websockets.size() : websockets2.size();
|
||||
}
|
||||
|
||||
@Comment("适用于单用户单连接模式")
|
||||
public WebSocket findLocalWebSocket(Serializable userid) {
|
||||
if (single) return websockets.get(userid);
|
||||
List<WebSocket> list = websockets2.get(userid);
|
||||
return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
|
||||
}
|
||||
|
||||
//适用于单用户多连接模式
|
||||
@Comment("适用于单用户多连接模式")
|
||||
public Stream<WebSocket> getLocalWebSockets(Serializable userid) {
|
||||
if (single) {
|
||||
WebSocket websocket = websockets.get(userid);
|
||||
|
||||
@@ -26,6 +26,12 @@ import org.redkale.util.*;
|
||||
*/
|
||||
public abstract class WebSocketNode {
|
||||
|
||||
@Comment("存储当前SNCP节点列表的key")
|
||||
public static final String SOURCE_SNCP_NODES_KEY = "redkale_sncpnodes";
|
||||
|
||||
@Comment("存储当前用户数量的key")
|
||||
public static final String SOURCE_USER_COUNT_KEY = "redkale_usercount";
|
||||
|
||||
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||
|
||||
protected final boolean finest = logger.isLoggable(Level.FINEST);
|
||||
@@ -57,7 +63,9 @@ public abstract class WebSocketNode {
|
||||
if (this.localEngine == null) return;
|
||||
//关掉所有本地本地WebSocket
|
||||
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()));
|
||||
if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress);
|
||||
if (sncpNodeAddresses != null && localSncpAddress != null) {
|
||||
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_NODES_KEY, localSncpAddress);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
|
||||
@@ -140,7 +148,55 @@ public abstract class WebSocketNode {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断指定用户是否WebSocket在线
|
||||
*
|
||||
* @param userid Serializable
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) {
|
||||
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
|
||||
}
|
||||
return this.sncpNodeAddresses.existsAsync(userid);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取在线用户总数
|
||||
*
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
public CompletableFuture<Integer> getUserSize() {
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) {
|
||||
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
|
||||
}
|
||||
return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> {
|
||||
return sncpNodeAddresses.existsAsync(SOURCE_SNCP_NODES_KEY).thenApply(exists -> exists ? (count - 1) : count);
|
||||
});
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
/**
|
||||
* 获取本地的WebSocketEngine,没有则返回null
|
||||
*
|
||||
*
|
||||
* @return WebSocketEngine
|
||||
*/
|
||||
public final WebSocketEngine getLocalWebSocketEngine() {
|
||||
return this.localEngine;
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param message 消息内容
|
||||
* @param userids Serializable[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) {
|
||||
return sendMessage(message, true, userids);
|
||||
}
|
||||
@@ -155,7 +211,6 @@ public abstract class WebSocketNode {
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
//最近连接发送逻辑还没有理清楚
|
||||
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
||||
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
|
||||
@@ -112,7 +112,7 @@ public final class WebSocketPacket {
|
||||
ByteBuffer[] duplicateSendBuffers() {
|
||||
ByteBuffer[] rs = new ByteBuffer[this.sendBuffers.length];
|
||||
for (int i = 0; i < this.sendBuffers.length; i++) {
|
||||
rs[i] = this.sendBuffers[i].duplicate();
|
||||
rs[i] = this.sendBuffers[i].duplicate().asReadOnlyBuffer(); //必须使用asReadOnlyBuffer, 否则会导致ByteBuffer对应的byte[]被ObjectPool回收两次
|
||||
}
|
||||
return rs;
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.logging.*;
|
||||
import org.redkale.convert.Convert;
|
||||
import org.redkale.util.Utility;
|
||||
|
||||
/**
|
||||
* WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner
|
||||
@@ -99,8 +100,17 @@ class WebSocketRunner implements Runnable {
|
||||
}
|
||||
|
||||
try {
|
||||
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
|
||||
|
||||
WebSocketPacket packet;
|
||||
try {
|
||||
packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
|
||||
} catch (Exception e) { //接收的消息体解析失败
|
||||
webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers));
|
||||
if (readBuffer != null) {
|
||||
readBuffer.clear();
|
||||
channel.read(readBuffer, null, this);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (packet == null) {
|
||||
failed(null, attachment1);
|
||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
||||
@@ -121,7 +131,17 @@ class WebSocketRunner implements Runnable {
|
||||
context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
|
||||
}
|
||||
} else {
|
||||
Object message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
||||
Object message;
|
||||
try {
|
||||
message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
||||
} catch (Exception e) { //接收的消息体解析失败
|
||||
webSocket.onOccurException(e, packet.receiveBuffers);
|
||||
if (readBuffer != null) {
|
||||
readBuffer.clear();
|
||||
channel.read(readBuffer, null, this);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (readBuffer != null) {
|
||||
readBuffer.clear();
|
||||
channel.read(readBuffer, null, this);
|
||||
@@ -150,7 +170,17 @@ class WebSocketRunner implements Runnable {
|
||||
context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
|
||||
}
|
||||
} else {
|
||||
Object message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
||||
Object message;
|
||||
try {
|
||||
message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
||||
} catch (Exception e) { //接收的消息体解析失败
|
||||
webSocket.onOccurException(e, packet.receiveBuffers);
|
||||
if (readBuffer != null) {
|
||||
readBuffer.clear();
|
||||
channel.read(readBuffer, null, this);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (readBuffer != null) {
|
||||
readBuffer.clear();
|
||||
channel.read(readBuffer, null, this);
|
||||
|
||||
@@ -68,7 +68,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
@Override
|
||||
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
|
||||
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr);
|
||||
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr));
|
||||
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_NODES_KEY, sncpAddr));
|
||||
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
|
||||
return future;
|
||||
}
|
||||
|
||||
@@ -348,13 +348,13 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCollectionSize(final K key) {
|
||||
public int getCollectionSize(final K key) {
|
||||
Collection<V> collection = (Collection<V>) get(key);
|
||||
return collection == null ? 0 : collection.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Long> getCollectionSizeAsync(final K key) {
|
||||
public CompletableFuture<Integer> getCollectionSizeAsync(final K key) {
|
||||
return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor());
|
||||
}
|
||||
|
||||
@@ -443,6 +443,11 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
|
||||
return new ArrayList<>(container.keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getKeySize() {
|
||||
return container.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync() {
|
||||
return CompletableFuture.completedFuture(new ArrayList<>(container.values()));
|
||||
@@ -458,4 +463,8 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
|
||||
return CompletableFuture.completedFuture(new ArrayList<>(container.keySet()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> getKeySizeAsync() {
|
||||
return CompletableFuture.completedFuture(container.size());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
||||
|
||||
public Collection<V> getCollection(final K key);
|
||||
|
||||
public long getCollectionSize(final K key);
|
||||
public int getCollectionSize(final K key);
|
||||
|
||||
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds);
|
||||
|
||||
@@ -59,6 +59,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
||||
|
||||
public List<K> queryKeys();
|
||||
|
||||
public int getKeySize();
|
||||
|
||||
public List<CacheEntry<K, Object>> queryList();
|
||||
|
||||
//---------------------- CompletableFuture 异步版 ---------------------------------
|
||||
@@ -80,7 +82,7 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
||||
|
||||
public CompletableFuture<Collection<V>> getCollectionAsync(final K key);
|
||||
|
||||
public CompletableFuture<Long> getCollectionSizeAsync(final K key);
|
||||
public CompletableFuture<Integer> getCollectionSizeAsync(final K key);
|
||||
|
||||
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final K key, final int expireSeconds);
|
||||
|
||||
@@ -94,6 +96,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
||||
|
||||
public CompletableFuture<List<K>> queryKeysAsync();
|
||||
|
||||
public CompletableFuture<Integer> getKeySizeAsync();
|
||||
|
||||
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync();
|
||||
|
||||
default CompletableFuture<Boolean> isOpenAsync() {
|
||||
|
||||
@@ -15,6 +15,7 @@ import java.util.function.*;
|
||||
import java.util.logging.*;
|
||||
import javax.annotation.Resource;
|
||||
import org.redkale.service.*;
|
||||
import static org.redkale.source.DataSources.*;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
@@ -29,35 +30,35 @@ import org.redkale.util.*;
|
||||
@AutoLoad(false)
|
||||
@SuppressWarnings("unchecked")
|
||||
@ResourceType(DataSource.class)
|
||||
public class DataJdbcSource extends AbstractService implements DataSource, Service, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
|
||||
public class DataJdbcSource extends AbstractService implements DataSource, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
|
||||
|
||||
private static final Flipper FLIPPER_ONE = new Flipper(1);
|
||||
protected static final Flipper FLIPPER_ONE = new Flipper(1);
|
||||
|
||||
final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
|
||||
protected final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
|
||||
|
||||
final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
|
||||
protected final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
|
||||
|
||||
final String name;
|
||||
protected final String name;
|
||||
|
||||
final URL conf;
|
||||
protected final URL conf;
|
||||
|
||||
final boolean cacheForbidden;
|
||||
protected final boolean cacheForbidden;
|
||||
|
||||
private final PoolJdbcSource readPool;
|
||||
protected final PoolJdbcSource readPool;
|
||||
|
||||
private final PoolJdbcSource writePool;
|
||||
protected final PoolJdbcSource writePool;
|
||||
|
||||
@Resource(name = "$")
|
||||
private DataCacheListener cacheListener;
|
||||
protected DataCacheListener cacheListener;
|
||||
|
||||
private final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
|
||||
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
|
||||
|
||||
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
|
||||
this.name = unitName;
|
||||
this.conf = null;
|
||||
this.readPool = new PoolJdbcSource(this, "read", readprop);
|
||||
this.writePool = new PoolJdbcSource(this, "write", writeprop);
|
||||
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty("shared-cache-mode"));
|
||||
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -71,15 +72,15 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
writePool.close();
|
||||
}
|
||||
|
||||
private Connection createReadSQLConnection() {
|
||||
protected Connection createReadSQLConnection() {
|
||||
return readPool.poll();
|
||||
}
|
||||
|
||||
private <T> Connection createWriteSQLConnection() {
|
||||
protected <T> Connection createWriteSQLConnection() {
|
||||
return writePool.poll();
|
||||
}
|
||||
|
||||
private void closeSQLConnection(final Connection sqlconn) {
|
||||
protected void closeSQLConnection(final Connection sqlconn) {
|
||||
if (sqlconn == null) return;
|
||||
try {
|
||||
sqlconn.close();
|
||||
@@ -93,7 +94,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return loadEntityInfo(t);
|
||||
}
|
||||
|
||||
private <T> EntityInfo<T> loadEntityInfo(Class<T> clazz) {
|
||||
protected <T> EntityInfo<T> loadEntityInfo(Class<T> clazz) {
|
||||
return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader);
|
||||
}
|
||||
|
||||
@@ -150,7 +151,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.runAsync(() -> insert(values), getExecutor());
|
||||
}
|
||||
|
||||
private <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||
protected <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||
if (values.length == 0) return;
|
||||
try {
|
||||
if (!info.isVirtualEntity()) {
|
||||
@@ -251,7 +252,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
}
|
||||
}
|
||||
|
||||
private <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
|
||||
protected <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
|
||||
final EntityInfo<T> info, T... values) throws SQLException {
|
||||
Attribute<T, Serializable>[] attrs = info.insertAttributes;
|
||||
final PreparedStatement prestmt = info.autoGenerated ? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
|
||||
@@ -328,7 +329,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> delete(values), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int delete(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||
protected <T> int delete(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||
if (values.length == 0) return -1;
|
||||
final Attribute primary = info.getPrimary();
|
||||
Serializable[] ids = new Serializable[values.length];
|
||||
@@ -358,7 +359,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
|
||||
protected <T> int delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
|
||||
if (keys.length == 0) return -1;
|
||||
int c = -1;
|
||||
int c2 = 0;
|
||||
@@ -430,7 +431,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int delete(final Connection conn, final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
|
||||
protected <T> int delete(final Connection conn, final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
|
||||
int c = -1;
|
||||
try {
|
||||
if (!info.isVirtualEntity()) {
|
||||
@@ -447,7 +448,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
}
|
||||
String sql = "DELETE " + (this.readPool.isMysql() ? "a" : "") + " FROM " + info.getTable(node) + " a" + (join1 == null ? "" : (", " + join1))
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper)
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper)
|
||||
+ ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
|
||||
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
|
||||
conn.setReadOnly(false);
|
||||
@@ -521,7 +522,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> update(values), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int update(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||
protected <T> int update(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||
try {
|
||||
Class clazz = info.getType();
|
||||
int c = -1;
|
||||
@@ -617,7 +618,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
|
||||
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
|
||||
try {
|
||||
int c = -1;
|
||||
if (!info.isVirtualEntity()) {
|
||||
@@ -684,7 +685,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int updateColumn(Connection conn, final EntityInfo<T> info, String column, final Serializable value, FilterNode node) {
|
||||
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, String column, final Serializable value, FilterNode node) {
|
||||
try {
|
||||
int c = -1;
|
||||
if (!info.isVirtualEntity()) {
|
||||
@@ -703,7 +704,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
|
||||
+ " SET " + info.getSQLColumn("a", column) + " = ?"
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
|
||||
conn.setReadOnly(false);
|
||||
Blob blob = conn.createBlob();
|
||||
@@ -716,7 +717,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1))
|
||||
+ " SET " + info.getSQLColumn("a", column) + " = " + info.formatToString(value)
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
|
||||
conn.setReadOnly(false);
|
||||
final Statement stmt = conn.createStatement();
|
||||
@@ -766,7 +767,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
|
||||
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return -1;
|
||||
try {
|
||||
StringBuilder setsql = new StringBuilder();
|
||||
@@ -882,7 +883,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return -1;
|
||||
try {
|
||||
StringBuilder setsql = new StringBuilder();
|
||||
@@ -921,7 +922,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
}
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
//注:LIMIT 仅支持MySQL 且在多表关联式会异常, 该BUG尚未解决
|
||||
sql += info.createSQLOrderby(flipper) + ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
|
||||
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
|
||||
@@ -992,7 +993,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final SelectColumn selects) {
|
||||
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final SelectColumn selects) {
|
||||
if (bean == null || selects == null) return -1;
|
||||
try {
|
||||
final Class<T> clazz = (Class<T>) bean.getClass();
|
||||
@@ -1068,7 +1069,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor());
|
||||
}
|
||||
|
||||
private <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final FilterNode node, final SelectColumn selects) {
|
||||
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final FilterNode node, final SelectColumn selects) {
|
||||
if (bean == null || node == null || selects == null) return -1;
|
||||
try {
|
||||
final Class<T> clazz = (Class<T>) bean.getClass();
|
||||
@@ -1107,7 +1108,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
}
|
||||
String sql = "UPDATE " + info.getTable(node) + " a " + (join1 == null ? "" : (", " + join1)) + " SET " + setsql
|
||||
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
|
||||
if (debug.get() && info.isLoggable(Level.FINEST)) logger.finest(info.getType().getSimpleName() + " update sql=" + sql);
|
||||
conn.setReadOnly(false);
|
||||
if (blobs != null) {
|
||||
@@ -1877,7 +1878,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> queryColumnSheet(selectedColumn, clazz, flipper, node), getExecutor());
|
||||
}
|
||||
|
||||
private <T, V extends Serializable> Sheet<V> queryColumnSheet(final boolean needtotal, final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
||||
protected <T, V extends Serializable> Sheet<V> queryColumnSheet(final boolean needtotal, final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
||||
Sheet<T> sheet = querySheet(true, needtotal, clazz, SelectColumn.createIncludes(selectedColumn), flipper, node);
|
||||
final Sheet<V> rs = new Sheet<>();
|
||||
if (sheet.isEmpty()) return rs;
|
||||
@@ -2083,7 +2084,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return CompletableFuture.supplyAsync(() -> querySheet(clazz, selects, flipper, node), getExecutor());
|
||||
}
|
||||
|
||||
private <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
|
||||
protected <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
final EntityCache<T> cache = info.getCache();
|
||||
if (readcache && cache != null && cache.isFullLoaded()) {
|
||||
@@ -2134,7 +2135,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
}
|
||||
}
|
||||
|
||||
private static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
|
||||
protected static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
|
||||
if (str == null) return sb;
|
||||
int pos1 = str.indexOf(ch1, from);
|
||||
if (pos1 < 0) return sb;
|
||||
@@ -2145,7 +2146,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
||||
return multisplit(ch1, ch2, split, sb, str, pos2 + 1);
|
||||
}
|
||||
|
||||
private int[] directExecute(final Connection conn, String... sqls) {
|
||||
protected int[] directExecute(final Connection conn, String... sqls) {
|
||||
if (sqls.length == 0) return new int[0];
|
||||
try {
|
||||
conn.setReadOnly(false);
|
||||
|
||||
@@ -21,6 +21,8 @@ public final class DataSources {
|
||||
|
||||
public static final String JDBC_DATASOURCE_CLASS = "javax.persistence.datasource";
|
||||
|
||||
public static final String JDBC_CACHE_MODE = "javax.persistence.cachemode";
|
||||
|
||||
public static final String JDBC_CONNECTIONSMAX = "javax.persistence.connections.limit";
|
||||
|
||||
public static final String JDBC_CONTAIN_SQLTEMPLATE = "javax.persistence.contain.sqltemplate";
|
||||
@@ -104,7 +106,7 @@ public final class DataSources {
|
||||
}
|
||||
}
|
||||
|
||||
static Map<String, Properties> loadPersistenceXml(final InputStream in0) {
|
||||
public static Map<String, Properties> loadPersistenceXml(final InputStream in0) {
|
||||
final Map<String, Properties> map = new LinkedHashMap();
|
||||
Properties result = new Properties();
|
||||
boolean flag = false;
|
||||
@@ -122,8 +124,8 @@ public final class DataSources {
|
||||
String value = reader.getAttributeValue(null, "value");
|
||||
if (name == null) continue;
|
||||
result.put(name, value);
|
||||
} else if (flag && "shared-cache-mode".equalsIgnoreCase(reader.getLocalName())) {
|
||||
result.put(reader.getLocalName(), reader.getElementText());
|
||||
} else if (flag && "shared-cache-mode".equalsIgnoreCase(reader.getLocalName())) { //兼容shared-cache-mode属性
|
||||
result.put(JDBC_CACHE_MODE, reader.getElementText());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,14 +5,13 @@
|
||||
*/
|
||||
package org.redkale.source;
|
||||
|
||||
import com.sun.istack.internal.logging.Logger;
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.*;
|
||||
import java.sql.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.*;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.*;
|
||||
import javax.persistence.*;
|
||||
import org.redkale.util.*;
|
||||
|
||||
@@ -32,7 +31,7 @@ public final class EntityInfo<T> {
|
||||
private static final ConcurrentHashMap<Class, EntityInfo> entityInfos = new ConcurrentHashMap<>();
|
||||
|
||||
//日志
|
||||
private static final Logger logger = Logger.getLogger(EntityInfo.class);
|
||||
private static final Logger logger = Logger.getLogger(EntityInfo.class.getSimpleName());
|
||||
|
||||
//Entity类名
|
||||
private final Class<T> type;
|
||||
@@ -194,7 +193,7 @@ public final class EntityInfo<T> {
|
||||
try {
|
||||
loader = type.getAnnotation(VirtualEntity.class).loader().newInstance();
|
||||
} catch (Exception e) {
|
||||
logger.severe(type + " init @VirtualEntity.loader error", e);
|
||||
logger.log(Level.SEVERE, type + " init @VirtualEntity.loader error", e);
|
||||
}
|
||||
this.fullloader = loader;
|
||||
} else {
|
||||
@@ -207,7 +206,7 @@ public final class EntityInfo<T> {
|
||||
try {
|
||||
dts = (dt == null) ? null : dt.strategy().newInstance();
|
||||
} catch (Exception e) {
|
||||
logger.severe(type + " init DistributeTableStrategy error", e);
|
||||
logger.log(Level.SEVERE, type + " init DistributeTableStrategy error", e);
|
||||
}
|
||||
this.tableStrategy = dts;
|
||||
|
||||
@@ -216,7 +215,7 @@ public final class EntityInfo<T> {
|
||||
try {
|
||||
cp = this.creator.getClass().getMethod("create", Object[].class).getAnnotation(Creator.ConstructorParameters.class);
|
||||
} catch (Exception e) {
|
||||
logger.severe(type + " cannot find ConstructorParameters Creator", e);
|
||||
logger.log(Level.SEVERE, type + " cannot find ConstructorParameters Creator", e);
|
||||
}
|
||||
this.constructorParameters = (cp == null || cp.value().length < 1) ? null : cp.value();
|
||||
Attribute idAttr0 = null;
|
||||
|
||||
@@ -85,6 +85,13 @@ public class PoolJdbcSource {
|
||||
if (this.isOracle()) {
|
||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0");
|
||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0");
|
||||
if (!this.props.containsKey(JDBC_TABLENOTEXIST_SQLSTATES)) {
|
||||
this.props.setProperty(JDBC_TABLENOTEXIST_SQLSTATES, "42000;42S02");
|
||||
}
|
||||
if (!this.props.containsKey(JDBC_TABLECOPY_SQLTEMPLATE)) {
|
||||
//注意:此语句复制表结构会导致默认值和主键信息的丢失
|
||||
this.props.setProperty(JDBC_TABLECOPY_SQLTEMPLATE, "CREATE TABLE ${newtable} AS SELECT * FROM ${oldtable} WHERE 1=2");
|
||||
}
|
||||
} else if (this.isSqlserver()) {
|
||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0");
|
||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0");
|
||||
|
||||
@@ -74,6 +74,7 @@ import static jdk.internal.org.objectweb.asm.Opcodes.*;
|
||||
* @param <T> 字段依附的类
|
||||
* @param <F> 字段的数据类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public interface Attribute<T, F> {
|
||||
|
||||
/**
|
||||
|
||||
@@ -82,6 +82,7 @@ public interface Creator<T> {
|
||||
|
||||
String[] value();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static class CreatorInner {
|
||||
|
||||
static class SimpleClassVisitor extends ClassVisitor {
|
||||
@@ -108,7 +109,16 @@ public interface Creator<T> {
|
||||
return new MethodVisitor(Opcodes.ASM5) {
|
||||
@Override
|
||||
public void visitLocalVariable(String name, String description, String signature, Label start, Label end, int index) {
|
||||
if (index > 0) fieldnames.add(name);
|
||||
if (index < 1) return;
|
||||
int size = fieldnames.size();
|
||||
//index不会按顺序执行的
|
||||
if (index > size) {
|
||||
for (int i = size; i < index; i++) {
|
||||
fieldnames.add(" ");
|
||||
}
|
||||
fieldnames.set(index - 1, name);
|
||||
}
|
||||
fieldnames.set(index - 1, name);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ public final class Redkale {
|
||||
}
|
||||
|
||||
public static String getDotedVersion() {
|
||||
return "1.8.0";
|
||||
return "1.8.3";
|
||||
}
|
||||
|
||||
public static int getMajorVersion() {
|
||||
|
||||
@@ -9,7 +9,7 @@ import java.lang.ref.WeakReference;
|
||||
import java.lang.reflect.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.*;
|
||||
import java.util.logging.*;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@@ -99,7 +99,7 @@ public final class ResourceFactory {
|
||||
* 1: "$"有特殊含义, 不能表示"$"资源本身
|
||||
* 2: 只能是字母、数字、(短横)-、(下划线)_、点(.)的组合
|
||||
* </pre></blockquote>
|
||||
*
|
||||
*
|
||||
* @param name String
|
||||
*/
|
||||
public void checkName(String name) {
|
||||
@@ -471,11 +471,11 @@ public final class ResourceFactory {
|
||||
}
|
||||
|
||||
public <A> List<A> query(Class<? extends A> clazz) {
|
||||
return query(new ArrayList<A>(), clazz);
|
||||
return query(new ArrayList<>(), clazz);
|
||||
}
|
||||
|
||||
public <A> List<A> query(Type clazz) {
|
||||
return query(new ArrayList<A>(), clazz);
|
||||
return query(new ArrayList<>(), clazz);
|
||||
}
|
||||
|
||||
private <A> List<A> query(final List<A> list, Type clazz) {
|
||||
@@ -489,6 +489,23 @@ public final class ResourceFactory {
|
||||
return list;
|
||||
}
|
||||
|
||||
public <A> List<A> query(final BiPredicate<String, Object> predicate) {
|
||||
return query(new ArrayList<>(), predicate);
|
||||
}
|
||||
|
||||
private <A> List<A> query(final List<A> list, final BiPredicate<String, Object> predicate) {
|
||||
if (predicate == null) return list;
|
||||
for (ConcurrentHashMap<String, ResourceEntry> map : this.store.values()) {
|
||||
for (Map.Entry<String, ResourceEntry> en : map.entrySet()) {
|
||||
if (predicate.test(en.getKey(), en.getValue().value)) {
|
||||
list.add((A) en.getValue().value);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (parent != null) query(list, predicate);
|
||||
return list;
|
||||
}
|
||||
|
||||
private <A> ResourceEntry<A> findEntry(String name, Class<? extends A> clazz) {
|
||||
Map<String, ResourceEntry> map = this.store.get(clazz);
|
||||
if (map != null) {
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
package org.redkale.util;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.*;
|
||||
import java.util.stream.*;
|
||||
|
||||
/**
|
||||
@@ -108,6 +108,15 @@ public class Sheet<T> implements java.io.Serializable, Iterable<T> {
|
||||
}
|
||||
}
|
||||
|
||||
public <R> Sheet<R> map(Function<T, R> mapper) {
|
||||
if (this.isEmpty()) return (Sheet) this;
|
||||
final List<R> list = new ArrayList<>();
|
||||
for (T item : this.rows) {
|
||||
list.add(mapper.apply(item));
|
||||
}
|
||||
return new Sheet<>(getTotal(), list);
|
||||
}
|
||||
|
||||
public void forEachParallel(final Consumer<? super T> consumer) {
|
||||
if (consumer != null && this.rows != null && !this.rows.isEmpty()) {
|
||||
this.rows.parallelStream().forEach(consumer);
|
||||
|
||||
Reference in New Issue
Block a user