54 Commits
1.9.6 ... 1.9.7

Author SHA1 Message Date
Redkale
387865789f 2018-09-20 16:03:33 +08:00
Redkale
03fcf43a89 2018-09-19 11:11:59 +08:00
Redkale
df8090813a 2018-09-12 12:11:19 +08:00
Redkale
849b29d00f CacheSource的增删改查操作增加Type参数 2018-09-12 12:04:59 +08:00
Redkale
4b7f65e1c4 2018-09-11 14:29:42 +08:00
Redkale
4545d81e50 2018-09-11 11:12:43 +08:00
Redkale
1ac5f060a4 增加WebSocketAction功能 2018-09-11 09:13:19 +08:00
Redkale
7e1ff8e315 2018-09-10 17:23:23 +08:00
Redkale
4e0c1fee97 2018-09-10 14:02:26 +08:00
Redkale
0b38f23f2d 2018-09-10 14:01:56 +08:00
Redkale
98ea6861c1 2018-09-10 10:44:03 +08:00
Redkale
c07b628ea1 FilterNode增加readonly属性 2018-09-07 10:21:48 +08:00
Redkale
b1b979c0b5 优化Rest.createRestServlet减少preInit的耗时 2018-09-07 09:20:52 +08:00
Redkale
35b708b01d 2018-09-07 08:50:20 +08:00
Redkale
229ae0d44f 2018-09-06 18:17:11 +08:00
Redkale
7d6897fa36 2018-09-06 14:33:53 +08:00
Redkale
5851093590 2018-09-05 16:58:19 +08:00
Redkale
4646c1d1f0 2018-09-05 16:57:09 +08:00
Redkale
f3763dbf72 2018-09-05 16:41:44 +08:00
Redkale
6a8c60ec78 优化NodeHttpServer.loadRestServlet 2018-09-05 16:15:21 +08:00
Redkale
ae437fd5d6 优化Rest.createRestServlet 2018-09-05 15:53:54 +08:00
Redkale
7251c984c8 修复FilterNode.findValue的bug 2018-09-05 11:00:47 +08:00
Redkale
ec449220eb 2018-09-05 09:49:20 +08:00
Redkale
78265944f0 2018-09-05 09:23:41 +08:00
Redkale
d2791f6d1b 2018-09-05 08:29:58 +08:00
Redkale
d525d2664b 2018-08-31 10:39:43 +08:00
Redkale
a4ccea91ad 修复FilterJoinNode.any方法bug 2018-08-29 16:06:41 +08:00
Redkale
750da161eb 增强HttpUserType的类型校验 2018-08-28 10:18:27 +08:00
Redkale
ac50312f0b 2018-08-27 18:28:56 +08:00
Redkale
8d44d48072 2018-08-27 18:26:16 +08:00
Redkale
6c2baa1708 增加RestConvertCoder功能 2018-08-27 18:25:08 +08:00
Redkale
4525cfe594 2018-08-27 12:36:35 +08:00
Redkale
921f96c975 优化toBuffers方法 2018-08-27 12:29:59 +08:00
Redkale
29ce57d3af BsonWriter.toBuffers存在并发问题 2018-08-27 12:07:45 +08:00
Redkale
2ca1e6305c 修改HttpResponse.finish(byte[]) 2018-08-27 11:55:36 +08:00
Redkale
827b404a57 Update application.xml 2018-08-25 13:35:48 +08:00
Redkale
83569142c1 2018-08-25 13:12:24 +08:00
Redkale
2da0faacc3 2018-08-24 16:24:58 +08:00
Redkale
cf51bee2cc 2018-08-24 16:07:19 +08:00
Redkale
0dd55dc947 2018-08-24 16:04:49 +08:00
Redkale
620fa0430c ClassFilter输出更详细日志 2018-08-23 19:54:47 +08:00
Redkale
d053590257 增加JDK9+环境下显示进程PID功能 2018-08-23 19:28:20 +08:00
Redkale
684af3de61 application.xml的properties值支持${APP_HOME} 2018-08-23 09:59:14 +08:00
Redkale
6c6e26ed0b 2018-08-22 10:37:49 +08:00
Redkale
4a05bfbd08 2018-08-22 10:34:48 +08:00
Redkale
787dc7b32f 2018-08-22 10:03:20 +08:00
Redkale
85a1f99f6e 2018-08-22 09:42:51 +08:00
Redkale
4fe8a1199e 修复CollectionDecoder的creator指定错误的bug 2018-08-22 09:30:11 +08:00
Redkale
7312dbc4c5 增加H2数据库的支持 2018-08-21 10:06:05 +08:00
Redkale
cfecfabc92 createRestServlet兼容throws IOException和RuntimeException的子类 2018-08-20 17:28:34 +08:00
Redkale
587160c5fe Redkale 1.9.7 开始 2018-08-20 17:23:13 +08:00
Redkale
ee7fe3ed33 2018-08-20 16:49:39 +08:00
Redkale
47d4a6cc29 2018-08-20 14:42:16 +08:00
Redkale
c1e4763369 2018-08-20 14:28:02 +08:00
49 changed files with 1191 additions and 275 deletions

View File

@@ -14,7 +14,7 @@ fi
cd "$APP_HOME"
./bin/shutdown.sh
"$APP_HOME"/bin/shutdown.sh
./bin/start.sh
"$APP_HOME"/bin/start.sh

View File

@@ -1,7 +1,7 @@
@ECHO OFF
SET APP_HOME=%~dp0
IF NOT EXIST "%APP_HOME%\conf\application.xml" SET APP_HOME=%~dp0..
java -DAPP_HOME="%APP_HOME%" -classpath "%APP_HOME%"\lib\* org.redkale.boot.Application

View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<application port="5050">
<application port="2121">
<!-- 详细配置说明见: http://redkale.org/redkale.html#redkale_confxml -->
@@ -8,7 +8,7 @@
</resources>
<server protocol="HTTP" host="0.0.0.0" port="6060" root="root">
<server protocol="HTTP" port="6060">
<request>
<remoteaddr value="request.headers.X-RemoteAddress"/>

View File

@@ -16,7 +16,6 @@
<shared-cache-mode>ALL</shared-cache-mode>
<properties>
<property name="javax.persistence.jdbc.url" value="jdbc:oracle:thin:@localhost:1521:orcl"/>
<property name="javax.persistence.jdbc.driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="javax.persistence.jdbc.user" value="system"/>
<property name="javax.persistence.jdbc.password" value="1234"/>
</properties>
@@ -26,7 +25,6 @@
<shared-cache-mode>ALL</shared-cache-mode>
<properties>
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://localhost:3306/user?autoReconnect=true&amp;characterEncoding=utf8"/>
<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver"/>
<property name="javax.persistence.jdbc.user" value="root"/>
<property name="javax.persistence.jdbc.password" value="1234"/>
</properties>

View File

@@ -63,6 +63,7 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<compilerArgument>-parameters</compilerArgument>
<encoding>UTF-8</encoding>
<compilerArguments>
<verbose />

View File

@@ -125,7 +125,7 @@
aliveTimeoutSeconds: KeepAlive读操作超时秒数 默认30 0表示永久不超时; -1表示禁止KeepAlive
readTimeoutSeconds: 读操作超时秒数, 默认0 表示永久不超时
writeTimeoutSeconds: 写操作超时秒数, 默认0 表示永久不超时
netimpl: ProtocolServer的实现类。TCP情况下值可以是aio或nio默认值为aioUDP情况下值可以是bio默认值为bio
netimpl: ProtocolServer的实现类。TCP情况下值可以是aio或nio默认值为aioUDP情况下值可以是bio默认值为bio
interceptor: 启动/关闭NodeServer时被调用的拦截器实现类必须是org.redkale.boot.NodeInterceptor的子类默认为null
-->
<server protocol="HTTP" host="127.0.0.1" port="6060" root="root" lib="">

View File

@@ -11,8 +11,8 @@ javax.level = INFO
com.sun.level = INFO
#java.util.logging.FileHandler.level = FINE
#10M
java.util.logging.FileHandler.limit = 10485760
java.util.logging.FileHandler.limit = 20M
java.util.logging.FileHandler.count = 100
java.util.logging.FileHandler.encoding = UTF-8
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log

View File

@@ -20,12 +20,15 @@
org.mariadb.jdbc.Driver —————— org.mariadb.jdbc.MySQLDataSource
org.postgresql.Driver —————— org.postgresql.ds.PGConnectionPoolDataSource
com.mysql.jdbc.Driver —————— com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource
com.mysql.cj.jdbc.Driver —————— com.mysql.cj.jdbc.MysqlConnectionPoolDataSource
oracle.jdbc.driver.OracleDriver —————— oracle.jdbc.pool.OracleConnectionPoolDataSource
com.microsoft.sqlserver.jdbc.SQLServerDriver —————— com.microsoft.sqlserver.jdbc.SQLServerConnectionPoolDataSource
org.h2.Driver —————— org.h2.jdbcx.JdbcDataSource
因此 com.mysql.jdbc.Driver 会被自动转换成 com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource
并且如果JDBC驱动是以上几个版本javax.persistence.jdbc.driver属性都可以省略Redkale会根据javax.persistence.jdbc.url的值来识别驱动
-->
<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver"/>
<property name="javax.persistence.jdbc.source" value="com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource"/>
-->
<property name="javax.persistence.jdbc.user" value="root"/>
<property name="javax.persistence.jdbc.password" value="123456"/>
@@ -47,7 +50,6 @@
<properties>
<!-- jdbc:mysql://127.0.0.1:3306/dbim?autoReconnect=true&amp;autoReconnectForPools=true&amp;characterEncoding=utf8 -->
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbim?characterEncoding=utf8"/>
<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver"/>
<property name="javax.persistence.jdbc.user" value="root"/>
<property name="javax.persistence.jdbc.password" value="123456"/>
</properties>

View File

@@ -10,7 +10,6 @@ module org.redkale {
requires java.logging;
requires java.xml;
requires java.sql;
requires java.sql.rowset;
requires jdk.unsupported; //sun.misc.Unsafe

View File

@@ -171,7 +171,6 @@ public final class Application {
this.singletonrun = singletonrun;
this.config = config;
System.setProperty("redkale.version", Redkale.getDotedVersion());
System.setProperty("sun.nio.ch.maxCompletionHandlersOnStack", String.valueOf(Math.max(256, Runtime.getRuntime().availableProcessors() * 8)));
final File root = new File(System.getProperty(RESNAME_APP_HOME));
this.resourceFactory.register(RESNAME_APP_TIME, long.class, this.startTime);
@@ -389,7 +388,6 @@ public final class Application {
}
public void init() throws Exception {
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "" + Runtime.getRuntime().availableProcessors() * 4);
System.setProperty("net.transport.poolmaxconns", "100");
System.setProperty("net.transport.pinginterval", "30");
System.setProperty("net.transport.checkinterval", "30");
@@ -404,7 +402,15 @@ public final class Application {
final String homepath = this.home.getCanonicalPath();
final String confpath = this.conf.getCanonicalPath();
if (persist.isFile()) System.setProperty(DataSources.DATASOURCE_CONFPATH, persist.getCanonicalPath());
logger.log(Level.INFO, "APP_JAVA = " + System.getProperty("java.version") + "\r\n" + RESNAME_APP_ADDR + " = " + this.localAddress.getHostAddress() + "\r\n" + RESNAME_APP_HOME + " = " + homepath + "\r\n" + RESNAME_APP_CONF + " = " + confpath);
String pidstr = "";
try { //JDK 9+
Class phclass = Class.forName("java.lang.ProcessHandle");
Object phobj = phclass.getMethod("current").invoke(null);
Object pid = phclass.getMethod("pid").invoke(phobj);
pidstr = "APP_PID = " + pid + "\r\n";
} catch (Throwable t) {
}
logger.log(Level.INFO, pidstr + "APP_JAVA = " + System.getProperty("java.version") + "\r\n" + RESNAME_APP_ADDR + " = " + this.localAddress.getHostAddress() + "\r\n" + RESNAME_APP_HOME + " = " + homepath + "\r\n" + RESNAME_APP_CONF + " = " + confpath);
String lib = config.getValue("lib", "${APP_HOME}/libs/*").trim().replace("${APP_HOME}", homepath);
lib = lib.isEmpty() ? confpath : (lib + ";" + confpath);
Server.loadLib(classLoader, logger, lib);
@@ -425,7 +431,7 @@ public final class Application {
InputStream in = new FileInputStream(df);
ps.load(in);
in.close();
ps.forEach((x, y) -> resourceFactory.register("property." + x, y));
ps.forEach((x, y) -> resourceFactory.register("property." + x, y.toString().replace("${APP_HOME}", homepath)));
}
}
}
@@ -433,6 +439,7 @@ public final class Application {
String name = prop.getValue("name");
String value = prop.getValue("value");
if (name == null || value == null) continue;
value = value.replace("${APP_HOME}", homepath);
if (name.startsWith("system.property.")) {
System.setProperty(name.substring("system.property.".length()), value);
} else if (name.startsWith("mimetype.property.")) {

View File

@@ -7,7 +7,7 @@ package org.redkale.boot;
import java.io.*;
import java.lang.annotation.*;
import java.lang.reflect.*;
import java.lang.reflect.Modifier;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
@@ -30,7 +30,7 @@ public final class ClassFilter<T> {
private static final Logger logger = Logger.getLogger(ClassFilter.class.getName()); //日志对象
private static final boolean finer = logger.isLoggable(Level.FINER); //日志级别
private static final boolean finest = logger.isLoggable(Level.FINEST); //日志级别
private final Set<FilterEntry<T>> entrys = new HashSet<>(); //符合条件的结果
@@ -136,10 +136,11 @@ public final class ClassFilter<T> {
*
* @param property AnyValue
* @param clazzname String
* @param url URL
*/
@SuppressWarnings("unchecked")
public final void filter(AnyValue property, String clazzname) {
filter(property, clazzname, true);
public final void filter(AnyValue property, String clazzname, URL url) {
filter(property, clazzname, true, url);
}
/**
@@ -150,6 +151,18 @@ public final class ClassFilter<T> {
* @param autoscan 为true表示自动扫描的 false表示显著调用filter AutoLoad的注解将被忽略
*/
public final void filter(AnyValue property, String clazzname, boolean autoscan) {
filter(property, clazzname, autoscan, null);
}
/**
* 过滤指定的class
*
* @param property application.xml中对应class节点下的property属性项
* @param clazzname class名称
* @param autoscan 为true表示自动扫描的 false表示显著调用filter AutoLoad的注解将被忽略
* @param url URL
*/
public final void filter(AnyValue property, String clazzname, boolean autoscan, URL url) {
boolean r = accept0(property, clazzname);
ClassFilter cf = r ? this : null;
if (r && ands != null) {
@@ -165,7 +178,7 @@ public final class ClassFilter<T> {
}
}
}
if (cf == null || clazzname.startsWith("sun.")) return;
if (cf == null || clazzname.startsWith("sun.") || clazzname.contains("module-info")) return;
try {
Class clazz = classLoader.loadClass(clazzname);
if (!cf.accept(property, clazz, autoscan)) return;
@@ -189,9 +202,10 @@ public final class ClassFilter<T> {
entrys.add(new FilterEntry(clazz, autoscan, false, property));
}
} catch (Throwable cfe) {
if (finer && !clazzname.startsWith("sun.") && !clazzname.startsWith("javax.")
&& !clazzname.startsWith("com.sun.") && !clazzname.startsWith("jdk.")) {
logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error", cfe);
if (finest && !clazzname.startsWith("sun.") && !clazzname.startsWith("javax.")
&& !clazzname.startsWith("com.sun.") && !clazzname.startsWith("jdk.") && !clazzname.startsWith("META-INF")
&& (!(cfe instanceof NoClassDefFoundError) || ((NoClassDefFoundError) cfe).getMessage().startsWith("java.lang.NoClassDefFoundError: java"))) {
logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error for class: " + clazzname + (url == null ? "" : (" in " + url)), cfe);
}
}
}
@@ -347,6 +361,7 @@ public final class ClassFilter<T> {
public void setPrivilegeExcludes(Set<String> privilegeExcludes) {
this.privilegeExcludes = privilegeExcludes == null || privilegeExcludes.isEmpty() ? null : privilegeExcludes;
}
/**
@@ -502,7 +517,7 @@ public final class ClassFilter<T> {
classes.add(classname);
if (debug) debugstr.append(classname).append("\r\n");
for (final ClassFilter filter : filters) {
if (filter != null) filter.filter(null, classname);
if (filter != null) filter.filter(null, classname, url);
}
}
}
@@ -511,7 +526,7 @@ public final class ClassFilter<T> {
} else {
for (String classname : classes) {
for (final ClassFilter filter : filters) {
if (filter != null) filter.filter(null, classname);
if (filter != null) filter.filter(null, classname, url);
}
}
}
@@ -530,14 +545,14 @@ public final class ClassFilter<T> {
classes.add(classname);
if (debug) debugstr.append(classname).append("\r\n");
for (final ClassFilter filter : filters) {
if (filter != null) filter.filter(null, classname);
if (filter != null) filter.filter(null, classname, url);
}
}
cache.put(url, classes);
} else {
for (String classname : classes) {
for (final ClassFilter filter : filters) {
if (filter != null) filter.filter(null, classname);
if (filter != null) filter.filter(null, classname, url);
}
}
}

View File

@@ -9,6 +9,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import javax.annotation.*;
import static org.redkale.boot.Application.RESNAME_SNCP_ADDR;
@@ -217,6 +218,7 @@ public class NodeHttpServer extends NodeServer {
if (!rest) return;
if (restConf == null) return; //不存在REST服务
final long starts = System.currentTimeMillis();
String prefix0 = restConf.getValue("path", "");
if (!prefix0.isEmpty() && prefix0.charAt(prefix0.length() - 1) == '/') prefix0 = prefix0.substring(0, prefix0.length() - 1);
if (!prefix0.isEmpty() && prefix0.charAt(0) != '/') prefix0 = '/' + prefix0;
@@ -243,35 +245,45 @@ public class NodeHttpServer extends NodeServer {
final ClassFilter restFilter = ClassFilter.create(null, restConf.getValue("includes", ""), restConf.getValue("excludes", ""), includeValues, excludeValues);
final boolean finest = logger.isLoggable(Level.FINEST);
super.interceptorServices.forEach((service) -> {
final Class stype = Sncp.getServiceType(service);
final String name = Sncp.getResourceName(service);
RestService rs = (RestService) stype.getAnnotation(RestService.class);
if (rs == null || rs.ignore()) return;
final CountDownLatch scdl = new CountDownLatch(super.interceptorServices.size());
super.interceptorServices.stream().parallel().forEach((service) -> {
try {
final Class stype = Sncp.getServiceType(service);
final String name = Sncp.getResourceName(service);
RestService rs = (RestService) stype.getAnnotation(RestService.class);
if (rs == null || rs.ignore()) return;
final String stypename = stype.getName();
if (!autoload && !includeValues.contains(stypename)) return;
if (!restFilter.accept(stypename)) return;
if (restedObjects.contains(service)) {
logger.log(Level.WARNING, stype.getName() + " repeat create rest servlet, so ignore");
return;
}
restedObjects.add(service); //避免重复创建Rest对象
HttpServlet servlet = httpServer.addRestServlet(serverClassLoader, service, userType, baseServletType, prefix);
if (servlet == null) return; //没有HttpMapping方法的HttpServlet调用Rest.createRestServlet就会返回null
String prefix2 = prefix;
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
if (ws != null && !ws.repair()) prefix2 = "";
resourceFactory.inject(servlet, NodeHttpServer.this);
if (finest) logger.finest(threadName + " Create RestServlet(resource.name='" + name + "') = " + servlet);
if (ss != null) {
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();
for (int i = 0; i < mappings.length; i++) {
mappings[i] = prefix2 + mappings[i];
final String stypename = stype.getName();
if (!autoload && !includeValues.contains(stypename)) return;
if (!restFilter.accept(stypename)) return;
synchronized (restedObjects) {
if (restedObjects.contains(service)) {
logger.log(Level.WARNING, stype.getName() + " repeat create rest servlet, so ignore");
return;
}
restedObjects.add(service); //避免重复创建Rest对象
}
ss.add(new AbstractMap.SimpleEntry<>(servlet.getClass().getName(), mappings));
HttpServlet servlet = httpServer.addRestServlet(serverClassLoader, service, userType, baseServletType, prefix);
if (servlet == null) return; //没有HttpMapping方法的HttpServlet调用Rest.createRestServlet就会返回null
String prefix2 = prefix;
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
if (ws != null && !ws.repair()) prefix2 = "";
resourceFactory.inject(servlet, NodeHttpServer.this);
//if (finest) logger.finest(threadName + " Create RestServlet(resource.name='" + name + "') = " + servlet);
if (ss != null) {
String[] mappings = servlet.getClass().getAnnotation(WebServlet.class).value();
for (int i = 0; i < mappings.length; i++) {
mappings[i] = prefix2 + mappings[i];
}
synchronized (ss) {
ss.add(new AbstractMap.SimpleEntry<>(servlet.getClass().getName() + "(rest.name='" + name + "')", mappings));
}
}
} finally {
scdl.countDown();
}
});
scdl.await();
}
if (webSocketFilter != null) { //加载RestWebSocket
final Set<String> includeValues = new HashSet<>();
@@ -340,6 +352,7 @@ public class NodeHttpServer extends NodeServer {
}
sb.append(" mapping to ").append(Arrays.toString(as.getValue())).append(LINE_SEPARATOR);
}
sb.append(threadName).append(" All HttpServlets load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR);
}
}
}

View File

@@ -370,6 +370,7 @@ public abstract class NodeServer {
@SuppressWarnings("unchecked")
protected void loadService(ClassFilter<? extends Service> serviceFilter, ClassFilter otherFilter) throws Exception {
if (serviceFilter == null) return;
final long starts = System.currentTimeMillis();
final String threadName = "[" + Thread.currentThread().getName() + "] ";
final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory;
@@ -401,6 +402,11 @@ public abstract class NodeServer {
if (localed && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) continue; //本地模式不能实例化接口和抽象类的Service类
final ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try {
if (SncpClient.parseMethod(serviceImplClass).isEmpty() && serviceImplClass.getAnnotation(Priority.class) == null) { //class没有可用的方法且没有标记启动优先级的 通常为BaseService
logger.log(Level.FINE, serviceImplClass + " cannot load because not found less one public non-final method");
return;
}
Service service;
boolean ws = src instanceof WebSocketServlet;
if (ws || localed) { //本地模式
@@ -408,8 +414,6 @@ public abstract class NodeServer {
} else {
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
}
if (SncpClient.parseMethod(serviceImplClass).isEmpty() && serviceImplClass.getAnnotation(Priority.class) == null) return; //class没有可用的方法且没有标记启动优先级的 通常为BaseService
final Class restype = Sncp.getResourceType(service);
if (rf.find(resourceName, restype) == null) {
regFactory.register(resourceName, restype, service);
@@ -473,24 +477,19 @@ public abstract class NodeServer {
localServices.addAll(swlist);
//this.loadPersistData();
final List<String> slist = sb == null ? null : new CopyOnWriteArrayList<>();
CountDownLatch clds = new CountDownLatch(localServices.size());
localServices.stream().forEach(y -> {
try {
long s = System.currentTimeMillis();
y.init(Sncp.getConf(y));
long e = System.currentTimeMillis() - s;
String serstr = Sncp.toSimpleString(y, maxNameLength, maxClassNameLength);
if (slist != null) slist.add(new StringBuilder().append(threadName).append(serstr).append(" load and init in ").append(e).append(" ms").append(LINE_SEPARATOR).toString());
} finally {
clds.countDown();
}
long s = System.currentTimeMillis();
y.init(Sncp.getConf(y));
long e = System.currentTimeMillis() - s;
String serstr = Sncp.toSimpleString(y, maxNameLength, maxClassNameLength);
if (slist != null) slist.add(new StringBuilder().append(threadName).append(serstr).append(" load and init in ").append(e).append(" ms").append(LINE_SEPARATOR).toString());
});
clds.await();
if (slist != null && sb != null) {
List<String> wlist = new ArrayList<>(slist); //直接使用CopyOnWriteArrayList偶尔会出现莫名的异常(CopyOnWriteArrayList源码1185行)
for (String s : wlist) {
sb.append(s);
}
sb.append(threadName).append(" All Services load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR);
}
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
}

View File

@@ -46,7 +46,7 @@ public class CollectionDecoder<T> implements Decodeable<Reader, Collection<T>> {
this.decoder = factory.loadDecoder(this.componentType);
} else if (factory.isReversible()) {
this.componentType = Object.class;
this.creator = factory.loadCreator(Object.class);
this.creator = factory.loadCreator(type instanceof Class ? (Class) type : Collection.class);
factory.register(type, this);
this.decoder = factory.loadDecoder(this.componentType);
} else {
@@ -111,7 +111,7 @@ public class CollectionDecoder<T> implements Decodeable<Reader, Collection<T>> {
protected Reader getItemReader(Reader in, DeMember member, boolean first) {
return in;
}
protected T readMemberValue(Reader in, DeMember member, boolean first) {
return this.decoder.convertFrom(in);
}

View File

@@ -45,6 +45,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
private final ConcurrentHashMap<String, Class> entitys = new ConcurrentHashMap();
private final ConcurrentHashMap<Type, Map<String, SimpledCoder<R, W, ?>>> fieldCoders = new ConcurrentHashMap();
private final ConcurrentHashMap<Type, Decodeable<R, ?>> decoders = new ConcurrentHashMap();
private final ConcurrentHashMap<Type, Encodeable<W, ?>> encoders = new ConcurrentHashMap();
@@ -495,6 +497,29 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
encoders.put(clazz, encoder);
}
//coder = null表示删除该字段的指定SimpledCoder
public final <E> void register(final Class clazz, final String field, final SimpledCoder<R, W, E> coder) {
if (field == null || clazz == null) return;
try {
clazz.getDeclaredField(field);
} catch (Exception e) {
throw new RuntimeException(clazz + " not found field(" + field + ")");
}
if (coder == null) {
Map map = this.fieldCoders.get(clazz);
if (map != null) map.remove(field);
} else {
this.fieldCoders.computeIfAbsent(clazz, c -> new ConcurrentHashMap<>()).put(field, coder);
}
}
public final <E> SimpledCoder<R, W, E> findFieldCoder(final Type clazz, final String field) {
if (field == null) return null;
Map<String, SimpledCoder<R, W, ?>> map = this.fieldCoders.get(clazz);
if (map == null) return parent == null ? null : parent.findFieldCoder(clazz, field);
return (SimpledCoder) map.get(field);
}
public final <E> Decodeable<R, E> findDecoder(final Type type) {
Decodeable<R, E> rs = (Decodeable<R, E>) decoders.get(type);
if (rs != null) return rs;

View File

@@ -93,8 +93,12 @@ public class ObjectDecoder<R extends Reader, T> implements Decodeable<R, T> {
if (factory.isConvertDisabled(field)) continue;
ref = factory.findRef(field);
if (ref != null && ref.ignore()) continue;
Type t = TypeToken.createClassType(TypeToken.getGenericType(field.getGenericType(), this.type), this.type);
DeMember member = new DeMember(ObjectEncoder.createAttribute(factory, clazz, field, null, null), factory.loadDecoder(t));
Decodeable<R, ?> fieldCoder = factory.findFieldCoder(clazz, field.getName());
if (fieldCoder == null) {
Type t = TypeToken.createClassType(TypeToken.getGenericType(field.getGenericType(), this.type), this.type);
fieldCoder = factory.loadDecoder(t);
}
DeMember member = new DeMember(ObjectEncoder.createAttribute(factory, clazz, field, null, null), fieldCoder);
if (ref != null) member.index = ref.getIndex();
list.add(member);
}
@@ -118,8 +122,13 @@ public class ObjectDecoder<R extends Reader, T> implements Decodeable<R, T> {
}
ref = factory.findRef(method);
if (ref != null && ref.ignore()) continue;
Type t = TypeToken.createClassType(TypeToken.getGenericType(method.getGenericParameterTypes()[0], this.type), this.type);
DeMember member = new DeMember(ObjectEncoder.createAttribute(factory, clazz, null, null, method), factory.loadDecoder(t));
Decodeable<R, ?> fieldCoder = factory.findFieldCoder(clazz, ConvertFactory.readGetSetFieldName(method));
if (fieldCoder == null) {
Type t = TypeToken.createClassType(TypeToken.getGenericType(method.getGenericParameterTypes()[0], this.type), this.type);
fieldCoder = factory.loadDecoder(t);
}
DeMember member = new DeMember(ObjectEncoder.createAttribute(factory, clazz, null, null, method), fieldCoder);
if (ref != null) member.index = ref.getIndex();
list.add(member);
}

View File

@@ -77,8 +77,12 @@ public class ObjectEncoder<W extends Writer, T> implements Encodeable<W, T> {
if (factory.isConvertDisabled(field)) continue;
ref = factory.findRef(field);
if (ref != null && ref.ignore()) continue;
Type t = TypeToken.createClassType(TypeToken.getGenericType(field.getGenericType(), this.type), this.type);
EnMember member = new EnMember(createAttribute(factory, clazz, field, null, null), factory.loadEncoder(t));
Encodeable<W, ?> fieldCoder = factory.findFieldCoder(clazz, field.getName());
if (fieldCoder == null) {
Type t = TypeToken.createClassType(TypeToken.getGenericType(field.getGenericType(), this.type), this.type);
fieldCoder = factory.loadEncoder(t);
}
EnMember member = new EnMember(createAttribute(factory, clazz, field, null, null), fieldCoder);
if (ref != null) member.index = ref.getIndex();
list.add(member);
}
@@ -102,8 +106,12 @@ public class ObjectEncoder<W extends Writer, T> implements Encodeable<W, T> {
}
ref = factory.findRef(method);
if (ref != null && ref.ignore()) continue;
Type t = TypeToken.createClassType(TypeToken.getGenericType(method.getGenericReturnType(), this.type), this.type);
EnMember member = new EnMember(createAttribute(factory, clazz, null, method, null), factory.loadEncoder(t));
Encodeable<W, ?> fieldCoder = factory.findFieldCoder(clazz, ConvertFactory.readGetSetFieldName(method));
if (fieldCoder == null) {
Type t = TypeToken.createClassType(TypeToken.getGenericType(method.getGenericReturnType(), this.type), this.type);
fieldCoder = factory.loadEncoder(t);
}
EnMember member = new EnMember(createAttribute(factory, clazz, null, method, null), fieldCoder);
if (ref != null) member.index = ref.getIndex();
list.add(member);
}

View File

@@ -126,7 +126,8 @@ public class HttpPrepareServlet extends PrepareServlet<String, HttpContext, Http
List<HttpServlet> list = removeHttpServlet(predicateEntry, predicateFilter);
return list == null || list.isEmpty() ? null : list.get(0);
}
@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked")
public <T extends WebSocket> HttpServlet removeHttpServlet(Class<T> websocketOrServletType) {
Predicate<MappingEntry> predicateEntry = (t) -> {
Class type = t.servlet.getClass();

View File

@@ -614,15 +614,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/
@Override
public void finish(final byte[] bs) {
if (isClosed()) return; //避免重复关闭
if (this.context.getBufferCapacity() >= bs.length) {
ByteBuffer buffer = getBodyBufferSupplier().get();
buffer.put(bs);
buffer.flip();
this.finish(false, buffer);
} else {
this.finish(false, ByteBuffer.wrap(bs));
}
this.finish(this.contentType, bs);
}
/**
@@ -633,15 +625,30 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/
public void finish(final String contentType, final byte[] bs) {
if (isClosed()) return; //避免重复关闭
this.contentType = contentType;
if (this.context.getBufferCapacity() >= bs.length) {
ByteBuffer buffer = getBodyBufferSupplier().get();
buffer.put(bs);
buffer.flip();
this.finish(false, buffer);
final byte[] content = bs == null ? new byte[0] : bs;
if (!this.headsended) {
this.contentType = contentType;
this.contentLength = content.length;
ByteBuffer headbuf = createHeader();
if (headbuf.remaining() >= content.length) {
headbuf.put(content);
headbuf.flip();
super.finish(false, headbuf);
} else {
headbuf.flip();
super.finish(false, new ByteBuffer[]{headbuf, ByteBuffer.wrap(content)});
}
} else {
this.finish(false, ByteBuffer.wrap(bs));
if (this.context.getBufferCapacity() >= content.length) {
ByteBuffer buffer = getBodyBufferSupplier().get();
buffer.put(content);
buffer.flip();
this.finish(false, buffer);
} else {
this.finish(false, ByteBuffer.wrap(content));
}
}
}
/**

View File

@@ -256,17 +256,19 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
final boolean sncp = Sncp.isSncpDyn(service);
final String resname = name == null ? (sncp ? Sncp.getResourceName(service) : "") : name;
final Class<S> serviceType = Sncp.getServiceType(service);
for (final HttpServlet item : ((HttpPrepareServlet) this.prepare).getServlets()) {
if (!(item instanceof HttpServlet)) continue;
if (item.getClass().getAnnotation(Rest.RestDyn.class) == null) continue;
try {
Field field = item.getClass().getDeclaredField(Rest.REST_SERVICE_FIELD_NAME);
if (serviceType.equals(field.getType())) {
servlet = (T) item;
break;
if (name != null) {
for (final HttpServlet item : ((HttpPrepareServlet) this.prepare).getServlets()) {
if (!(item instanceof HttpServlet)) continue;
if (item.getClass().getAnnotation(Rest.RestDyn.class) == null) continue;
try {
Field field = item.getClass().getDeclaredField(Rest.REST_SERVICE_FIELD_NAME);
if (serviceType.equals(field.getType())) {
servlet = (T) item;
break;
}
} catch (NoSuchFieldException | SecurityException e) {
logger.log(Level.SEVERE, "serviceType = " + serviceType + ", servletClass = " + item.getClass(), e);
}
} catch (NoSuchFieldException | SecurityException e) {
logger.log(Level.SEVERE, "serviceType = " + serviceType + ", servletClass = " + item.getClass(), e);
}
}
final boolean first = servlet == null;

View File

@@ -34,13 +34,15 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
String _prefix = ""; //当前HttpServlet的path前缀
private Map.Entry<String, Entry>[] mappings;
HashMap<String, InnerActionEntry> _tmpentrys; //Rest生成时赋值, 字段名Rest有用到
private Map.Entry<String, InnerActionEntry>[] mappings; //字段名Rest有用到
//这里不能直接使用HttpServlet会造成死循环初始化HttpServlet
private final Servlet<HttpContext, HttpRequest, HttpResponse> authSuccessServlet = new Servlet<HttpContext, HttpRequest, HttpResponse>() {
@Override
public void execute(HttpRequest request, HttpResponse response) throws IOException {
Entry entry = (Entry) request.attachment;
InnerActionEntry entry = (InnerActionEntry) request.attachment;
if (entry.cacheseconds > 0) {//有缓存设置
CacheEntry ce = entry.cache.get(request.getRequestURI());
if (ce != null && ce.time + entry.cacheseconds > System.currentTimeMillis()) { //缓存有效
@@ -59,9 +61,9 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
private final Servlet<HttpContext, HttpRequest, HttpResponse> preSuccessServlet = new Servlet<HttpContext, HttpRequest, HttpResponse>() {
@Override
public void execute(HttpRequest request, HttpResponse response) throws IOException {
for (Map.Entry<String, Entry> en : mappings) {
for (Map.Entry<String, InnerActionEntry> en : mappings) {
if (request.getRequestURI().startsWith(en.getKey())) {
Entry entry = en.getValue();
InnerActionEntry entry = en.getValue();
if (!entry.checkMethod(request.getMethod())) {
response.finishJson(new RetResult(RET_METHOD_ERROR, "Method(" + request.getMethod() + ") Error"));
return;
@@ -69,11 +71,11 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
request.attachment = entry;
request.moduleid = entry.moduleid;
request.actionid = entry.actionid;
if (entry.ignore) {
authSuccessServlet.execute(request, response);
} else {
if (entry.auth) {
response.thenEvent(authSuccessServlet);
authenticate(request, response);
} else {
authSuccessServlet.execute(request, response);
}
return;
}
@@ -84,13 +86,14 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
@SuppressWarnings("unchecked")
void preInit(HttpContext context, AnyValue config) {
if (this.mappings != null) return; //无需重复preInit
String path = _prefix == null ? "" : _prefix;
WebServlet ws = this.getClass().getAnnotation(WebServlet.class);
if (ws != null && !ws.repair()) path = "";
HashMap<String, Entry> map = load();
HashMap<String, InnerActionEntry> map = this._tmpentrys != null ? this._tmpentrys : loadActionEntry();
this.mappings = new Map.Entry[map.size()];
int i = -1;
for (Map.Entry<String, Entry> en : map.entrySet()) {
for (Map.Entry<String, InnerActionEntry> en : map.entrySet()) {
mappings[++i] = new AbstractMap.SimpleEntry<>(path + en.getKey(), en.getValue());
}
//必须要倒排序, /query /query1 /query12 确保含子集的优先匹配 /query12 /query1 /query
@@ -163,10 +166,10 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
preExecute(request, response);
}
private HashMap<String, Entry> load() {
private HashMap<String, InnerActionEntry> loadActionEntry() {
WebServlet module = this.getClass().getAnnotation(WebServlet.class);
final int serviceid = module == null ? 0 : module.moduleid();
final HashMap<String, Entry> map = new HashMap<>();
final HashMap<String, InnerActionEntry> map = new HashMap<>();
HashMap<String, Class> nameset = new HashMap<>();
final Class selfClz = this.getClass();
Class clz = this.getClass();
@@ -197,13 +200,82 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
throw new RuntimeException(this.getClass().getSimpleName() + " have two same " + HttpMapping.class.getSimpleName() + "(" + name + ")");
}
nameset.put(name, clz);
map.put(name, new Entry(serviceid, actionid, name, methods, method, createHttpServlet(method)));
map.put(name, new InnerActionEntry(serviceid, actionid, name, methods, method, createActionServlet(method)));
}
} while ((clz = clz.getSuperclass()) != HttpServlet.class);
return map;
}
private HttpServlet createHttpServlet(final Method method) {
protected static final class InnerActionEntry {
InnerActionEntry(int moduleid, int actionid, String name, String[] methods, Method method, HttpServlet servlet) {
this(moduleid, actionid, name, methods, method, auth(method), cacheseconds(method), servlet);
}
//供Rest类使用参数不能随便更改
public InnerActionEntry(int moduleid, int actionid, String name, String[] methods, Method method, boolean auth, int cacheseconds, HttpServlet servlet) {
this.moduleid = moduleid;
this.actionid = actionid;
this.name = name;
this.methods = methods;
this.method = method; //rest构建会为null
this.servlet = servlet;
this.auth = auth;
this.cacheseconds = cacheseconds;
this.cache = cacheseconds > 0 ? new ConcurrentHashMap<>() : null;
this.cacheHandler = cacheseconds > 0 ? (HttpResponse response, ByteBuffer[] buffers) -> {
int status = response.getStatus();
if (status != 200) return null;
CacheEntry ce = new CacheEntry(response.getStatus(), response.getContentType(), buffers);
cache.put(response.getRequest().getRequestURI(), ce);
return ce.getBuffers();
} : null;
}
private static boolean auth(Method method) {
HttpMapping mapping = method.getAnnotation(HttpMapping.class);
return mapping == null || mapping.auth();
}
private static int cacheseconds(Method method) {
HttpMapping mapping = method.getAnnotation(HttpMapping.class);
return mapping == null ? 0 : mapping.cacheseconds();
}
boolean isNeedCheck() {
return this.moduleid != 0 || this.actionid != 0;
}
boolean checkMethod(final String reqMethod) {
if (methods.length == 0) return true;
for (String m : methods) {
if (reqMethod.equalsIgnoreCase(m)) return true;
}
return false;
}
final BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> cacheHandler;
final ConcurrentHashMap<String, CacheEntry> cache;
final int cacheseconds;
final boolean auth;
final int moduleid;
final int actionid;
final String name;
final String[] methods;
final Method method;
final HttpServlet servlet;
}
private HttpServlet createActionServlet(final Method method) {
//------------------------------------------------------------------------------
final String supDynName = HttpServlet.class.getName().replace('.', '/');
final String interName = this.getClass().getName().replace('.', '/');
@@ -282,61 +354,6 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
}
}
private static final class Entry {
public Entry(int moduleid, int actionid, String name, String[] methods, Method method, HttpServlet servlet) {
this.moduleid = moduleid;
this.actionid = actionid;
this.name = name;
this.methods = methods;
this.method = method;
this.servlet = servlet;
HttpMapping mapping = method.getAnnotation(HttpMapping.class);
this.ignore = mapping == null || !mapping.auth();
this.cacheseconds = mapping == null ? 0 : mapping.cacheseconds();
this.cache = cacheseconds > 0 ? new ConcurrentHashMap<>() : null;
this.cacheHandler = cacheseconds > 0 ? (HttpResponse response, ByteBuffer[] buffers) -> {
int status = response.getStatus();
if (status != 200) return null;
CacheEntry ce = new CacheEntry(response.getStatus(), response.getContentType(), buffers);
cache.put(response.getRequest().getRequestURI(), ce);
return ce.getBuffers();
} : null;
}
public boolean isNeedCheck() {
return this.moduleid != 0 || this.actionid != 0;
}
public boolean checkMethod(final String reqMethod) {
if (methods.length == 0) return true;
for (String m : methods) {
if (reqMethod.equalsIgnoreCase(m)) return true;
}
return false;
}
public final BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> cacheHandler;
public final ConcurrentHashMap<String, CacheEntry> cache;
public final int cacheseconds;
public final boolean ignore;
public final int moduleid;
public final int actionid;
public final String name;
public final String[] methods;
public final Method method;
public final HttpServlet servlet;
}
private static final class CacheEntry {
public final long time = System.currentTimeMillis();

View File

@@ -12,7 +12,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 配合 HttpServlet 使用。
* 用于指定HttpRequest.currentUser的数据类型。<br>
* 注意: 数据类型是JavaBean则必须要用javax.persistence.Id标记主键字段用于确定用户ID
* 注意: 数据类型是JavaBean
*
* <p>
* 详情见: https://redkale.org

View File

@@ -39,6 +39,8 @@ public final class Rest {
static final String REST_SERVICE_FIELD_NAME = "_redkale_service";
static final String REST_TOSTRINGOBJ_FIELD_NAME = "_redkale_tostringsupplier";
static final String REST_JSONCONVERT_FIELD_PREFIX = "_redkale_jsonconvert_";
static final String REST_SERVICEMAP_FIELD_NAME = "_redkale_servicemap"; //如果只有name=""的Service资源则实例中_servicemap必须为null
@@ -127,25 +129,37 @@ public final class Rest {
}
}
static JsonConvert createJsonConvert(RestConvert[] converts) {
if (converts == null || converts.length < 1) return JsonConvert.root();
static JsonConvert createJsonConvert(RestConvert[] converts, RestConvertCoder[] coders) {
if ((converts == null || converts.length < 1) && (coders == null || coders.length < 1)) return JsonConvert.root();
final JsonFactory childFactory = JsonFactory.create();
List<Class> types = new ArrayList<>();
for (RestConvert rc : converts) {
if (rc.type() == void.class || rc.type() == Void.class) {
return JsonFactory.create().skipAllIgnore(true).getConvert();
Set<Class> reloadTypes = new HashSet<>();
if (coders != null) {
for (RestConvertCoder rcc : coders) {
reloadTypes.add(rcc.type());
childFactory.register(rcc.type(), rcc.field(), Creator.create(rcc.coder()).create());
}
if (types.contains(rc.type())) throw new RuntimeException("@RestConvert type(" + rc.type() + ") repeat");
if (rc.skipIgnore()) {
childFactory.registerSkipIgnore(rc.type());
childFactory.reloadCoder(rc.type());
} else {
childFactory.register(rc.type(), false, rc.convertColumns());
childFactory.register(rc.type(), true, rc.ignoreColumns());
childFactory.reloadCoder(rc.type());
}
if (converts != null) {
for (RestConvert rc : converts) {
if (rc.type() == void.class || rc.type() == Void.class) {
return JsonFactory.create().skipAllIgnore(true).getConvert();
}
if (types.contains(rc.type())) throw new RuntimeException("@RestConvert type(" + rc.type() + ") repeat");
if (rc.skipIgnore()) {
childFactory.registerSkipIgnore(rc.type());
childFactory.reloadCoder(rc.type());
} else {
childFactory.register(rc.type(), false, rc.convertColumns());
childFactory.register(rc.type(), true, rc.ignoreColumns());
childFactory.reloadCoder(rc.type());
}
types.add(rc.type());
childFactory.tiny(rc.tiny());
}
types.add(rc.type());
childFactory.tiny(rc.tiny());
}
for (Class type : reloadTypes) {
childFactory.reloadCoder(type);
}
return childFactory.getConvert();
}
@@ -239,7 +253,19 @@ public final class Rest {
final String resourceGenericDescriptor = sb1.length() == sb2.length() ? null : sb2.toString();
//----------------------------------------------------------------------------------------
final Map<String, List<String>> asmParamMap = MethodParamClassVisitor.getMethodParamNames(new HashMap<>(), webSocketType);
boolean namePresent = false;
try {
Method m0 = null;
for (Method method : webSocketType.getMethods()) {
if (method.getParameterCount() > 0) {
m0 = method;
break;
}
}
namePresent = m0 == null ? true : m0.getParameters()[0].isNamePresent();
} catch (Exception e) {
}
final Map<String, List<String>> asmParamMap = namePresent ? null : MethodParamClassVisitor.getMethodParamNames(new HashMap<>(), webSocketType);
final Set<String> messageNames = new HashSet<>();
final List<Method> messageMethods = new ArrayList<>();
for (Method method : webSocketType.getMethods()) {
@@ -676,7 +702,8 @@ public final class Rest {
final String retDesc = Type.getDescriptor(RetResult.class);
final String futureDesc = Type.getDescriptor(CompletableFuture.class);
final String flipperDesc = Type.getDescriptor(Flipper.class);
final String httprsDesc = Type.getDescriptor(HttpResult.class);
final String httpServletName = HttpServlet.class.getName().replace('.', '/');
final String innerEntryName = HttpServlet.InnerActionEntry.class.getName().replace('.', '/');
final String attrDesc = Type.getDescriptor(org.redkale.util.Attribute.class);
final String multiContextDesc = Type.getDescriptor(MultiContext.class);
final String multiContextName = MultiContext.class.getName().replace('.', '/');
@@ -693,6 +720,9 @@ public final class Rest {
HttpUserType hut = baseServletType.getAnnotation(HttpUserType.class);
final Class userType = (userType0 == null || userType0 == Object.class) ? (hut == null ? null : hut.value()) : userType0;
if (userType != null && (userType.isPrimitive() || userType.getName().startsWith("java.") || userType.getName().startsWith("javax."))) {
throw new RuntimeException(HttpUserType.class.getSimpleName() + " must be a JavaBean but found " + userType);
}
final String supDynName = baseServletType.getName().replace('.', '/');
final RestService controller = serviceType.getAnnotation(RestService.class);
@@ -750,13 +780,12 @@ public final class Rest {
if (ignore) continue;
Class[] extypes = method.getExceptionTypes();
if (extypes.length > 1) {
if (mappings != null && mappings.length > 0) throw new RuntimeException("@" + RestMapping.class.getSimpleName() + " only for method with throws IOException");
continue;
}
if (extypes.length == 1 && extypes[0] != IOException.class) {
if (mappings != null && mappings.length > 0) throw new RuntimeException("@" + RestMapping.class.getSimpleName() + " only for method with throws IOException");
continue;
if (extypes.length > 0) {
for (Class exp : extypes) {
if (!RuntimeException.class.isAssignableFrom(exp) && !IOException.class.isAssignableFrom(exp)) {
throw new RuntimeException("@" + RestMapping.class.getSimpleName() + " only for method(" + method + ") with throws IOException");
}
}
}
paramtypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
if (mappings.length == 0) { //没有Mapping设置一个默认值
@@ -774,9 +803,10 @@ public final class Rest {
}
if (entrys.isEmpty()) return null; //没有可HttpMapping的方法
RestClassLoader newLoader = new RestClassLoader(loader);
final int moduleid = controller == null ? 0 : controller.moduleid();
{ //注入 @WebServlet 注解
String urlpath = "";
int moduleid = controller == null ? 0 : controller.moduleid();
boolean repair = controller == null ? true : controller.repair();
String comment = controller == null ? "" : controller.comment();
av0 = cw.visitAnnotation(webServletDesc, true);
@@ -812,7 +842,13 @@ public final class Rest {
classMap.put("repair", repair);
//classMap.put("comment", comment); //不显示太多信息
}
{ //内部类
cw.visitInnerClass(innerEntryName, httpServletName, HttpServlet.InnerActionEntry.class.getSimpleName(), ACC_PROTECTED + ACC_FINAL + ACC_STATIC);
for (final MappingEntry entry : entrys) {
cw.visitInnerClass(newDynName + "$" + entry.newActionClassName, newDynName, entry.newActionClassName, ACC_PRIVATE + ACC_STATIC);
}
}
{ //注入 @Resource private XXXService _service;
fv = cw.visitField(ACC_PRIVATE, REST_SERVICE_FIELD_NAME, serviceDesc, null, null);
av0 = fv.visitAnnotation(resDesc, true);
@@ -835,6 +871,10 @@ public final class Rest {
fv = cw.visitField(ACC_PRIVATE, REST_PARAMTYPES_FIELD_NAME, "[[Ljava/lang/reflect/Type;", null, null);
fv.visitEnd();
}
{ //_redkale_tostringsupplier字段 Supplier<String>
fv = cw.visitField(ACC_PRIVATE, REST_TOSTRINGOBJ_FIELD_NAME, "Ljava/util/function/Supplier;", "Ljava/util/function/Supplier<Ljava/lang/String;>;", null);
fv.visitEnd();
}
{ //构造函数
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
//mv.setDebug(true);
@@ -846,10 +886,22 @@ public final class Rest {
}
//将每个Service可转换的方法生成HttpServlet对应的HttpMapping方法
final Map<String, List<String>> asmParamMap = MethodParamClassVisitor.getMethodParamNames(new HashMap<>(), serviceType);
boolean namePresent = false;
try {
Method m0 = null;
for (final MappingEntry entry : entrys) {
if (entry.mappingMethod.getParameterCount() > 0) {
m0 = entry.mappingMethod;
break;
}
}
namePresent = m0 == null ? true : m0.getParameters()[0].isNamePresent();
} catch (Exception e) {
}
final Map<String, List<String>> asmParamMap = namePresent ? null : MethodParamClassVisitor.getMethodParamNames(new HashMap<>(), serviceType);
final Map<String, java.lang.reflect.Type> bodyTypes = new HashMap<>();
final List<RestConvert[]> restConverts = new ArrayList<>();
final List<Object[]> restConverts = new ArrayList<>();
for (final MappingEntry entry : entrys) {
RestUploadFile mupload = null;
Class muploadType = null;
@@ -859,9 +911,12 @@ public final class Rest {
final Parameter[] params = method.getParameters();
final RestConvert[] rcs = method.getAnnotationsByType(RestConvert.class);
if (rcs != null && rcs.length > 0) restConverts.add(rcs);
final RestConvertCoder[] rcc = method.getAnnotationsByType(RestConvertCoder.class);
if ((rcs != null && rcs.length > 0) || (rcc != null && rcc.length > 0)) {
restConverts.add(new Object[]{rcs, rcc});
}
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, entry.name.replace('/', '$').replace('.', '_'), "(" + reqDesc + respDesc + ")V", null, new String[]{"java/io/IOException"}));
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, entry.newMethodName, "(" + reqDesc + respDesc + ")V", null, new String[]{"java/io/IOException"}));
//mv.setDebug(true);
mv.debugLine();
@@ -1042,6 +1097,7 @@ public final class Rest {
mappingMap.put("comment", entry.comment);
mappingMap.put("methods", entry.methods);
mappingMap.put("result", grt == returnType ? returnType.getName() : String.valueOf(grt));
entry.mappingurl = url;
}
{ // 设置 Annotation
@@ -1695,8 +1751,99 @@ public final class Rest {
mv.visitMaxs(maxStack, maxLocals);
mappingMap.put("params", paramMaps);
mappingMaps.add(mappingMap);
{ //_Dync_XXX__HttpServlet.class
ClassWriter cw2 = new ClassWriter(COMPUTE_FRAMES);
cw2.visit(V1_8, ACC_SUPER, newDynName + "$" + entry.newActionClassName, null, httpServletName, null);
cw2.visitInnerClass(newDynName + "$" + entry.newActionClassName, newDynName, entry.newActionClassName, ACC_PRIVATE + ACC_STATIC);
{
fv = cw2.visitField(0, "servlet", "L" + newDynName + ";", null, null);
fv.visitEnd();
}
{
mv = new MethodDebugVisitor(cw2.visitMethod(0, "<init>", "(L" + newDynName + ";)V", null, null));
mv.visitVarInsn(ALOAD, 0);
mv.visitMethodInsn(INVOKESPECIAL, httpServletName, "<init>", "()V", false);
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
mv.visitFieldInsn(PUTFIELD, newDynName + "$" + entry.newActionClassName, "servlet", "L" + newDynName + ";");
mv.visitInsn(RETURN);
mv.visitMaxs(2, 2);
mv.visitEnd();
}
if (false) {
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_SYNTHETIC, "<init>", "(L" + newDynName + ";L" + newDynName + "$" + entry.newActionClassName + ";)V", null, null));
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKESPECIAL, newDynName + "$" + entry.newActionClassName, "<init>", "L" + newDynName + ";", false);
mv.visitInsn(RETURN);
mv.visitMaxs(2, 3);
mv.visitEnd();
}
{
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(" + reqDesc + respDesc + ")V", null, new String[]{"java/io/IOException"}));
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName + "$" + entry.newActionClassName, "servlet", "L" + newDynName + ";");
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynName, entry.newMethodName, "(" + reqDesc + respDesc + ")V", false);
mv.visitInsn(RETURN);
mv.visitMaxs(3, 3);
mv.visitEnd();
}
cw2.visitEnd();
newLoader.addClass((newDynName + "$" + entry.newActionClassName).replace('/', '.'), cw2.toByteArray());
}
} // end for each
// HashMap<String, InnerActionEntry> _createRestInnerActionEntry() {
// HashMap<String, InnerActionEntry> map = new HashMap<>();
// map.put("asyncfind3", new InnerActionEntry(100000,200000,"asyncfind3", new String[]{},null,false,0, new _Dync_asyncfind3_HttpServlet()));
// map.put("asyncfind3", new InnerActionEntry(1,2,"asyncfind2", new String[]{"GET", "POST"},null,true,0, new _Dync_asyncfind2_HttpServlet()));
// return map;
// }
{ //_createRestInnerActionEntry 方法
mv = new MethodDebugVisitor(cw.visitMethod(0, "_createRestInnerActionEntry", "()Ljava/util/HashMap;", "()Ljava/util/HashMap<Ljava/lang/String;L" + innerEntryName + ";>;", null));
//mv.setDebug(true);
mv.visitTypeInsn(NEW, "java/util/HashMap");
mv.visitInsn(DUP);
mv.visitMethodInsn(INVOKESPECIAL, "java/util/HashMap", "<init>", "()V", false);
mv.visitVarInsn(ASTORE, 1);
for (final MappingEntry entry : entrys) {
mv.visitVarInsn(ALOAD, 1);
mv.visitLdcInsn(entry.mappingurl); //name
mv.visitTypeInsn(NEW, innerEntryName); //new InnerActionEntry
mv.visitInsn(DUP);
pushInt(mv, moduleid); //moduleid
pushInt(mv, entry.actionid); //actionid
mv.visitLdcInsn(entry.mappingurl); //name
pushInt(mv, entry.methods.length); //methods
mv.visitTypeInsn(ANEWARRAY, "java/lang/String");
for (int i = 0; i < entry.methods.length; i++) {
mv.visitInsn(DUP);
pushInt(mv, i);
mv.visitLdcInsn(entry.methods[i]);
mv.visitInsn(AASTORE);
}
mv.visitInsn(ACONST_NULL); //method
mv.visitInsn(entry.auth ? ICONST_1 : ICONST_0); //auth
pushInt(mv, entry.cacheseconds); //cacheseconds
mv.visitTypeInsn(NEW, newDynName + "$" + entry.newActionClassName);
mv.visitInsn(DUP);
mv.visitVarInsn(ALOAD, 0);
mv.visitMethodInsn(INVOKESPECIAL, newDynName + "$" + entry.newActionClassName, "<init>", "(L" + newDynName + ";)V", false);
mv.visitMethodInsn(INVOKESPECIAL, innerEntryName, "<init>", "(IILjava/lang/String;[Ljava/lang/String;Ljava/lang/reflect/Method;ZILorg/redkale/net/http/HttpServlet;)V", false);
mv.visitMethodInsn(INVOKEVIRTUAL, "java/util/HashMap", "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", false);
mv.visitInsn(POP);
}
mv.visitVarInsn(ALOAD, 1);
mv.visitInsn(ARETURN);
mv.visitMaxs(2, 2);
mv.visitEnd();
}
for (Map.Entry<String, java.lang.reflect.Type> en : bodyTypes.entrySet()) {
fv = cw.visitField(ACC_PRIVATE, en.getKey(), "Ljava/lang/reflect/Type;", null, null);
fv.visitEnd();
@@ -1712,20 +1859,24 @@ public final class Rest {
fv.visitEnd();
}
//classMap.put("mappings", mappingMaps); //不显示太多信息
{ //toString函数
//classMap.put("mappings", mappingMaps); //不显示太多信息
{ //toString函数
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "toString", "()Ljava/lang/String;", null, null));
//mv.setDebug(true);
mv.visitLdcInsn(JsonConvert.root().convertTo(classMap));
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_TOSTRINGOBJ_FIELD_NAME, "Ljava/util/function/Supplier;");
mv.visitMethodInsn(INVOKEINTERFACE, "java/util/function/Supplier", "get", "()Ljava/lang/Object;", true);
mv.visitTypeInsn(CHECKCAST, "java/lang/String");
mv.visitInsn(ARETURN);
mv.visitMaxs(1, 1);
mv.visitEnd();
}
cw.visitEnd();
Class<?> newClazz = new RestClassLoader(loader).loadClass(newDynName.replace('/', '.'), cw.toByteArray());
newLoader.addClass(newDynName.replace('/', '.'), cw.toByteArray());
try {
Class<?> newClazz = newLoader.findClass(newDynName.replace('/', '.'));
T obj = ((Class<T>) newClazz).getDeclaredConstructor().newInstance();
for (Map.Entry<String, org.redkale.util.Attribute> en : restAttributes.entrySet()) {
Field attrField = newClazz.getDeclaredField(en.getKey());
@@ -1740,7 +1891,9 @@ public final class Rest {
for (int i = 0; i < restConverts.size(); i++) {
Field genField = newClazz.getDeclaredField(REST_JSONCONVERT_FIELD_PREFIX + (i + 1));
genField.setAccessible(true);
genField.set(obj, createJsonConvert(restConverts.get(i)));
Object[] rc = restConverts.get(i);
genField.set(obj, createJsonConvert((RestConvert[]) rc[0], (RestConvertCoder[]) rc[1]));
}
Field typesfield = newClazz.getDeclaredField(REST_PARAMTYPES_FIELD_NAME);
typesfield.setAccessible(true);
@@ -1748,8 +1901,18 @@ public final class Rest {
paramtypeArray = paramtypes.toArray(paramtypeArray);
typesfield.set(obj, paramtypeArray);
Field tostringfield = newClazz.getDeclaredField(REST_TOSTRINGOBJ_FIELD_NAME);
tostringfield.setAccessible(true);
java.util.function.Supplier<String> sSupplier = () -> JsonConvert.root().convertTo(classMap);
tostringfield.set(obj, sSupplier);
Method restactMethod = newClazz.getDeclaredMethod("_createRestInnerActionEntry");
restactMethod.setAccessible(true);
Field tmpentrysfield = HttpServlet.class.getDeclaredField("_tmpentrys");
tmpentrysfield.setAccessible(true);
tmpentrysfield.set(obj, restactMethod.invoke(obj));
return obj;
} catch (Exception e) {
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
@@ -1791,13 +1954,26 @@ public final class Rest {
private static class RestClassLoader extends ClassLoader {
private Map<String, byte[]> classes = new HashMap<>();
public RestClassLoader(ClassLoader parent) {
super(parent);
}
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
byte[] classData = classes.get(name);
if (classData == null) return super.findClass(name);
return super.defineClass(name, classData, 0, classData.length);
}
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}
public final void addClass(String name, byte[] b) {
classes.put(name, b);
}
}
private static class MappingEntry {
@@ -1839,6 +2015,8 @@ public final class Rest {
}
}
this.existsPound = pound;
this.newMethodName = this.name.replace('/', '$').replace('.', '_');
this.newActionClassName = "_Dyn_" + this.newMethodName + "_ActionHttpServlet";
}
public final int methodidx; // _paramtypes 的下标从0开始
@@ -1847,6 +2025,10 @@ public final class Rest {
public final boolean ignore;
public final String newMethodName;
public final String newActionClassName;
public final String name;
public final String comment;
@@ -1861,6 +2043,8 @@ public final class Rest {
public final boolean existsPound; //是否包含#的参数
String mappingurl; //在生成方法时赋值, 供_createRestInnerActionEntry使用
@RestMapping()
void mapping() { //用于获取Mapping 默认值
}

View File

@@ -0,0 +1,43 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.http;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import org.redkale.convert.*;
/**
* 指定class某个字段的自定义序列化和反序列化策略。 <br>
* 只能依附在Service实现类的public方法上, 当方法的返回值以JSON输出时对指定类型的转换设定。 <br>
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@Inherited
@Documented
@Target({METHOD})
@Retention(RUNTIME)
@Repeatable(RestConvertCoder.RestConvertCoders.class)
public @interface RestConvertCoder {
Class type();
String field();
Class<? extends SimpledCoder> coder();
@Inherited
@Documented
@Target({METHOD})
@Retention(RUNTIME)
@interface RestConvertCoders {
RestConvertCoder[] value();
}
}

View File

@@ -475,6 +475,35 @@ public abstract class WebSocket<G extends Serializable, T> {
return rs;
}
/**
* 给指定userid的WebSocket节点发送操作
*
* @param action 操作参数
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendAction(final WebSocketAction action, Serializable... userids) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
CompletableFuture<Integer> rs = _engine.node.sendAction(action, userids);
if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket action(" + action + ")");
return rs;
}
/**
* 广播操作, 给所有人发操作指令
*
* @param action 操作参数
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
CompletableFuture<Integer> rs = _engine.node.broadcastAction(action);
if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("broadcast send websocket action(" + action + ")");
return rs;
}
/**
* 获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表<br>
* InetSocketAddress 为 SNCP节点地址
@@ -683,6 +712,18 @@ public abstract class WebSocket<G extends Serializable, T> {
return true;
}
/**
* WebSocket.broadcastAction时的操作
*
* @param action 操作参数
*
* @return CompletableFuture
*
*/
protected CompletableFuture<Integer> action(WebSocketAction action) {
return CompletableFuture.completedFuture(0);
}
/**
* WebSokcet连接成功后的回调方法
*/

View File

@@ -0,0 +1,58 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net.http;
import java.io.Serializable;
import java.util.Map;
import org.redkale.convert.json.JsonConvert;
/**
* WebSocket.broadcastAction时的参数
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class WebSocketAction implements Serializable {
protected String action;
protected Map<String, String> attach;
public WebSocketAction() {
}
public WebSocketAction(String action) {
this.action = action;
}
public WebSocketAction(String action, Map<String, String> attach) {
this.action = action;
this.attach = attach;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Map<String, String> getAttach() {
return attach;
}
public void setAttach(Map<String, String> attach) {
this.attach = attach;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -325,6 +325,54 @@ public class WebSocketEngine {
}
}
@Comment("给指定WebSocket连接用户发起操作指令")
public CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
CompletableFuture<Integer> future = null;
if (single) {
for (WebSocket websocket : websockets.values()) {
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
}
} else {
for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) {
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
}
}
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
@Comment("给指定用户组发送操作")
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Stream<? extends Serializable> userids) {
Object[] array = userids.toArray();
Serializable[] ss = new Serializable[array.length];
for (int i = 0; i < array.length; i++) {
ss[i] = (Serializable) array[i];
}
return sendAction(action, ss);
}
@Comment("给指定用户组发送操作")
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Serializable... userids) {
CompletableFuture<Integer> future = null;
if (single) {
for (Serializable userid : userids) {
WebSocket websocket = websockets.get(userid);
if (websocket == null) continue;
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
}
} else {
for (Serializable userid : userids) {
List<WebSocket> list = websockets2.get(userid);
if (list == null) continue;
for (WebSocket websocket : list) {
future = future == null ? websocket.action(action) : future.thenCombine(websocket.action(action), (a, b) -> a | (Integer) b);
}
}
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
@Comment("获取最大连接数")
public int getLocalWsmaxconns() {
return this.wsmaxconns;

View File

@@ -60,7 +60,9 @@ public abstract class WebSocketNode {
protected Semaphore semaphore;
public void init(AnyValue conf) {
if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class);
if (sncpNodeAddresses != null && "memory".equals(sncpNodeAddresses.getType())) {
sncpNodeAddresses.initValueType(InetSocketAddress.class);
}
if (localEngine != null) {
int wsthreads = localEngine.wsthreads;
if (wsthreads == 0) wsthreads = Runtime.getRuntime().availableProcessors() * 8;
@@ -82,7 +84,7 @@ public abstract class WebSocketNode {
//关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> g.close());
if (sncpNodeAddresses != null && localSncpAddress != null) {
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress);
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, localSncpAddress);
}
}
@@ -92,6 +94,10 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable userid);
protected abstract CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
protected abstract CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr);
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr);
@@ -147,7 +153,7 @@ public abstract class WebSocketNode {
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (this.sncpNodeAddresses != null) {
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
CompletableFuture<Collection<InetSocketAddress>> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore());
return result;
}
@@ -231,7 +237,7 @@ public abstract class WebSocketNode {
}
//远程节点关闭
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
@@ -509,7 +515,7 @@ public abstract class WebSocketNode {
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs);
@@ -540,7 +546,7 @@ public abstract class WebSocketNode {
//远程节点发送消息
final Object remoteMessage = formatRemoteMessage(message);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
@@ -559,6 +565,90 @@ public abstract class WebSocketNode {
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
/**
* 广播操作, 给所有人发操作
*
* @param action 操作参数
*
* @return 为0表示成功 其他值表示部分发送异常
*/
@Local
public CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.broadcastAction(action);
}
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastAction(action);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.broadcastAction(addr, action)
: future.thenCombine(remoteNode.broadcastAction(addr, action), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
/**
* 向指定用户发送操作,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接
*
* @param action 操作参数
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示部分发送异常
*/
@Local
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Serializable... userids) {
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
return this.localEngine.sendAction(action, userids);
}
CompletableFuture<Integer> future = null;
for (Serializable userid : userids) {
future = future == null ? sendOneAction(action, userid) : future.thenCombine(sendOneAction(action, userid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}
protected CompletableFuture<Integer> sendOneAction(final WebSocketAction action, final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = localEngine.sendAction(action, userid);
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
}
//远程节点发送操作
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.sendAction(addr, action, userid)
: future.thenCombine(remoteNode.sendAction(addr, action, userid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
protected Object formatRemoteMessage(Object message) {
if (message instanceof WebSocketPacket) return message;
if (message instanceof byte[]) return message;

View File

@@ -129,7 +129,7 @@ public class RetResult<T> {
*/
public RetResult<T> attach(String key, Object value) {
if (this.attach == null) this.attach = new HashMap<>();
boolean canstr = value != null && (value instanceof CharSequence || value.getClass().isPrimitive());
boolean canstr = value != null && (value instanceof CharSequence || value instanceof Number || value.getClass().isPrimitive());
this.attach.put(key, value == null ? null : (canstr ? String.valueOf(value) : JsonConvert.root().convertTo(value)));
return this;
}

View File

@@ -67,6 +67,18 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
return this.localEngine.broadcastMessage(wsrange, message, last);
}
@Override
public CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable userid) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.sendAction(action, userid);
}
@Override
public CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastAction(action);
}
/**
* 当用户连接到节点需要更新到CacheSource
*
@@ -78,8 +90,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr));
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, sncpAddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
return future;
@@ -96,7 +108,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr);
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
return future;
@@ -114,8 +126,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr));
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, InetSocketAddress.class, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, InetSocketAddress.class, sncpAddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr);
return future;

View File

@@ -27,9 +27,9 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
@SuppressWarnings("unchecked")
@Local
@AutoLoad(false)
@SuppressWarnings("unchecked")
@ResourceType(CacheSource.class)
public class CacheMemorySource<V extends Object> extends AbstractService implements CacheSource<V>, Service, AutoCloseable, Resourcable {
@@ -340,6 +340,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return (V) entry.objectValue;
}
@Override
public <T> T get(final String key, final Type type) {
return (T) get(key);
}
@Override
public String getString(String key) {
if (key == null) return null;
@@ -361,6 +366,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.supplyAsync(() -> get(key), getExecutor());
}
@Override
public <T> CompletableFuture<T> getAsync(final String key, final Type type) {
return CompletableFuture.supplyAsync(() -> (T) get(key), getExecutor());
}
@Override
public CompletableFuture<String> getStringAsync(final String key) {
return CompletableFuture.supplyAsync(() -> getString(key), getExecutor());
@@ -385,6 +395,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return (V) entry.objectValue;
}
@Override
public <T> T getAndRefresh(final String key, final int expireSeconds, final Type type) {
return (T) getAndRefresh(key, expireSeconds);
}
@Override
@RpcMultiRun
@SuppressWarnings("unchecked")
@@ -415,6 +430,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor());
}
@Override
public <T> CompletableFuture<T> getAndRefreshAsync(final String key, final int expireSeconds, final Type type) {
return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds, type), getExecutor());
}
@Override
@RpcMultiRun
public CompletableFuture<String> getStringAndRefreshAsync(final String key, final int expireSeconds) {
@@ -462,6 +482,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
set(CacheEntryType.OBJECT, key, value);
}
@Override
public <T> void set(String key, Type type, T value) {
set(CacheEntryType.OBJECT, key, value);
}
@Override
@RpcMultiRun
public void setString(String key, String value) {
@@ -480,6 +505,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.runAsync(() -> set(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(String key, Type type, T value) {
return CompletableFuture.runAsync(() -> set(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> setStringAsync(String key, String value) {
@@ -511,6 +541,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
set(CacheEntryType.OBJECT, expireSeconds, key, value);
}
@Override
public <T> void set(final int expireSeconds, String key, Type type, T value) {
set(CacheEntryType.OBJECT, expireSeconds, key, value);
}
@Override
@RpcMultiRun
public void setString(int expireSeconds, String key, String value) {
@@ -529,6 +564,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> setAsync(int expireSeconds, String key, Type type, T value) {
return CompletableFuture.runAsync(() -> set(expireSeconds, key, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> setStringAsync(int expireSeconds, String key, String value) {
@@ -632,6 +672,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return (Collection<V>) get(key);
}
@Override
public <T> Collection<T> getCollection(final String key, final Type componentType) {
return (Collection<T>) get(key);
}
@Override
public Collection<String> getStringCollection(final String key) {
return (Collection<String>) get(key);
@@ -647,6 +692,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor());
}
@Override
public CompletableFuture<Collection<V>> getCollectionAsync(final String key, final Type componentType) {
return CompletableFuture.supplyAsync(() -> getCollection(key, componentType), getExecutor());
}
@Override
public CompletableFuture<Collection<String>> getStringCollectionAsync(final String key) {
return CompletableFuture.supplyAsync(() -> getStringCollection(key), getExecutor());
@@ -674,6 +724,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return (Collection<V>) getAndRefresh(key, expireSeconds);
}
@Override
public <T> Collection<T> getCollectionAndRefresh(final String key, final int expireSeconds, final Type componentType) {
return (Collection<T>) getAndRefresh(key, expireSeconds, componentType);
}
@Override
@RpcMultiRun
public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds) {
@@ -686,11 +741,22 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return list != null && list.contains(value);
}
@Override
public <T> boolean existsSetItem(final String key, final Type type, final T value) {
Collection list = getCollection(key);
return list != null && list.contains(value);
}
@Override
public CompletableFuture<Boolean> existsSetItemAsync(final String key, final V value) {
return CompletableFuture.supplyAsync(() -> existsSetItem(key, value), getExecutor());
}
@Override
public <T> CompletableFuture<Boolean> existsSetItemAsync(final String key, final Type type, final T value) {
return CompletableFuture.supplyAsync(() -> existsSetItem(key, type, value), getExecutor());
}
@Override
public boolean existsStringSetItem(final String key, final String value) {
Collection<String> list = getStringCollection(key);
@@ -725,6 +791,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor());
}
@Override
public <T> CompletableFuture<Collection<T>> getCollectionAndRefreshAsync(final String key, final int expireSeconds, final Type componentType) {
return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds, componentType), getExecutor());
}
@Override
@RpcMultiRun
public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds) {
@@ -757,6 +828,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
appendListItem(CacheEntryType.OBJECT_LIST, key, value);
}
@Override
public <T> void appendListItem(String key, Type componentType, T value) {
appendListItem(CacheEntryType.OBJECT_LIST, key, value);
}
@Override
@RpcMultiRun
public void appendStringListItem(String key, String value) {
@@ -775,6 +851,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> appendListItemAsync(final String key, final Type componentType, final T value) {
return CompletableFuture.runAsync(() -> appendListItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> appendStringListItemAsync(final String key, final String value) {
@@ -796,6 +877,14 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
entry.listValue.remove(value);
}
@Override
public <T> void removeListItem(String key, final Type componentType, T value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || entry.listValue == null) return;
entry.listValue.remove(value);
}
@Override
@RpcMultiRun
public void removeStringListItem(String key, String value) {
@@ -820,6 +909,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> removeListItemAsync(final String key, final Type componentType, T value) {
return CompletableFuture.runAsync(() -> removeListItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> removeStringListItemAsync(final String key, final String value) {
@@ -852,6 +946,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
}
@Override
public <T> void appendSetItem(String key, final Type componentType, T value) {
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
}
@Override
@RpcMultiRun
public void appendStringSetItem(String key, String value) {
@@ -870,6 +969,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> appendSetItemAsync(final String key, final Type componentType, T value) {
return CompletableFuture.runAsync(() -> appendSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value) {
@@ -891,6 +995,14 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
entry.csetValue.remove(value);
}
@Override
public <T> void removeSetItem(String key, Type type, T value) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null || entry.csetValue == null) return;
entry.csetValue.remove(value);
}
@Override
@RpcMultiRun
public void removeStringSetItem(String key, String value) {
@@ -915,6 +1027,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Void> removeSetItemAsync(final String key, final Type componentType, final T value) {
return CompletableFuture.runAsync(() -> removeSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
@RpcMultiRun
public CompletableFuture<Void> removeStringSetItemAsync(final String key, final String value) {

View File

@@ -42,6 +42,8 @@ public interface CacheSource<V extends Object> {
public V get(final String key);
public <T> T get(final String key, final Type type);
default V getIfAbsent(final String key, Function<String, ? extends V> mappingFunction) {
V rs = get(key);
if (rs == null) {
@@ -53,6 +55,8 @@ public interface CacheSource<V extends Object> {
public V getAndRefresh(final String key, final int expireSeconds);
public <T> T getAndRefresh(final String key, final int expireSeconds, final Type type);
default V getAndRefreshIfAbsent(final String key, final int expireSeconds, Function<String, ? extends V> mappingFunction) {
V rs = getAndRefresh(key, expireSeconds);
if (rs == null) {
@@ -66,8 +70,12 @@ public interface CacheSource<V extends Object> {
public void set(final String key, final V value);
public <T> void set(final String key, final Type type, final T value);
public void set(final int expireSeconds, final String key, final V value);
public <T> void set(final int expireSeconds, final String key, final Type type, final T value);
public void setExpireSeconds(final String key, final int expireSeconds);
public void remove(final String key);
@@ -82,10 +90,14 @@ public interface CacheSource<V extends Object> {
public Collection<V> getCollection(final String key);
public <T> Collection<T> getCollection(final String key, final Type componentType);
public int getCollectionSize(final String key);
public Collection<V> getCollectionAndRefresh(final String key, final int expireSeconds);
public <T> Collection<T> getCollectionAndRefresh(final String key, final int expireSeconds, final Type componentType);
public void appendListItem(final String key, final V value);
public void removeListItem(final String key, final V value);
@@ -96,6 +108,16 @@ public interface CacheSource<V extends Object> {
public void removeSetItem(final String key, final V value);
public <T> void appendListItem(final String key, final Type componentType, final T value);
public <T> void removeListItem(final String key, final Type componentType, final T value);
public <T> boolean existsSetItem(final String key, final Type componentType, final T value);
public <T> void appendSetItem(final String key, final Type componentType, final T value);
public <T> void removeSetItem(final String key, final Type componentType, final T value);
public List<String> queryKeys();
public List<String> queryKeysStartsWith(String startsWith);
@@ -153,6 +175,8 @@ public interface CacheSource<V extends Object> {
//---------------------- CompletableFuture 异步版 ---------------------------------
public CompletableFuture<Boolean> existsAsync(final String key);
public <T> CompletableFuture<T> getAsync(final String key, final Type type);
public CompletableFuture<V> getAsync(final String key);
default CompletableFuture<V> getIfAbsentAsync(final String key, Function<String, ? extends V> mappingFunction) {
@@ -170,6 +194,8 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<V> getAndRefreshAsync(final String key, final int expireSeconds);
public <T> CompletableFuture<T> getAndRefreshAsync(final String key, final int expireSeconds, final Type type);
default CompletableFuture<V> getAndRefreshIfAbsentAsync(final String key, final int expireSeconds, Function<String, ? extends V> mappingFunction) {
return getAndRefreshAsync(key, expireSeconds).thenCompose((V rs) -> {
if (rs == null) {
@@ -187,8 +213,12 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Void> setAsync(final String key, final V value);
public <T> CompletableFuture<Void> setAsync(final String key, final Type type, final T value);
public CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final V value);
public <T> CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final Type type, final T value);
public CompletableFuture<Void> setExpireSecondsAsync(final String key, final int expireSeconds);
public CompletableFuture<Void> removeAsync(final String key);
@@ -203,10 +233,14 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Collection<V>> getCollectionAsync(final String key);
public <T> CompletableFuture<Collection<T>> getCollectionAsync(final String key, final Type componentType);
public CompletableFuture<Integer> getCollectionSizeAsync(final String key);
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final String key, final int expireSeconds);
public <T> CompletableFuture<Collection<T>> getCollectionAndRefreshAsync(final String key, final int expireSeconds, final Type componentType);
public CompletableFuture<Void> appendListItemAsync(final String key, final V value);
public CompletableFuture<Void> removeListItemAsync(final String key, final V value);
@@ -217,6 +251,16 @@ public interface CacheSource<V extends Object> {
public CompletableFuture<Void> removeSetItemAsync(final String key, final V value);
public <T> CompletableFuture<Void> appendListItemAsync(final String key, final Type componentType, final T value);
public <T> CompletableFuture<Void> removeListItemAsync(final String key, final Type componentType, final T value);
public <T> CompletableFuture<Boolean> existsSetItemAsync(final String key, final Type componentType, final T value);
public <T> CompletableFuture<Void> appendSetItemAsync(final String key, final Type componentType, final T value);
public <T> CompletableFuture<Void> removeSetItemAsync(final String key, final Type componentType, final T value);
public CompletableFuture<List<String>> queryKeysAsync();
public CompletableFuture<List<String>> queryKeysStartsWithAsync(String startsWith);

View File

@@ -86,7 +86,7 @@ public class FilterJoinNode extends FilterNode {
protected FilterNode any(final FilterNode node0, boolean signor) {
Objects.requireNonNull(node0);
if (!(node0 instanceof FilterJoinNode)) {
throw new IllegalArgumentException(this + (signor ? " or " : " and ") + " a node but " + String.valueOf(node0) + "is not a " + FilterJoinNode.class.getSimpleName());
throw new IllegalArgumentException(this + (signor ? " or " : " and ") + " a node but " + String.valueOf(node0) + " is not a " + FilterJoinNode.class.getSimpleName());
}
final FilterJoinNode node = (FilterJoinNode) node0;
if (this.nodes == null) {
@@ -99,7 +99,7 @@ public class FilterJoinNode extends FilterNode {
if (this.column == null) this.or = signor;
return this;
}
this.nodes = new FilterNode[]{new FilterJoinNode(node), node};
this.nodes = new FilterNode[]{new FilterJoinNode(this), node};
this.column = null;
this.express = null;
this.itemand = true;

View File

@@ -25,6 +25,8 @@ import org.redkale.util.*;
*/
public class FilterNode { //FilterNode 不能实现Serializable接口 否则DataSource很多重载接口会出现冲突
protected boolean readOnly;
protected String column;
protected FilterExpress express;
@@ -80,6 +82,16 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
this.value = val;
}
public FilterNode asReadOnly() {
this.readOnly = true;
return this;
}
public FilterNode readOnly(boolean readOnly) {
this.readOnly = readOnly;
return this;
}
public long findLongValue(final String col, long defValue) {
Serializable val = findValue(col);
return val == null ? defValue : ((Number) val).longValue();
@@ -95,7 +107,7 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
}
public Serializable findValue(final String col) {
if (this.column.equals(col)) return this.value;
if (this.column != null && this.column.equals(col)) return this.value;
if (this.nodes == null) return null;
for (FilterNode n : this.nodes) {
if (n == null) continue;
@@ -138,6 +150,7 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
}
protected FilterNode any(FilterNode node, boolean signor) {
if (this.readOnly) throw new RuntimeException("FilterNode(" + this + ") is ReadOnly");
Objects.requireNonNull(node);
if (this.column == null) {
this.column = node.column;
@@ -1861,6 +1874,14 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
this.value = value;
}
public boolean isReadOnly() {
return readOnly;
}
public void setReadOnly(boolean readOnly) {
this.readOnly = readOnly;
}
public final boolean isOr() {
return or;
}

View File

@@ -86,6 +86,8 @@ public class PoolJdbcSource extends PoolSource<Connection> {
source0 = "org.postgresql.Driver";
} else if (url.startsWith("jdbc:microsoft:sqlserver:")) {
source0 = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
} else if (url.startsWith("jdbc:h2")) {
source0 = "org.h2.Driver";
}
}
if (source0 != null && source0.contains("Driver")) { //为了兼容JPA的配置文件
@@ -111,6 +113,9 @@ public class PoolJdbcSource extends PoolSource<Connection> {
case "com.microsoft.sqlserver.jdbc.SQLServerDriver":
source = "com.microsoft.sqlserver.jdbc.SQLServerConnectionPoolDataSource";
break;
case "org.h2.Driver":
source = "org.h2.jdbcx.JdbcDataSource";
break;
}
}
final Class clazz = Thread.currentThread().getContextClassLoader().loadClass(source);

View File

@@ -123,6 +123,7 @@ public abstract class PoolSource<DBChannel> {
}
protected void parseAddressAndDbnameAndAttrs() {
if (this.url.startsWith("jdbc:h2:")) return;
String url0 = this.url.substring(this.url.indexOf("://") + 3);
int pos = url0.indexOf('?'); //127.0.0.1:5432/db?charset=utr8&xxx=yy
if (pos > 0) {

View File

@@ -63,6 +63,22 @@ public class ByteBufferReader {
return new ByteBufferReader(buffers);
}
public static byte[] toBytes(ByteBuffer[] buffers) {
if (buffers == null) return null;
int size = 0;
for (ByteBuffer buffer : buffers) {
size += buffer.remaining();
}
byte[] bs = new byte[size];
int index = 0;
for (ByteBuffer buffer : buffers) {
int remain = buffer.remaining();
buffer.get(bs, index, remain);
index += remain;
}
return bs;
}
public boolean hasRemaining() {
return this.currBuffer.hasRemaining();
}

View File

@@ -34,6 +34,14 @@ public class ByteBufferWriter {
return new ByteBufferWriter(supplier);
}
public static ByteBuffer[] toBuffers(Supplier<ByteBuffer> supplier, byte[] content) {
return new ByteBufferWriter(supplier).put(content, 0, content.length).toBuffers();
}
public static ByteBuffer[] toBuffers(Supplier<ByteBuffer> supplier, byte[] content, int offset, int length) {
return new ByteBufferWriter(supplier).put(content, offset, length).toBuffers();
}
private ByteBuffer getLastBuffer(int size) {
if (this.buffers == null) {
ByteBuffer buf = supplier.get();
@@ -173,19 +181,4 @@ public class ByteBufferWriter {
return this;
}
public static byte[] toBytes(ByteBuffer[] buffers) {
if (buffers == null) return null;
int size = 0;
for (ByteBuffer buffer : buffers) {
size += buffer.remaining();
}
byte[] bs = new byte[size];
int index = 0;
for (ByteBuffer buffer : buffers) {
int remain = buffer.remaining();
buffer.get(bs, index, remain);
index += remain;
}
return bs;
}
}

View File

@@ -77,6 +77,7 @@ public interface Creator<T> {
static final Map<Class, Creator> creatorCacheMap = new HashMap<>();
static {
creatorCacheMap.put(Object.class, (params) -> new Object());
creatorCacheMap.put(ArrayList.class, (params) -> new ArrayList<>());
creatorCacheMap.put(HashMap.class, (params) -> new HashMap<>());
creatorCacheMap.put(HashSet.class, (params) -> new HashSet<>());
@@ -212,14 +213,16 @@ public interface Creator<T> {
*/
@SuppressWarnings("unchecked")
public static <T> Creator<T> create(Class<T> clazz) {
if (clazz.isAssignableFrom(ArrayList.class)) {
if (List.class.isAssignableFrom(clazz) && (clazz.isAssignableFrom(ArrayList.class) || clazz.getName().startsWith("java.util.Collections") || clazz.getName().startsWith("java.util.ImmutableCollections") || clazz.getName().startsWith("java.util.Arrays"))) {
clazz = (Class<T>) ArrayList.class;
} else if (clazz.isAssignableFrom(HashMap.class)) {
} else if (Map.class.isAssignableFrom(clazz) && (clazz.isAssignableFrom(HashMap.class) || clazz.getName().startsWith("java.util.Collections") || clazz.getName().startsWith("java.util.ImmutableCollections"))) {
clazz = (Class<T>) HashMap.class;
} else if (clazz.isAssignableFrom(HashSet.class)) {
} else if (Set.class.isAssignableFrom(clazz) && (clazz.isAssignableFrom(HashSet.class) || clazz.getName().startsWith("java.util.Collections") || clazz.getName().startsWith("java.util.ImmutableCollections"))) {
clazz = (Class<T>) HashSet.class;
} else if (clazz.isAssignableFrom(ConcurrentHashMap.class)) {
} else if (Map.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(ConcurrentHashMap.class)) {
clazz = (Class<T>) ConcurrentHashMap.class;
} else if (Collection.class.isAssignableFrom(clazz) && clazz.isAssignableFrom(ArrayList.class)) {
clazz = (Class<T>) ArrayList.class;
}
Creator creator = CreatorInner.creatorCacheMap.get(clazz);
if (creator != null) return creator;

View File

@@ -53,6 +53,14 @@ public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
this(null, null, max, creator, prepare, recycler);
}
public ObjectPool(int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, prepare, recycler);
}
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Supplier<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(creatCounter, cycleCounter, max, c -> creator.get(), prepare, recycler);
}
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter;

View File

@@ -17,7 +17,7 @@ public final class Redkale {
}
public static String getDotedVersion() {
return "1.9.6";
return "1.9.7";
}
public static int getMajorVersion() {

View File

@@ -434,6 +434,21 @@ public final class ResourceFactory {
return map == null ? ((recursive && parent != null) ? parent.contains(recursive, name, clazz) : false) : map.containsKey(name);
}
/**
* 查找指定资源名和资源类型的资源对象所在的ResourceFactory 没有则返回null
*
* @param name 资源名
* @param clazz 资源类型
*
* @return ResourceFactory
*/
public ResourceFactory findResourceFactory(String name, Type clazz) {
Map<String, ResourceEntry> map = this.store.get(clazz);
if (map != null && map.containsKey(name)) return this;
if (parent != null) return parent.findResourceFactory(name, clazz);
return null;
}
public <A> A find(Class<? extends A> clazz) {
return find("", clazz);
}
@@ -538,7 +553,7 @@ public final class ResourceFactory {
if (pos < 0) return name;
String prefix = name.substring(0, pos);
String subname = name.substring(pos + "{system.property.".length());
pos = subname.indexOf('}');
pos = subname.lastIndexOf('}');
if (pos < 0) return name;
String postfix = subname.substring(pos + 1);
String property = subname.substring(0, pos);

View File

@@ -77,6 +77,7 @@ public class SelectColumn implements Predicate<String> {
*
* @return SelectColumn
*/
@Deprecated
public static SelectColumn createIncludes(String... columns) {
return new SelectColumn(columns, false);
}
@@ -101,6 +102,7 @@ public class SelectColumn implements Predicate<String> {
*
* @return SelectColumn
*/
@Deprecated
public static SelectColumn createIncludes(String[] cols, String... columns) {
return new SelectColumn(Utility.append(cols, columns), false);
}
@@ -126,6 +128,7 @@ public class SelectColumn implements Predicate<String> {
*
* @return SelectColumn
*/
@Deprecated
public static SelectColumn createExcludes(String... columns) {
return new SelectColumn(columns, true);
}
@@ -150,6 +153,7 @@ public class SelectColumn implements Predicate<String> {
*
* @return SelectColumn
*/
@Deprecated
public static SelectColumn createExcludes(String[] cols, String... columns) {
return new SelectColumn(Utility.append(cols, columns), true);
}

View File

@@ -146,7 +146,7 @@ public final class Utility {
* @return Map
*/
public static Map<String, String> ofMap(String... items) {
HashMap<String, String> map = new LinkedHashMap<>();
HashMap<String, String> map = new LinkedHashMap<>(Math.max(1, items.length / 2));
int len = items.length / 2;
for (int i = 0; i < len; i++) {
map.put(items[i * 2], items[i * 2 + 1]);
@@ -165,7 +165,7 @@ public final class Utility {
* @return Map
*/
public static <K, V> Map<K, V> ofMap(Object... items) {
HashMap<K, V> map = new LinkedHashMap<>();
HashMap<K, V> map = new LinkedHashMap<>(Math.max(1, items.length / 2));
int len = items.length / 2;
for (int i = 0; i < len; i++) {
map.put((K) items[i * 2], (V) items[i * 2 + 1]);
@@ -203,7 +203,7 @@ public final class Utility {
* @return Set
*/
public static <T> Set<T> ofSet(T... items) {
Set<T> set = new LinkedHashSet<>();
Set<T> set = new LinkedHashSet<>(items.length);
for (T item : items) set.add(item);
return set;
}
@@ -217,7 +217,7 @@ public final class Utility {
* @return List
*/
public static <T> List<T> ofList(T... items) {
List<T> list = new ArrayList<>();
List<T> list = new ArrayList<>(items.length);
for (T item : items) list.add(item);
return list;
}
@@ -556,7 +556,7 @@ public final class Utility {
* @return 新数组
*/
public static <T> T[] remove(final T[] array, final T item) {
return remove(array, (i) -> item.equals(item));
return remove(array, (i) -> Objects.equals(i, item));
}
/**
@@ -677,8 +677,7 @@ public final class Utility {
public static <T> boolean contains(T[] values, T value) {
if (values == null) return false;
for (T v : values) {
if (v == null && value == null) return true;
if (v != null && v.equals(value)) return true;
if (Objects.equals(v, value)) return true;
}
return false;
}
@@ -1082,6 +1081,64 @@ public final class Utility {
return md5.digest(str.getBytes());
}
/**
* SHA-256
*
* @param bs 待hash数据
*
* @return hash值
*/
public static String sha256Hex(byte[] bs) {
return binToHexString(sha256Bytes(bs));
}
/**
* SHA-256
*
* @param bs 待hash数据
*
* @return hash值
*/
public static byte[] sha256Bytes(byte[] bs) {
if (bs == null) return null;
MessageDigest digester;
try {
digester = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException ex) {
throw new RuntimeException(ex);
}
return digester.digest(bs);
}
/**
* SHA-256
*
* @param str 待hash数据
*
* @return hash值
*/
public static String sha256Hex(String str) {
return binToHexString(sha256Bytes(str));
}
/**
* SHA-256
*
* @param str 待hash数据
*
* @return hash值
*/
public static byte[] sha256Bytes(String str) {
if (str == null) return null;
MessageDigest digester;
try {
digester = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException ex) {
throw new RuntimeException(ex);
}
return digester.digest(str.getBytes());
}
/**
* 将字节数组转换为16进制字符串
*

View File

@@ -0,0 +1,40 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.test.rest;
import java.text.SimpleDateFormat;
import org.redkale.convert.*;
/**
*
* @author zhangjx
* @param <R> R
* @param <W> W
*/
public class CreateTimeSimpleCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, Long> {
private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void convertTo(W out, Long value) {
if (value == null) {
out.writeNull();
} else {
out.writeString(format.format(new java.util.Date(value)));
}
}
@Override
public Long convertFrom(R in) {
String val = in.readString();
if (val == null) return 0L;
try {
return format.parse(val).getTime();
} catch (Exception e) {
return 0L;
}
}
}

View File

@@ -20,7 +20,7 @@ import org.redkale.util.Sheet;
*
*/
@RestService(name = "hello", moduleid = 0, automapping = true, repair = true, ignore = false, comment = "Hello服务模块")
public class HelloService2 implements Service {
public class HelloService2 implements Service {
@Resource
private DataSource source;
@@ -47,7 +47,8 @@ public class HelloService2 implements Service {
source.update(entity);
}
//查询列表
//查询列表
@RestConvertCoder(type = HelloEntity.class, field = "createtime", coder = CreateTimeSimpleCoder.class)
@RestMapping(name = "query", auth = false, comment = "查询Hello对象列表")
public Sheet<HelloEntity> queryHello(@RestParam(name = "bean", comment = "过滤条件") HelloBean bean, Flipper flipper) { //通过 /hello/query/offset:0/limit:20?bean={...} 查询列表
return source.querySheet(HelloEntity.class, flipper, bean);

View File

@@ -8,6 +8,7 @@ package org.redkale.test.source;
import java.io.Serializable;
import javax.persistence.*;
import org.redkale.source.*;
import org.redkale.util.Utility;
/**
*
@@ -17,8 +18,8 @@ import org.redkale.source.*;
public class LoginRecord extends BaseEntity {
@Id
@Column(comment = "主键ID; 值=create36time(9位)+UUID(32位)")
private String loginid = ""; //主键ID; 值=create36time(9位)+UUID(32位)
@Column(comment = "主键ID; 值=create36time(9位)+'-'+UUID(32位)")
private String loginid = ""; //主键ID; 值=create36time(9位)+'-'+UUID(32位)
@Column(updatable = false, comment = "C端用户ID")
private long userid; //C端用户ID
@@ -107,18 +108,30 @@ public class LoginRecord extends BaseEntity {
return this.createtime;
}
private static DataSource source;
//创建对象
public static void main(String[] args) throws Throwable {
LoginRecord record = new LoginRecord();
long now = System.currentTimeMillis();
record.setCreatetime(now); //设置创建时间
record.setLoginid(Utility.format36time(now) + "-" + Utility.uuid()); //主键的生成规则
//.... 填充其他字段
source.insert(record);
}
public static class TableStrategy implements DistributeTableStrategy<LoginRecord> {
private static final String dayformat = "%1$tY%1$tm%1$td";
private static final String dayformat = "%1$tY%1$tm%1$td"; //一天一个表
private static final String yearformat = "%1$tY";
private static final String yearformat = "%1$tY"; //一年一个库
//过滤查询时调用本方法
@Override
public String getTable(String table, FilterNode node) {
Serializable day = node.findValue("#day");
Serializable day = node.findValue("#day"); //LoginRecord没有day字段所以前面要加#,表示虚拟字段, 值为yyyyMMdd格式
if (day != null) getTable(table, (Integer) day, 0L); //存在#day参数则直接使用day值
Serializable time = node.findValue("#createtime"); //存在createtime则使用最小时间且createtime的范围必须在一天内因为本表以天为单位建表
Serializable time = node.findValue("createtime"); //存在createtime则使用最小时间且createtime的范围必须在一天内因为本表以天为单位建表
return getTable(table, 0, (time == null ? 0L : (time instanceof Range ? ((Range.LongRange) time).getMin() : (Long) time)));
}
@@ -135,9 +148,9 @@ public class LoginRecord extends BaseEntity {
return getTable(table, 0, Long.parseLong(id.substring(0, 9), 36));
}
private String getTable(String table, int day, long createtime) {
private String getTable(String table, int day, long createtime) { //day为0或yyyyMMdd格式数据
int pos = table.indexOf('.');
String year = (day > 0 ? "" + day / 10000 : String.format(yearformat, createtime)); //没有day取createtime
String year = day > 0 ? String.valueOf(day / 10000) : String.format(yearformat, createtime); //没有day取createtime
return "platf_login_" + year + "." + table.substring(pos + 1) + "_" + (day > 0 ? day : String.format(dayformat, createtime));
}
}

View File

@@ -8,6 +8,8 @@ package org.redkale.test.ws;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import org.redkale.net.http.WebSocketNode;
import org.redkale.service.*;
import org.redkale.util.Comment;
@@ -25,6 +27,9 @@ public class ChatService implements Service {
protected final AtomicInteger idcreator = new AtomicInteger(10000);
@Resource(name = "chat")
protected WebSocketNode wsnode;
@Comment("创建一个用户ID")
public int createUserid() {
return idcreator.incrementAndGet();
@@ -34,6 +39,11 @@ public class ChatService implements Service {
public boolean joinRoom(int userid, int roomid) {
userToRooms.put(userid, roomid);
roomToUsers.computeIfAbsent(roomid, (id) -> new CopyOnWriteArrayList()).add(userid);
System.out.println("加入房间: roomid: " + roomid);
return true;
}
public void chatMessage(ChatMessage message) {
wsnode.broadcastMessage(message);
}
}

View File

@@ -5,7 +5,6 @@
*/
package org.redkale.test.ws;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Resource;
@@ -72,7 +71,7 @@ public class ChatWebSocket extends WebSocket<Integer, Object> {
message.fromuserid = getUserid();
message.fromusername = "用户" + getUserid();
System.out.println("获取消息: message: " + message + ", map: " + extmap);
super.broadcastMessage(message);
service.chatMessage(message);
}
/**
@@ -90,16 +89,6 @@ public class ChatWebSocket extends WebSocket<Integer, Object> {
@RestOnMessage(name = "joinroom")
public void onJoinRoom(int roomid) {
service.joinRoom(getUserid(), roomid);
System.out.println("加入房间: roomid: " + roomid);
}
public static void main(String[] args) throws Throwable {
Method method = Arrays.asList(Rest.class.getDeclaredMethods())
.stream().filter(m -> "createRestWebSocketServlet".equals(m.getName()))
.findFirst().get();
method.setAccessible(true);
System.out.println(method.invoke(null, Thread.currentThread().getContextClassLoader(), ChatWebSocket.class));
}
}