58 Commits
1.8.6 ... 1.8.8

Author SHA1 Message Date
Redkale
921aedaf9d 增加Optional的序列化和反序列化 2017-12-25 10:12:01 +08:00
Redkale
0f545923b2 2017-12-23 10:39:46 +08:00
Redkale
5b32a91874 2017-12-23 10:14:27 +08:00
Redkale
25b2528416 2017-12-23 10:11:38 +08:00
Redkale
d29c95c38f 2017-12-22 10:48:31 +08:00
Redkale
a661ab2ff5 Utility增加max、min方法 2017-12-21 19:14:01 +08:00
Redkale
ac08ebee75 Utility增加joining方法 2017-12-21 17:39:45 +08:00
Redkale
8e1a287ed1 2017-12-21 17:33:18 +08:00
Redkale
9b656b3970 2017-12-21 16:50:21 +08:00
Redkale
a1c0bbf413 2017-12-21 16:24:08 +08:00
Redkale
2ad8c5d425 2017-12-21 16:19:41 +08:00
Redkale
75aaa980cf 2017-12-21 16:09:17 +08:00
Redkale
7c55326d23 2017-12-20 18:49:47 +08:00
Redkale
aa3ade5912 Convert增加AtomicIntegerSimpledCoder 2017-12-20 18:45:14 +08:00
Redkale
c692deebe9 优化MapEncoder 2017-12-20 14:20:02 +08:00
Redkale
9ff161c97d 增加WebSocket在onOpen和createUserid方法里返回无效用户信息前也可以发送信息 2017-12-20 08:54:54 +08:00
Redkale
5b1f820621 2017-12-19 20:07:57 +08:00
Redkale
30b2cffcb8 2017-12-19 16:40:58 +08:00
Redkale
709439bfca 2017-12-18 11:48:36 +08:00
Redkale
34adb238f7 ObjectPool实现Consumer接口,且将offer设为过期,建议使用accept方法 2017-12-18 10:00:18 +08:00
Redkale
76c54f8d54 修复部分带有@RestConvert方法生成RestServlet时出现的BUG 2017-12-16 16:32:21 +08:00
Redkale
6196c05f12 2017-12-16 10:42:50 +08:00
Redkale
37df0af56c Redkale 1.8.8 开始 2017-12-16 09:40:30 +08:00
Redkale
8a5e1252ab 2017-12-15 09:06:26 +08:00
Redkale
ae5430af42 2017-12-14 19:25:10 +08:00
Redkale
13cf188e25 WebSocketEngine增加forEachLocalWebSocket方法 2017-12-14 19:10:49 +08:00
Redkale
41d3dea1ac 2017-12-14 18:43:23 +08:00
Redkale
be816088f5 增加对IntStream、LongStream、DoubleStream的序列化支持 2017-12-14 18:37:54 +08:00
Redkale
5bef900b76 2017-12-14 16:01:13 +08:00
Redkale
52f61b0f96 2017-12-14 15:56:31 +08:00
Redkale
b5a4646e3b 2017-12-14 15:50:48 +08:00
Redkale
d055d5c824 2017-12-14 15:09:20 +08:00
Redkale
f14ef05c88 HttpResponse增加finish(Object)系列方法 2017-12-14 15:02:16 +08:00
Redkale
426506324f 2017-12-14 10:58:27 +08:00
Redkale
7a5e58a112 删掉AsyncHandler, 采用CompletionHandler代替 2017-12-14 10:42:24 +08:00
Redkale
2e0c58cbea WebSocket增加changeUserid功能 2017-12-12 09:33:56 +08:00
Redkale
9ded3fbb9a Update README.md 2017-12-11 18:51:48 +08:00
Redkale
acbc1032e6 Update README.md 2017-12-11 18:50:59 +08:00
Redkale
d8186b00ba 启动打印java.version信息 2017-12-11 11:39:57 +08:00
Redkale
767adcbefe 2017-12-08 19:44:39 +08:00
Redkale
67df072275 HttpRequest增加getParametersToString方法 2017-12-08 19:38:54 +08:00
Redkale
dab70af4d4 2017-12-08 14:27:22 +08:00
Redkale
b42826692d 2017-12-08 11:50:12 +08:00
Redkale
e2ab4b20c9 HttpResult支持byte[]、ByteBuffer和ByteBuffer[] 2017-12-08 11:36:24 +08:00
Redkale
511ee8a6df EntityInfo增加getQueryColumns方法 2017-12-06 18:58:30 +08:00
Redkale
463269a796 增加ApplicationListener功能 2017-12-06 14:33:39 +08:00
Redkale
2d4b865432 RestWebSocket增加anyuser功能 2017-12-06 10:20:13 +08:00
Redkale
1da73429f7 对SNCP的Address进行IPv4判断 2017-12-04 09:48:52 +08:00
Redkale
e97e6b8262 2017-11-29 14:06:01 +08:00
Redkale
e6bc34d6f8 2017-11-29 14:03:53 +08:00
Redkale
f1c4ac9e67 2017-11-29 14:01:57 +08:00
Redkale
de6c6076e4 【新增功能】DataSource可以通过application.xml配置 2017-11-29 13:56:26 +08:00
Redkale
a36e3d3819 2017-11-29 12:05:18 +08:00
Redkale
043b847f05 DataSource增加getType()方法 2017-11-29 12:01:42 +08:00
Redkale
65bc8192f0 DataJdbcSource增加preConstruct方法,方便重载 2017-11-29 11:28:10 +08:00
Redkale
71e0b60200 增加manifest的MimeType 2017-11-28 20:03:29 +08:00
Redkale
99dc7ac189 2017-11-25 16:48:59 +08:00
Redkale
8605c44f14 Redkale 1.8.7 开始 2017-11-25 14:40:39 +08:00
62 changed files with 1278 additions and 413 deletions

View File

@@ -25,4 +25,5 @@
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<h5>基本文档:&nbsp;&nbsp;&nbsp;&nbsp;<a href='https://redkale.org/articles.html' target='_blank'>https://redkale.org/articles.html</a></h5> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<h5>基本文档:&nbsp;&nbsp;&nbsp;&nbsp;<a href='https://redkale.org/articles.html' target='_blank'>https://redkale.org/articles.html</a></h5>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<h5>欢迎加入Redkale QQ群: 527523235</h5>
&nbsp; &nbsp;

View File

@@ -72,6 +72,12 @@
<node addr="127.0.0.1" port="7070"/> <node addr="127.0.0.1" port="7070"/>
</source> </source>
<!--
Application启动的监听事件,可配置多个节点
value: 类名必须是ApplicationListener的子类
-->
<listener value="org.redkalex.xxx.XXXApplicationListener"/>
<!-- <!--
【节点全局唯一】 【节点全局唯一】
全局的参数配置, 可以通过@Resource(name="property.xxxxxx") 进行注入<property>的信息, 被注解的字段类型只能是String、primitive class 全局的参数配置, 可以通过@Resource(name="property.xxxxxx") 进行注入<property>的信息, 被注解的字段类型只能是String、primitive class

View File

@@ -8,6 +8,7 @@ ClassReader.java
ClassVisitor.java ClassVisitor.java
ClassWriter.java ClassWriter.java
Context.java Context.java
CurrentFrame.java
Edge.java Edge.java
FieldVisitor.java FieldVisitor.java
FieldWriter.java FieldWriter.java
@@ -18,25 +19,8 @@ Item.java
Label.java Label.java
MethodVisitor.java MethodVisitor.java
MethodWriter.java MethodWriter.java
ModuleVisitor.java
ModuleWriter.java
Opcodes.java Opcodes.java
Type.java Type.java
TypePath.java TypePath.java
public static void main(String[] args) throws Throwable {
File srcasmroot = new File("D:/JAVA/JDK源码/JDK9源码/java.base/jdk/internal/org/objectweb/asm");
File destasmroot = new File("D:/Java-Projects/RedkaleProject/src/org/redkale/asm");
String line = null;
LineNumberReader txtin = new LineNumberReader(new FileReader(new File(destasmroot, "asm.txt")));
while ((line = txtin.readLine()) != null) {
line = line.trim();
if (!line.endsWith(".java")) continue;
File srcfile = new File(srcasmroot, line);
File destfile = new File(destasmroot, line);
String content = Utility.readThenClose(new FileInputStream(srcfile));
FileOutputStream out = new FileOutputStream(destfile);
out.write(content.replace("jdk.internal.org.objectweb", "org.redkale").getBytes());
out.close();
}
}

View File

@@ -131,6 +131,9 @@ public final class Application {
//日志 //日志
private final Logger logger; private final Logger logger;
//监听事件
private final List<ApplicationListener> listeners = new CopyOnWriteArrayList<>();
//服务启动时间 //服务启动时间
private final long startTime = System.currentTimeMillis(); private final long startTime = System.currentTimeMillis();
@@ -233,7 +236,7 @@ public final class Application {
this.logger = Logger.getLogger(this.getClass().getSimpleName()); this.logger = Logger.getLogger(this.getClass().getSimpleName());
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1); this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader()); this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------"); logger.log(Level.INFO, "------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------");
//------------------配置 <transport> 节点 ------------------ //------------------配置 <transport> 节点 ------------------
ObjectPool<ByteBuffer> transportPool = null; ObjectPool<ByteBuffer> transportPool = null;
ExecutorService transportExec = null; ExecutorService transportExec = null;
@@ -354,7 +357,7 @@ public final class Application {
File persist = new File(this.home, "conf/persistence.xml"); File persist = new File(this.home, "conf/persistence.xml");
final String homepath = this.home.getCanonicalPath(); final String homepath = this.home.getCanonicalPath();
if (persist.isFile()) System.setProperty(DataSources.DATASOURCE_CONFPATH, persist.getCanonicalPath()); if (persist.isFile()) System.setProperty(DataSources.DATASOURCE_CONFPATH, persist.getCanonicalPath());
logger.log(Level.INFO, RESNAME_APP_HOME + "= " + homepath + "\r\n" + RESNAME_APP_ADDR + "= " + this.localAddress.getHostAddress()); logger.log(Level.INFO, "APP_JAVA = " + System.getProperty("java.version") + "\r\n" + RESNAME_APP_ADDR + " = " + this.localAddress.getHostAddress() + "\r\n" + RESNAME_APP_HOME + " = " + homepath);
String lib = config.getValue("lib", "${APP_HOME}/libs/*").trim().replace("${APP_HOME}", homepath); String lib = config.getValue("lib", "${APP_HOME}/libs/*").trim().replace("${APP_HOME}", homepath);
lib = lib.isEmpty() ? (homepath + "/conf") : (lib + ";" + homepath + "/conf"); lib = lib.isEmpty() ? (homepath + "/conf") : (lib + ";" + homepath + "/conf");
Server.loadLib(classLoader, logger, lib); Server.loadLib(classLoader, logger, lib);
@@ -487,6 +490,15 @@ public final class Application {
} }
sncpTransportFactory.addGroupInfo(ginfo); sncpTransportFactory.addGroupInfo(ginfo);
} }
for (AnyValue conf : resources.getAnyValues("listener")) {
final String listenClass = conf.getValue("value", "");
if (listenClass.isEmpty()) continue;
Class clazz = Class.forName(listenClass);
if (!ApplicationListener.class.isAssignableFrom(clazz)) continue;
ApplicationListener listener = (ApplicationListener) clazz.newInstance();
listener.init(config);
this.listeners.add(listener);
}
} }
//------------------------------------------------------------------------ //------------------------------------------------------------------------
} }
@@ -783,6 +795,9 @@ public final class Application {
application.init(); application.init();
application.startSelfServer(); application.startSelfServer();
try { try {
for (ApplicationListener listener : application.listeners) {
listener.preStart(application);
}
application.start(); application.start();
} catch (Exception e) { } catch (Exception e) {
application.logger.log(Level.SEVERE, "Application start error", e); application.logger.log(Level.SEVERE, "Application start error", e);
@@ -801,6 +816,14 @@ public final class Application {
} }
private void shutdown() throws Exception { private void shutdown() throws Exception {
for (ApplicationListener listener : this.listeners) {
try {
listener.preShutdown(this);
} catch (Exception e) {
logger.log(Level.WARNING, listener.getClass() + " preShutdown erroneous", e);
}
}
servers.stream().forEach((server) -> { servers.stream().forEach((server) -> {
try { try {
server.shutdown(); server.shutdown();

View File

@@ -0,0 +1,45 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot;
import org.redkale.util.AnyValue;
/**
* Application启动和关闭时的监听事件 <br>
* 只能通过application.xml配置
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public interface ApplicationListener {
/**
* 初始化方法
*
* @param config 配置参数
*/
default void init(AnyValue config) {
}
/**
* Application 在运行start前调用
*
* @param application Application
*/
default void preStart(Application application) {
}
/**
* Application 在运行shutdown前调用
*
* @param application Application
*/
default void preShutdown(Application application) {
}
}

View File

@@ -221,7 +221,25 @@ public abstract class NodeServer {
try { try {
if (field.getAnnotation(Resource.class) == null) return; if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 DataSource if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 DataSource
DataSource source = DataSources.createDataSource(resourceName); AnyValue sourceConf = dataResources.get(resourceName);
DataSource source = null;
boolean needinit = true;
if (sourceConf != null) {
final Class sourceType = serverClassLoader.loadClass(sourceConf.getValue("value"));
if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource
final Service srcService = (Service) src;
SncpClient client = Sncp.getSncpClient(srcService);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
}
}
if (source == null) {
source = DataSources.createDataSource(resourceName); //从persistence.xml配置中创建
needinit = false;
}
application.dataSources.add(source); application.dataSources.add(source);
appResFactory.register(resourceName, DataSource.class, source); appResFactory.register(resourceName, DataSource.class, source);
@@ -242,7 +260,7 @@ public abstract class NodeServer {
field.set(src, source); field.set(src, source);
rf.inject(source, self); // 给其可能包含@Resource的字段赋值; rf.inject(source, self); // 给其可能包含@Resource的字段赋值;
//NodeServer.this.watchFactory.inject(src); //NodeServer.this.watchFactory.inject(src);
if (source instanceof Service) ((Service) source).init(null); if (source instanceof Service && needinit) ((Service) source).init(sourceConf);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e); logger.log(Level.SEVERE, "DataSource inject error", e);
} }
@@ -264,12 +282,8 @@ public abstract class NodeServer {
AnyValue sourceConf = cacheResource.get(resourceName); AnyValue sourceConf = cacheResource.get(resourceName);
if (sourceConf == null) sourceConf = dataResources.get(resourceName); if (sourceConf == null) sourceConf = dataResources.get(resourceName);
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value")); final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value"));
Object source; Object source = null;
if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource if (CacheSource.class.isAssignableFrom(sourceType)) { // CacheSource
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
application.dataSources.add((DataSource) source);
appResFactory.register(resourceName, DataSource.class, source);
} else { // CacheSource
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
Type genericType = field.getGenericType(); Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;

View File

@@ -50,7 +50,7 @@ public final class CollectionDecoder<T> implements Decodeable<Reader, Collection
factory.register(type, this); factory.register(type, this);
this.decoder = factory.loadDecoder(this.componentType); this.decoder = factory.loadDecoder(this.componentType);
} else { } else {
throw new ConvertException("collectiondecoder not support the type (" + type + ")"); throw new ConvertException("CollectionDecoder not support the type (" + type + ")");
} }
} finally { } finally {
inited = true; inited = true;

View File

@@ -13,10 +13,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.*;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Stream; import java.util.stream.*;
import org.redkale.convert.ext.InetAddressSimpledCoder.InetSocketAddressSimpledCoder;
import org.redkale.convert.ext.*; import org.redkale.convert.ext.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -89,17 +88,17 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
this.register(String.class, StringSimpledCoder.instance); this.register(String.class, StringSimpledCoder.instance);
this.register(CharSequence.class, CharSequenceSimpledCoder.instance); this.register(CharSequence.class, CharSequenceSimpledCoder.instance);
this.register(java.util.Date.class, DateSimpledCoder.instance); this.register(java.util.Date.class, DateSimpledCoder.instance);
this.register(AtomicInteger.class, AtomicIntegerSimpledCoder.instance);
this.register(AtomicLong.class, AtomicLongSimpledCoder.instance); this.register(AtomicLong.class, AtomicLongSimpledCoder.instance);
this.register(BigInteger.class, BigIntegerSimpledCoder.instance); this.register(BigInteger.class, BigIntegerSimpledCoder.instance);
this.register(BigDecimal.class, BigDecimalSimpledCoder.instance); this.register(BigDecimal.class, BigDecimalSimpledCoder.instance);
this.register(InetAddress.class, InetAddressSimpledCoder.instance); this.register(InetAddress.class, InetAddressSimpledCoder.instance);
this.register(DLong.class, DLongSimpledCoder.instance); this.register(DLong.class, DLongSimpledCoder.instance);
this.register(Class.class, TypeSimpledCoder.instance); this.register(Class.class, TypeSimpledCoder.instance);
this.register(InetSocketAddress.class, InetSocketAddressSimpledCoder.instance); this.register(InetSocketAddress.class, InetAddressSimpledCoder.InetSocketAddressSimpledCoder.instance);
this.register(Pattern.class, PatternSimpledCoder.instance); this.register(Pattern.class, PatternSimpledCoder.instance);
this.register(File.class, FileSimpledCoder.instance); this.register(File.class, FileSimpledCoder.instance);
this.register(CompletionHandler.class, CompletionHandlerSimpledCoder.instance); this.register(CompletionHandler.class, CompletionHandlerSimpledCoder.instance);
this.register(AsyncHandler.class, AsyncHandlerSimpledCoder.instance);
this.register(URL.class, URLSimpledCoder.instance); this.register(URL.class, URLSimpledCoder.instance);
this.register(URI.class, URISimpledCoder.instance); this.register(URI.class, URISimpledCoder.instance);
//--------------------------------------------------------- //---------------------------------------------------------
@@ -109,9 +108,12 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
this.register(short[].class, ShortArraySimpledCoder.instance); this.register(short[].class, ShortArraySimpledCoder.instance);
this.register(char[].class, CharArraySimpledCoder.instance); this.register(char[].class, CharArraySimpledCoder.instance);
this.register(int[].class, IntArraySimpledCoder.instance); this.register(int[].class, IntArraySimpledCoder.instance);
this.register(IntStream.class, IntArraySimpledCoder.IntStreamSimpledCoder.instance);
this.register(long[].class, LongArraySimpledCoder.instance); this.register(long[].class, LongArraySimpledCoder.instance);
this.register(LongStream.class, LongArraySimpledCoder.LongStreamSimpledCoder.instance);
this.register(float[].class, FloatArraySimpledCoder.instance); this.register(float[].class, FloatArraySimpledCoder.instance);
this.register(double[].class, DoubleArraySimpledCoder.instance); this.register(double[].class, DoubleArraySimpledCoder.instance);
this.register(DoubleStream.class, DoubleArraySimpledCoder.DoubleStreamSimpledCoder.instance);
this.register(String[].class, StringArraySimpledCoder.instance); this.register(String[].class, StringArraySimpledCoder.instance);
//--------------------------------------------------------- //---------------------------------------------------------
this.register(AnyValue.class, Creator.create(AnyValue.DefaultAnyValue.class)); this.register(AnyValue.class, Creator.create(AnyValue.DefaultAnyValue.class));
@@ -375,12 +377,17 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
encoders.put(clazz, coder); encoders.put(clazz, coder);
} }
public final <E> void register(final Type clazz, final Decodeable<R, E> decoder, final Encodeable<W, E> encoder) {
decoders.put(clazz, decoder);
encoders.put(clazz, encoder);
}
public final <E> void register(final Type clazz, final Decodeable<R, E> decoder) { public final <E> void register(final Type clazz, final Decodeable<R, E> decoder) {
decoders.put(clazz, decoder); decoders.put(clazz, decoder);
} }
public final <E> void register(final Type clazz, final Encodeable<W, E> printer) { public final <E> void register(final Type clazz, final Encodeable<W, E> encoder) {
encoders.put(clazz, printer); encoders.put(clazz, encoder);
} }
public final <E> Decodeable<R, E> findDecoder(final Type type) { public final <E> Decodeable<R, E> findDecoder(final Type type) {
@@ -461,6 +468,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
decoder = new StreamDecoder(this, type); decoder = new StreamDecoder(this, type);
} else if (Map.class.isAssignableFrom(clazz)) { } else if (Map.class.isAssignableFrom(clazz)) {
decoder = new MapDecoder(this, type); decoder = new MapDecoder(this, type);
} else if (Optional.class == clazz) {
decoder = new OptionalCoder(this, type);
} else if (clazz == Object.class) { } else if (clazz == Object.class) {
od = new ObjectDecoder(type); od = new ObjectDecoder(type);
decoder = od; decoder = od;
@@ -546,6 +555,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
encoder = new StreamEncoder(this, type); encoder = new StreamEncoder(this, type);
} else if (Map.class.isAssignableFrom(clazz)) { } else if (Map.class.isAssignableFrom(clazz)) {
encoder = new MapEncoder(this, type); encoder = new MapEncoder(this, type);
} else if (Optional.class == clazz) {
encoder = new OptionalCoder(this, type);
} else if (clazz == Object.class) { } else if (clazz == Object.class) {
return (Encodeable<W, E>) this.anyEncoder; return (Encodeable<W, E>) this.anyEncoder;
} else if (!clazz.getName().startsWith("java.") || java.net.HttpCookie.class == clazz) { } else if (!clazz.getName().startsWith("java.") || java.net.HttpCookie.class == clazz) {

View File

@@ -0,0 +1,107 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
/**
* Optional 的SimpledCoder实现
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
*/
public class OptionalCoder<R extends Reader, W extends Writer, T> extends SimpledCoder<R, W, Optional<T>> {
private final Type type;
private final Type componentType;
protected final Class componentClass;
protected final Decodeable<Reader, T> decoder;
protected final Encodeable<Writer, T> encoder;
private boolean inited = false;
private final Object lock = new Object();
public OptionalCoder(final ConvertFactory factory, final Type type) {
this.type = type;
try {
if (type instanceof ParameterizedType) {
final ParameterizedType pt = (ParameterizedType) type;
this.componentType = pt.getActualTypeArguments()[0];
factory.register(type, this);
this.decoder = factory.loadDecoder(this.componentType);
if (this.componentType instanceof TypeVariable) {
this.encoder = factory.getAnyEncoder();
this.componentClass = Object.class;
} else {
if (componentType instanceof ParameterizedType) {
final ParameterizedType pt2 = (ParameterizedType) componentType;
this.componentClass = (Class) pt2.getRawType();
} else {
this.componentClass = (Class) componentType;
}
this.encoder = factory.loadEncoder(this.componentType);
}
} else {
this.componentType = Object.class;
this.componentClass = Object.class;
this.decoder = factory.loadDecoder(this.componentType);
this.encoder = factory.getAnyEncoder();
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
}
}
}
@Override
public void convertTo(W out, Optional<T> value) {
if (value == null || !value.isPresent()) {
out.writeObjectNull(null);
return;
}
if (this.encoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
this.encoder.convertTo(out, value.get());
}
@Override
public Optional<T> convertFrom(R in) {
if (this.decoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return Optional.ofNullable(this.decoder.convertFrom(in));
}
}

View File

@@ -73,7 +73,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
} }
public void offerBsonReader(final BsonReader in) { public void offerBsonReader(final BsonReader in) {
if (in != null) readerPool.offer(in); if (in != null) readerPool.accept(in);
} }
//------------------------------ writer ----------------------------------------------------------- //------------------------------ writer -----------------------------------------------------------
@@ -90,7 +90,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
} }
public void offerBsonWriter(final BsonWriter out) { public void offerBsonWriter(final BsonWriter out) {
if (out != null) writerPool.offer(out); if (out != null) writerPool.accept(out);
} }
//------------------------------ convertFrom ----------------------------------------------------------- //------------------------------ convertFrom -----------------------------------------------------------
@@ -106,7 +106,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
in.setBytes(bytes, start, len); in.setBytes(bytes, start, len);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T rs = (T) factory.loadDecoder(type).convertFrom(in); T rs = (T) factory.loadDecoder(type).convertFrom(in);
readerPool.offer(in); readerPool.accept(in);
return rs; return rs;
} }
@@ -145,7 +145,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
final BsonWriter out = writerPool.get().tiny(tiny); final BsonWriter out = writerPool.get().tiny(tiny);
out.writeNull(); out.writeNull();
byte[] result = out.toArray(); byte[] result = out.toArray();
writerPool.offer(out); writerPool.accept(out);
return result; return result;
} }
return convertTo(value.getClass(), value); return convertTo(value.getClass(), value);
@@ -157,7 +157,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
final BsonWriter out = writerPool.get().tiny(tiny); final BsonWriter out = writerPool.get().tiny(tiny);
factory.loadEncoder(type).convertTo(out, value); factory.loadEncoder(type).convertTo(out, value);
byte[] result = out.toArray(); byte[] result = out.toArray();
writerPool.offer(out); writerPool.accept(out);
return result; return result;
} }
@@ -167,7 +167,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
final BsonWriter out = writerPool.get().tiny(tiny); final BsonWriter out = writerPool.get().tiny(tiny);
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values); ((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
byte[] result = out.toArray(); byte[] result = out.toArray();
writerPool.offer(out); writerPool.accept(out);
return result; return result;
} }

View File

@@ -1,36 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert.ext;
import org.redkale.convert.*;
import org.redkale.util.AsyncHandler;
/**
* AsyncHandlerSimpledCoder 的SimpledCoder实现, 只输出null
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
*/
public final class AsyncHandlerSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, AsyncHandler> {
public static final AsyncHandlerSimpledCoder instance = new AsyncHandlerSimpledCoder();
@Override
public void convertTo(W out, AsyncHandler value) {
out.writeObjectNull(AsyncHandler.class);
}
@Override
public AsyncHandler convertFrom(R in) {
in.readObjectB(AsyncHandler.class);
return null;
}
}

View File

@@ -0,0 +1,35 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert.ext;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.convert.*;
/**
* AtomicInteger 的SimpledCoder实现
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
*/
public class AtomicIntegerSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, AtomicInteger> {
public static final AtomicIntegerSimpledCoder instance = new AtomicIntegerSimpledCoder();
@Override
public void convertTo(W out, AtomicInteger value) {
out.writeInt(value == null ? 0 : value.get());
}
@Override
public AtomicInteger convertFrom(R in) {
return new AtomicInteger(in.readInt());
}
}

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.convert.ext; package org.redkale.convert.ext;
import java.util.stream.DoubleStream;
import org.redkale.convert.Reader; import org.redkale.convert.Reader;
import org.redkale.convert.SimpledCoder; import org.redkale.convert.SimpledCoder;
import org.redkale.convert.Writer; import org.redkale.convert.Writer;
@@ -12,7 +13,9 @@ import org.redkale.convert.Writer;
/** /**
* double[] 的SimpledCoder实现 * double[] 的SimpledCoder实现
* *
* <p> 详情见: https://redkale.org * <p>
* 详情见: https://redkale.org
*
* @author zhangjx * @author zhangjx
* @param <R> Reader输入的子类型 * @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型 * @param <W> Writer输出的子类型
@@ -66,4 +69,24 @@ public final class DoubleArraySimpledCoder<R extends Reader, W extends Writer> e
} }
} }
public final static class DoubleStreamSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, DoubleStream> {
public static final DoubleStreamSimpledCoder instance = new DoubleStreamSimpledCoder();
@Override
public void convertTo(W out, DoubleStream values) {
if (values == null) {
out.writeNull();
return;
}
DoubleArraySimpledCoder.instance.convertTo(out, values.toArray());
}
@Override
public DoubleStream convertFrom(R in) {
double[] value = DoubleArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : DoubleStream.of(value);
}
}
} }

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.convert.ext; package org.redkale.convert.ext;
import java.util.stream.IntStream;
import org.redkale.convert.Reader; import org.redkale.convert.Reader;
import org.redkale.convert.SimpledCoder; import org.redkale.convert.SimpledCoder;
import org.redkale.convert.Writer; import org.redkale.convert.Writer;
@@ -12,7 +13,9 @@ import org.redkale.convert.Writer;
/** /**
* int[] 的SimpledCoder实现 * int[] 的SimpledCoder实现
* *
* <p> 详情见: https://redkale.org * <p>
* 详情见: https://redkale.org
*
* @author zhangjx * @author zhangjx
* @param <R> Reader输入的子类型 * @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型 * @param <W> Writer输出的子类型
@@ -66,4 +69,24 @@ public final class IntArraySimpledCoder<R extends Reader, W extends Writer> exte
} }
} }
public final static class IntStreamSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, IntStream> {
public static final IntStreamSimpledCoder instance = new IntStreamSimpledCoder();
@Override
public void convertTo(W out, IntStream values) {
if (values == null) {
out.writeNull();
return;
}
IntArraySimpledCoder.instance.convertTo(out, values.toArray());
}
@Override
public IntStream convertFrom(R in) {
int[] value = IntArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : IntStream.of(value);
}
}
} }

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.convert.ext; package org.redkale.convert.ext;
import java.util.stream.LongStream;
import org.redkale.convert.Reader; import org.redkale.convert.Reader;
import org.redkale.convert.SimpledCoder; import org.redkale.convert.SimpledCoder;
import org.redkale.convert.Writer; import org.redkale.convert.Writer;
@@ -68,4 +69,24 @@ public final class LongArraySimpledCoder<R extends Reader, W extends Writer> ext
} }
} }
public final static class LongStreamSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, LongStream> {
public static final LongStreamSimpledCoder instance = new LongStreamSimpledCoder();
@Override
public void convertTo(W out, LongStream values) {
if (values == null) {
out.writeNull();
return;
}
LongArraySimpledCoder.instance.convertTo(out, values.toArray());
}
@Override
public LongStream convertFrom(R in) {
long[] value = LongArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : LongStream.of(value);
}
}
} }

View File

@@ -23,7 +23,7 @@ import org.redkale.util.*;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> { public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
public static final Type TYPE_MAP_STRING_STRING = new TypeToken<java.util.LinkedHashMap<String, String>>() { public static final Type TYPE_MAP_STRING_STRING = new TypeToken<java.util.HashMap<String, String>>() {
}.getType(); }.getType();
private static final ObjectPool<JsonReader> readerPool = JsonReader.createPool(Integer.getInteger("convert.json.pool.size", 16)); private static final ObjectPool<JsonReader> readerPool = JsonReader.createPool(Integer.getInteger("convert.json.pool.size", 16));
@@ -60,7 +60,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
} }
public void offerJsonReader(final JsonReader in) { public void offerJsonReader(final JsonReader in) {
if (in != null) readerPool.offer(in); if (in != null) readerPool.accept(in);
} }
//------------------------------ writer ----------------------------------------------------------- //------------------------------ writer -----------------------------------------------------------
@@ -81,7 +81,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
} }
public void offerJsonWriter(final JsonWriter out) { public void offerJsonWriter(final JsonWriter out) {
if (out != null) writerPool.offer(out); if (out != null) writerPool.accept(out);
} }
//------------------------------ convertFrom ----------------------------------------------------------- //------------------------------ convertFrom -----------------------------------------------------------
@@ -100,7 +100,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
final JsonReader in = readerPool.get(); final JsonReader in = readerPool.get();
in.setText(text, start, len); in.setText(text, start, len);
T rs = (T) factory.loadDecoder(type).convertFrom(in); T rs = (T) factory.loadDecoder(type).convertFrom(in);
readerPool.offer(in); readerPool.accept(in);
return rs; return rs;
} }
@@ -142,7 +142,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
final JsonWriter out = writerPool.get().tiny(tiny); final JsonWriter out = writerPool.get().tiny(tiny);
factory.loadEncoder(type).convertTo(out, value); factory.loadEncoder(type).convertTo(out, value);
String result = out.toString(); String result = out.toString();
writerPool.offer(out); writerPool.accept(out);
return result; return result;
} }
@@ -152,7 +152,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
final JsonWriter out = writerPool.get().tiny(tiny); final JsonWriter out = writerPool.get().tiny(tiny);
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values); ((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
String result = out.toString(); String result = out.toString();
writerPool.offer(out); writerPool.accept(out);
return result; return result;
} }
@@ -160,7 +160,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
if (value == null) { if (value == null) {
new JsonStreamWriter(tiny, out).writeNull(); new JsonStreamWriter(tiny, out).writeNull();
} else { } else {
factory.loadEncoder(value.getClass()).convertTo(new JsonStreamWriter(tiny, out), value); convertTo(out, value.getClass(), value);
} }
} }
@@ -169,7 +169,15 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
if (value == null) { if (value == null) {
new JsonStreamWriter(tiny, out).writeNull(); new JsonStreamWriter(tiny, out).writeNull();
} else { } else {
factory.loadEncoder(type).convertTo(new JsonStreamWriter(tiny, out), value); final JsonWriter writer = writerPool.get().tiny(tiny);
factory.loadEncoder(type).convertTo(writer, value);
byte[] bs = writer.toBytes();
writerPool.accept(writer);
try {
out.write(bs);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
} }
@@ -177,7 +185,15 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
if (values == null) { if (values == null) {
new JsonStreamWriter(tiny, out).writeNull(); new JsonStreamWriter(tiny, out).writeNull();
} else { } else {
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(new JsonStreamWriter(tiny, out), values); final JsonWriter writer = writerPool.get().tiny(tiny);
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(writer, values);
byte[] bs = writer.toBytes();
writerPool.accept(writer);
try {
out.write(bs);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@@ -59,6 +59,7 @@ public class JsonWriter extends Writer {
* 返回指定至少指定长度的缓冲区 * 返回指定至少指定长度的缓冲区
* *
* @param len * @param len
*
* @return * @return
*/ */
private char[] expand(int len) { private char[] expand(int len) {
@@ -108,6 +109,10 @@ public class JsonWriter extends Writer {
return new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(content, 0, count))}; return new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(content, 0, count))};
} }
public byte[] toBytes() {
return Utility.encodeUTF8(content, 0, count);
}
public int count() { public int count() {
return this.count; return this.count;
} }

View File

@@ -130,13 +130,13 @@ public class Context {
} }
public void offerBuffer(ByteBuffer buffer) { public void offerBuffer(ByteBuffer buffer) {
bufferPool.offer(buffer); bufferPool.accept(buffer);
} }
public void offerBuffer(ByteBuffer... buffers) { public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return; if (buffers == null) return;
for (ByteBuffer buffer : buffers) { for (ByteBuffer buffer : buffers) {
bufferPool.offer(buffer); bufferPool.accept(buffer);
} }
} }

View File

@@ -195,7 +195,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
//System.println("耗时: " + (System.currentTimeMillis() - request.createtime)); //System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
if (kill) refuseAlive(); if (kill) refuseAlive();
this.context.responsePool.offer(this); this.context.responsePool.accept(this);
} }
public void finish(final byte[] bs) { public void finish(final byte[] bs) {

View File

@@ -178,7 +178,7 @@ public final class Transport {
} }
public void offerBuffer(ByteBuffer buffer) { public void offerBuffer(ByteBuffer buffer) {
bufferPool.offer(buffer); bufferPool.accept(buffer);
} }
public void offerBuffer(ByteBuffer... buffers) { public void offerBuffer(ByteBuffer... buffers) {

View File

@@ -308,7 +308,7 @@ public class TransportFactory {
@Override @Override
public void completed(Integer result, ByteBuffer attachment) { public void completed(Integer result, ByteBuffer attachment) {
if (counter > 3) { if (counter > 3) {
bufferPool.offer(attachment); bufferPool.accept(attachment);
localconn.dispose(); localconn.dispose();
return; return;
} }
@@ -317,7 +317,7 @@ public class TransportFactory {
localconn.read(pongBuffer, pongBuffer, this); localconn.read(pongBuffer, pongBuffer, this);
return; return;
} }
bufferPool.offer(attachment); bufferPool.accept(attachment);
localqueue.offer(localconn); localqueue.offer(localconn);
} }

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.nio.charset.*; import java.nio.charset.*;
import java.security.*; import java.security.*;
import java.util.concurrent.*; import java.util.concurrent.*;
@@ -54,7 +55,7 @@ public class HttpContext extends Context {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected <H extends AsyncHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) { protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
Creator<H> creator = asyncHandlerCreators.get(handlerClass); Creator<H> creator = asyncHandlerCreators.get(handlerClass);
if (creator == null) { if (creator == null) {
creator = createAsyncHandlerCreator(handlerClass); creator = createAsyncHandlerCreator(handlerClass);
@@ -64,14 +65,14 @@ public class HttpContext extends Context {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private <H extends AsyncHandler> Creator<H> createAsyncHandlerCreator(Class<H> handlerClass) { private <H extends CompletionHandler> Creator<H> createAsyncHandlerCreator(Class<H> handlerClass) {
//生成规则与SncpAsyncHandler.Factory 很类似 //生成规则与SncpAsyncHandler.Factory 很类似
//------------------------------------------------------------- //-------------------------------------------------------------
final boolean handlerinterface = handlerClass.isInterface(); final boolean handlerinterface = handlerClass.isInterface();
final String handlerClassName = handlerClass.getName().replace('.', '/'); final String handlerClassName = handlerClass.getName().replace('.', '/');
final String handlerName = AsyncHandler.class.getName().replace('.', '/'); final String handlerName = CompletionHandler.class.getName().replace('.', '/');
final String handlerDesc = Type.getDescriptor(AsyncHandler.class); final String handlerDesc = Type.getDescriptor(CompletionHandler.class);
final String newDynName = handlerClass.getName().replace('.', '/') + "_Dync" + AsyncHandler.class.getSimpleName() + "_" + (System.currentTimeMillis() % 10000); final String newDynName = handlerClass.getName().replace('.', '/') + "_DyncAsyncHandler_" + (System.currentTimeMillis() % 10000);
ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES); ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
FieldVisitor fv; FieldVisitor fv;
@@ -157,7 +158,7 @@ public class HttpContext extends Context {
} }
cw.visitEnd(); cw.visitEnd();
byte[] bytes = cw.toByteArray(); byte[] bytes = cw.toByteArray();
Class<AsyncHandler> newHandlerClazz = (Class<AsyncHandler>) new ClassLoader(handlerClass.getClassLoader()) { Class<CompletionHandler> newHandlerClazz = (Class<CompletionHandler>) new ClassLoader(handlerClass.getClassLoader()) {
public final Class<?> loadClass(String name, byte[] b) { public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length); return defineClass(name, b, 0, b.length);
} }

View File

@@ -519,8 +519,8 @@ public class HttpRequest extends Request<HttpContext> {
* @return cookie值 * @return cookie值
*/ */
public String getCookie(String name, String dfvalue) { public String getCookie(String name, String dfvalue) {
for (HttpCookie cookie : getCookies()) { for (HttpCookie c : getCookies()) {
if (name.equals(cookie.getName())) return cookie.getValue(); if (name.equals(c.getName())) return c.getValue();
} }
return dfvalue; return dfvalue;
} }
@@ -1134,6 +1134,38 @@ public class HttpRequest extends Request<HttpContext> {
return map0; return map0;
} }
/**
* 将请求参数转换成String, 字符串格式为: bean1={}&amp;id=13&amp;name=xxx <br>
* 不会返回null没有参数返回空字符串
*
*
* @return String
*/
public String getParametersToString() {
return getParametersToString(null);
}
/**
* 将请求参数转换成String, 字符串格式为: bean1={}&amp;id=13&amp;name=xxx <br>
* 不会返回null没有参数返回空字符串
*
* @param prefix 拼接前缀, 如果无参数,返回的字符串不会含有拼接前缀
*
* @return String
*/
public String getParametersToString(String prefix) {
final StringBuilder sb = new StringBuilder();
getParameters().forEach((k, v) -> {
if (sb.length() > 0) sb.append('&');
try {
sb.append(k).append('=').append(URLEncoder.encode(v, "UTF-8"));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
return (sb.length() > 0 && prefix != null) ? (prefix + sb) : sb.toString();
}
/** /**
* 获取所有参数名 * 获取所有参数名
* *

View File

@@ -16,6 +16,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.AnyValue.DefaultAnyValue;
@@ -147,6 +148,10 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return super.removeChannel(); return super.removeChannel();
} }
protected AsyncConnection getChannel() {
return channel;
}
@Override @Override
protected boolean recycle() { protected boolean recycle() {
boolean rs = super.recycle(); boolean rs = super.recycle();
@@ -220,38 +225,32 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} }
/** /**
* 创建AsyncHandler实例 * 创建CompletionHandler实例
* *
* @return AsyncHandler * @return CompletionHandler
*/ */
public AsyncHandler createAsyncHandler() { public CompletionHandler createAsyncHandler() {
return AsyncHandler.create((v, a) -> { return Utility.createAsyncHandler((v, a) -> {
if (v instanceof org.redkale.service.RetResult) { finish(v);
finishJson((org.redkale.service.RetResult) v);
} else if (v instanceof CharSequence) {
finish(String.valueOf(v));
} else {
finishJson(v);
}
}, (t, a) -> { }, (t, a) -> {
request.getContext().getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, t); request.getContext().getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request + ", result is CompletionHandler", (Throwable) t);
finish(500, null); finish(500, null);
}); });
} }
/** /**
* 创建AsyncHandler子类的实例 <br> * 创建CompletionHandler子类的实例 <br>
* *
* 传入的AsyncHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。 * 传入的CompletionHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。
* *
* @param <H> 泛型 * @param <H> 泛型
* @param handlerClass AsyncHandler子类 * @param handlerClass CompletionHandler子类
* *
* @return AsyncHandler AsyncHandler * @return CompletionHandler
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <H extends AsyncHandler> H createAsyncHandler(Class<H> handlerClass) { public <H extends CompletionHandler> H createAsyncHandler(Class<H> handlerClass) {
if (handlerClass == null || handlerClass == AsyncHandler.class) return (H) createAsyncHandler(); if (handlerClass == null || handlerClass == CompletionHandler.class) return (H) createAsyncHandler();
return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler()); return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler());
} }
@@ -376,7 +375,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param future 输出对象的句柄 * @param future 输出对象的句柄
*/ */
public void finishJson(final CompletableFuture future) { public void finishJson(final CompletableFuture future) {
finishJson(request.getJsonConvert(), future); finish(request.getJsonConvert(), (Type) null, future);
} }
/** /**
@@ -387,20 +386,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void finishJson(final JsonConvert convert, final CompletableFuture future) { public void finishJson(final JsonConvert convert, final CompletableFuture future) {
future.whenComplete((v, e) -> { finish(convert, (Type) null, future);
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, e);
finish(500, null);
return;
}
if (v instanceof CharSequence) {
finish(v.toString());
} else if (v instanceof org.redkale.service.RetResult) {
finishJson(convert, (org.redkale.service.RetResult) v);
} else {
finishJson(convert, v);
}
});
} }
/** /**
@@ -412,60 +398,86 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void finishJson(final JsonConvert convert, final Type type, final CompletableFuture future) { public void finishJson(final JsonConvert convert, final Type type, final CompletableFuture future) {
future.whenComplete((v, e) -> { finish(convert, type, future);
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, e);
finish(500, null);
return;
}
if (v instanceof CharSequence) {
finish(v.toString());
} else if (v instanceof HttpResult) {
finishJson(convert, (HttpResult) v);
} else if (v instanceof org.redkale.service.RetResult) {
finishJson(convert, (org.redkale.service.RetResult) v);
} else {
finishJson(convert, type, v);
}
});
} }
/** /**
* 将HttpResult的结果对象以JSON格式输出 * 将结果对象输出
* *
* @param result HttpResult对象 * @param obj 输出对象
*/
public void finishJson(final HttpResult result) {
finishJson(request.getJsonConvert(), result);
}
/**
* 将HttpResult的结果对象以JSON格式输出
*
* @param convert 指定的JsonConvert
* @param result HttpResult对象
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void finishJson(final JsonConvert convert, final HttpResult result) { public void finish(final Object obj) {
if (output == null) { finish(request.getJsonConvert(), (Type) null, obj);
finish(""); }
/**
* 将结果对象输出
*
* @param convert 指定的Convert
* @param obj 输出对象
*/
@SuppressWarnings("unchecked")
public void finish(final Convert convert, final Object obj) {
finish(convert, (Type) null, obj);
}
/**
* 将结果对象输出
*
* @param convert 指定的Convert
* @param type 指定的类型
* @param obj 输出对象
*/
@SuppressWarnings("unchecked")
public void finish(final Convert convert, final Type type, final Object obj) {
if (obj == null) {
finish("null");
} else if (obj instanceof CompletableFuture) {
((CompletableFuture) obj).whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request + ", result is CompletableFuture", (Throwable) e);
finish(500, null);
return; return;
} }
if (result.getContentType() != null) setContentType(result.getContentType()); finish(convert, type, v);
addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus()); });
if (result.getResult() instanceof File) { } else if (obj instanceof CharSequence) {
finish((String) obj.toString());
} else if (obj instanceof byte[]) {
finish((byte[]) obj);
} else if (obj instanceof ByteBuffer) {
finish((ByteBuffer) obj);
} else if (obj instanceof ByteBuffer[]) {
finish((ByteBuffer[]) obj);
} else if (obj instanceof File) {
try { try {
finish((File) result.getResult()); finish((File) obj);
} catch (IOException e) { } catch (IOException e) {
getContext().getLogger().log(Level.WARNING, "HttpServlet finishJson HttpResult File occur, forece to close channel. request = " + getRequest(), e); getContext().getLogger().log(Level.WARNING, "HttpServlet finish File occur, forece to close channel. request = " + getRequest(), e);
finish(500, null); finish(500, null);
} }
} else if (result.getResult() instanceof String) { } else if (obj instanceof HttpResult) {
finish((String) result.getResult()); HttpResult result = (HttpResult) obj;
} else if (result.getResult() == null) { if (result.getContentType() != null) setContentType(result.getContentType());
finish(result.getMessage()); addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus());
if (result.getResult() == null) {
finish("");
} else { } else {
finishJson(result.getResult()); finish(convert, result.getResult());
}
} else {
if (convert instanceof TextConvert) this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
if (obj instanceof org.redkale.service.RetResult) {
org.redkale.service.RetResult ret = (org.redkale.service.RetResult) obj;
if (!ret.isSuccess()) {
this.header.addValue("retcode", String.valueOf(ret.getRetcode())).addValue("retinfo", ret.getRetinfo());
}
}
ByteBuffer[] buffers = type == null ? convert.convertTo(context.getBufferSupplier(), obj)
: convert.convertTo(context.getBufferSupplier(), type, obj);
finish(buffers);
} }
} }
@@ -642,7 +654,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param attachment 异步回调参数 * @param attachment 异步回调参数
* @param handler 异步回调函数 * @param handler 异步回调函数
*/ */
public <A> void sendBody(ByteBuffer buffer, A attachment, AsyncHandler<Integer, A> handler) { public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) { if (!this.headsended) {
if (this.contentLength < 0) this.contentLength = buffer == null ? 0 : buffer.remaining(); if (this.contentLength < 0) this.contentLength = buffer == null ? 0 : buffer.remaining();
ByteBuffer headbuf = createHeader(); ByteBuffer headbuf = createHeader();
@@ -665,7 +677,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param attachment 异步回调参数 * @param attachment 异步回调参数
* @param handler 异步回调函数 * @param handler 异步回调函数
*/ */
public <A> void sendBody(ByteBuffer[] buffers, A attachment, AsyncHandler<Integer, A> handler) { public <A> void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) { if (!this.headsended) {
if (this.contentLength < 0) { if (this.contentLength < 0) {
int len = 0; int len = 0;
@@ -1016,7 +1028,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.bufferHandler = bufferHandler; this.bufferHandler = bufferHandler;
} }
protected final class TransferFileHandler implements AsyncHandler<Integer, ByteBuffer> { protected final class TransferFileHandler implements CompletionHandler<Integer, ByteBuffer> {
private final File file; private final File file;

View File

@@ -76,6 +76,7 @@ public class MimeType {
contentTypes.put("m3u", "audio/x-mpegurl"); contentTypes.put("m3u", "audio/x-mpegurl");
contentTypes.put("mac", "image/x-macpaint"); contentTypes.put("mac", "image/x-macpaint");
contentTypes.put("man", "application/x-troff-man"); contentTypes.put("man", "application/x-troff-man");
contentTypes.put("manifest", "text/cache-manifest");
contentTypes.put("mathml", "application/mathml+xml"); contentTypes.put("mathml", "application/mathml+xml");
contentTypes.put("me", "application/x-troff-me"); contentTypes.put("me", "application/x-troff-me");
contentTypes.put("mid", "audio/x-midi"); contentTypes.put("mid", "audio/x-midi");

View File

@@ -10,6 +10,7 @@ import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*; import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -17,6 +18,7 @@ import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
import static jdk.internal.org.objectweb.asm.Opcodes.*; import static jdk.internal.org.objectweb.asm.Opcodes.*;
import jdk.internal.org.objectweb.asm.Type; import jdk.internal.org.objectweb.asm.Type;
import org.redkale.convert.Convert;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -141,13 +143,20 @@ public final class Rest {
return childFactory.getConvert(); return childFactory.getConvert();
} }
static String getWebModuleName(Class<? extends Service> serviceType) { static String getWebModuleNameLowerCase(Class<? extends Service> serviceType) {
final RestService controller = serviceType.getAnnotation(RestService.class); final RestService controller = serviceType.getAnnotation(RestService.class);
if (controller == null) return serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase(); if (controller == null) return serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase();
if (controller.ignore()) return null; if (controller.ignore()) return null;
return (!controller.name().isEmpty()) ? controller.name() : serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase(); return (!controller.name().isEmpty()) ? controller.name() : serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase();
} }
static String getWebModuleName(Class<? extends Service> serviceType) {
final RestService controller = serviceType.getAnnotation(RestService.class);
if (controller == null) return serviceType.getSimpleName().replaceAll("Service.*$", "");
if (controller.ignore()) return null;
return (!controller.name().isEmpty()) ? controller.name() : serviceType.getSimpleName().replaceAll("Service.*$", "");
}
static boolean isRestDyn(HttpServlet servlet) { static boolean isRestDyn(HttpServlet servlet) {
return servlet.getClass().getAnnotation(RestDyn.class) != null; return servlet.getClass().getAnnotation(RestDyn.class) != null;
} }
@@ -328,6 +337,10 @@ public final class Rest {
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z"); mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.anyuser() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "anyuser", "Z");
mv.visitInsn(RETURN); mv.visitInsn(RETURN);
mv.visitMaxs(3, 1); mv.visitMaxs(3, 1);
mv.visitEnd(); mv.visitEnd();
@@ -574,7 +587,9 @@ public final class Rest {
final String webServletDesc = Type.getDescriptor(WebServlet.class); final String webServletDesc = Type.getDescriptor(WebServlet.class);
final String reqDesc = Type.getDescriptor(HttpRequest.class); final String reqDesc = Type.getDescriptor(HttpRequest.class);
final String respDesc = Type.getDescriptor(HttpResponse.class); final String respDesc = Type.getDescriptor(HttpResponse.class);
final String convertDesc = Type.getDescriptor(JsonConvert.class); final String convertDesc = Type.getDescriptor(Convert.class);
final String typeDesc = Type.getDescriptor(java.lang.reflect.Type.class);
final String jsonConvertDesc = Type.getDescriptor(JsonConvert.class);
final String retDesc = Type.getDescriptor(RetResult.class); final String retDesc = Type.getDescriptor(RetResult.class);
final String futureDesc = Type.getDescriptor(CompletableFuture.class); final String futureDesc = Type.getDescriptor(CompletableFuture.class);
final String flipperDesc = Type.getDescriptor(Flipper.class); final String flipperDesc = Type.getDescriptor(Flipper.class);
@@ -601,7 +616,8 @@ public final class Rest {
String newDynName = serviceTypeInternalName.substring(0, serviceTypeInternalName.lastIndexOf('/') + 1) + "_Dyn" + serviceType.getSimpleName().replaceAll("Service.*$", "") + "RestServlet"; String newDynName = serviceTypeInternalName.substring(0, serviceTypeInternalName.lastIndexOf('/') + 1) + "_Dyn" + serviceType.getSimpleName().replaceAll("Service.*$", "") + "RestServlet";
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
final String defmodulename = getWebModuleName(serviceType); final String defmodulename = getWebModuleNameLowerCase(serviceType);
final String bigmodulename = getWebModuleName(serviceType);
final String catalog = controller == null ? "" : controller.catalog(); final String catalog = controller == null ? "" : controller.catalog();
if (!checkName(catalog)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9"); if (!checkName(catalog)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(defmodulename)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".value, only 0-9 a-z A-Z _ cannot begin 0-9"); if (!checkName(defmodulename)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".value, only 0-9 a-z A-Z _ cannot begin 0-9");
@@ -704,7 +720,7 @@ public final class Rest {
if (ignore) continue; if (ignore) continue;
paramtypes.add(method.getGenericParameterTypes()); paramtypes.add(method.getGenericParameterTypes());
if (mappings.length == 0) { //没有Mapping设置一个默认值 if (mappings.length == 0) { //没有Mapping设置一个默认值
MappingEntry entry = new MappingEntry(methodidex, null, defmodulename, method); MappingEntry entry = new MappingEntry(methodidex, null, bigmodulename, method);
if (entrys.contains(entry)) throw new RuntimeException(serviceType.getName() + " on " + method.getName() + " 's mapping(" + entry.name + ") is repeat"); if (entrys.contains(entry)) throw new RuntimeException(serviceType.getName() + " on " + method.getName() + " 's mapping(" + entry.name + ") is repeat");
entrys.add(entry); entrys.add(entry);
} else { } else {
@@ -717,7 +733,6 @@ public final class Rest {
methodidex++; methodidex++;
} }
if (entrys.isEmpty()) return null; //没有可HttpMapping的方法 if (entrys.isEmpty()) return null; //没有可HttpMapping的方法
//将每个Service可转换的方法生成HttpServlet对应的HttpMapping方法 //将每个Service可转换的方法生成HttpServlet对应的HttpMapping方法
final Map<String, List<String>> asmParamMap = MethodParamClassVisitor.getMethodParamNames(serviceType); final Map<String, List<String>> asmParamMap = MethodParamClassVisitor.getMethodParamNames(serviceType);
final Map<String, java.lang.reflect.Type> bodyTypes = new HashMap<>(); final Map<String, java.lang.reflect.Type> bodyTypes = new HashMap<>();
@@ -864,7 +879,7 @@ public final class Rest {
if (ptype.isPrimitive() || ptype == String.class) n = "#"; if (ptype.isPrimitive() || ptype == String.class) n = "#";
} }
if (annhead == null && anncookie == null && annsid == null && annaddr == null && annbody == null && annfile == null if (annhead == null && anncookie == null && annsid == null && annaddr == null && annbody == null && annfile == null
&& !ptype.isPrimitive() && ptype != String.class && ptype != Flipper.class && !AsyncHandler.class.isAssignableFrom(ptype) && !ptype.isPrimitive() && ptype != String.class && ptype != Flipper.class && !CompletionHandler.class.isAssignableFrom(ptype)
&& !ptype.getName().startsWith("java") && n.charAt(0) != '#' && !"&".equals(n)) { //判断Json对象是否包含@RestUploadFile && !ptype.getName().startsWith("java") && n.charAt(0) != '#' && !"&".equals(n)) { //判断Json对象是否包含@RestUploadFile
Class loop = ptype; Class loop = ptype;
do { do {
@@ -1001,17 +1016,17 @@ public final class Rest {
paramMap.put("name", pname); paramMap.put("name", pname);
paramMap.put("type", ptype.getName()); paramMap.put("type", ptype.getName());
if (AsyncHandler.class.isAssignableFrom(ptype)) { //HttpResponse.createAsyncHandler() or HttpResponse.createAsyncHandler(Class) if (CompletionHandler.class.isAssignableFrom(ptype)) { //HttpResponse.createAsyncHandler() or HttpResponse.createAsyncHandler(Class)
if (ptype == AsyncHandler.class) { if (ptype == CompletionHandler.class) {
mv.visitVarInsn(ALOAD, 2); mv.visitVarInsn(ALOAD, 2);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "()Lorg/redkale/util/AsyncHandler;", false); mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "()Ljava/nio/channels/CompletionHandler;", false);
mv.visitVarInsn(ASTORE, maxLocals); mv.visitVarInsn(ASTORE, maxLocals);
varInsns.add(new int[]{ALOAD, maxLocals}); varInsns.add(new int[]{ALOAD, maxLocals});
} else { } else {
mv.visitVarInsn(ALOAD, 3); mv.visitVarInsn(ALOAD, 3);
mv.visitVarInsn(ALOAD, 2); mv.visitVarInsn(ALOAD, 2);
mv.visitLdcInsn(Type.getType(Type.getDescriptor(ptype))); mv.visitLdcInsn(Type.getType(Type.getDescriptor(ptype)));
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "(Ljava/lang/Class;)Lorg/redkale/util/AsyncHandler;", false); mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "(Ljava/lang/Class;)Ljava/nio/channels/CompletionHandler;", false);
mv.visitTypeInsn(CHECKCAST, ptype.getName().replace('.', '/')); mv.visitTypeInsn(CHECKCAST, ptype.getName().replace('.', '/'));
mv.visitVarInsn(ASTORE, maxLocals); mv.visitVarInsn(ASTORE, maxLocals);
varInsns.add(new int[]{ALOAD, maxLocals}); varInsns.add(new int[]{ALOAD, maxLocals});
@@ -1532,34 +1547,6 @@ public final class Rest {
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/io/File;)V", false); mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/io/File;)V", false);
mv.visitInsn(RETURN); mv.visitInsn(RETURN);
maxLocals++; maxLocals++;
} else if (RetResult.class.isAssignableFrom(returnType)) {
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response
if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc);
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + retDesc + ")V", false);
} else {
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + retDesc + ")V", false);
}
mv.visitInsn(RETURN);
maxLocals++;
} else if (HttpResult.class.isAssignableFrom(returnType)) {
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response
if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc);
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + httprsDesc + ")V", false);
} else {
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + httprsDesc + ")V", false);
}
mv.visitInsn(RETURN);
maxLocals++;
} else if (Number.class.isAssignableFrom(returnType) || CharSequence.class.isAssignableFrom(returnType)) { //returnType == String.class 必须放在前面 } else if (Number.class.isAssignableFrom(returnType) || CharSequence.class.isAssignableFrom(returnType)) { //returnType == String.class 必须放在前面
mv.visitVarInsn(ASTORE, maxLocals); mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response mv.visitVarInsn(ALOAD, 2); //response
@@ -1568,31 +1555,17 @@ public final class Rest {
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/lang/String;)V", false); mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/lang/String;)V", false);
mv.visitInsn(RETURN); mv.visitInsn(RETURN);
maxLocals++; maxLocals++;
} else if (CompletableFuture.class.isAssignableFrom(returnType)) {
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2);//response
if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc);
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + futureDesc + ")V", false);
} else {
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + futureDesc + ")V", false);
}
mv.visitInsn(RETURN);
maxLocals++;
} else { } else {
mv.visitVarInsn(ASTORE, maxLocals); mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response mv.visitVarInsn(ALOAD, 2); //response
if (rcs != null && rcs.length > 0) { if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc); mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), jsonConvertDesc);
mv.visitVarInsn(ALOAD, maxLocals); mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + "Ljava/lang/Object;)V", false); mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(" + convertDesc + "Ljava/lang/Object;)V", false);
} else { } else {
mv.visitVarInsn(ALOAD, maxLocals); mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(Ljava/lang/Object;)V", false); mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/lang/Object;)V", false);
} }
mv.visitInsn(RETURN); mv.visitInsn(RETURN);
maxLocals++; maxLocals++;
@@ -1613,7 +1586,7 @@ public final class Rest {
} }
for (int i = 1; i <= restConverts.size(); i++) { for (int i = 1; i <= restConverts.size(); i++) {
fv = cw.visitField(ACC_PRIVATE, REST_JSONCONVERT_FIELD_PREFIX + i, convertDesc, null, null); fv = cw.visitField(ACC_PRIVATE, REST_JSONCONVERT_FIELD_PREFIX + i, jsonConvertDesc, null, null);
fv.visitEnd(); fv.visitEnd();
} }

View File

@@ -52,6 +52,13 @@ public @interface RestWebSocket {
*/ */
boolean single() default true; boolean single() default true;
/**
* WebSocket.createUserid返回的值是否不能表示户登录态
*
* @return 默认false
*/
boolean anyuser() default false;
/** /**
* WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值15秒 * WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值15秒
* *

View File

@@ -64,6 +64,9 @@ public abstract class WebSocket<G extends Serializable, T> {
@Comment("WebSocket已离线") @Comment("WebSocket已离线")
public static final int RETCODE_WSOFFLINE = 1 << 8; //256 public static final int RETCODE_WSOFFLINE = 1 << 8; //256
@Comment("WebSocket将延迟发送")
public static final int RETCODE_DEAYSEND = 1 << 9; //512
WebSocketRunner _runner; //不可能为空 WebSocketRunner _runner; //不可能为空
WebSocketEngine _engine; //不可能为空 WebSocketEngine _engine; //不可能为空
@@ -90,6 +93,8 @@ public abstract class WebSocket<G extends Serializable, T> {
private Map<String, Object> attributes = new HashMap<>(); //非线程安全 private Map<String, Object> attributes = new HashMap<>(); //非线程安全
List<WebSocketPacket> delayPackets;
protected WebSocket() { protected WebSocket() {
} }
@@ -225,6 +230,11 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 0表示成功 非0表示错误码 * @return 0表示成功 非0表示错误码
*/ */
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) { CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
if (this._runner == null) {
if (delayPackets == null) delayPackets = new ArrayList<>();
delayPackets.add(packet);
return CompletableFuture.completedFuture(RETCODE_DEAYSEND);
}
CompletableFuture<Integer> rs = this._runner.sendMessage(packet); CompletableFuture<Integer> rs = this._runner.sendMessage(packet);
if (_engine.logger.isLoggable(Level.FINEST) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { if (_engine.logger.isLoggable(Level.FINEST) && packet != WebSocketPacket.DEFAULT_PING_PACKET) {
_engine.logger.finest("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this); _engine.logger.finest("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this);
@@ -429,6 +439,18 @@ public abstract class WebSocket<G extends Serializable, T> {
return _engine.node.getRpcNodeWebSocketAddresses(userid); return _engine.node.getRpcNodeWebSocketAddresses(userid);
} }
/**
* 更改本WebSocket的userid
*
* @param newuserid 新用户ID不能为null
*
* @return CompletableFuture
*/
public CompletableFuture<Void> changeUserid(final G newuserid) {
if (newuserid == null) throw new NullPointerException("newuserid is null");
return _engine.changeUserid(this, newuserid);
}
/** /**
* 强制关闭用户的所有WebSocket * 强制关闭用户的所有WebSocket
* *

View File

@@ -10,7 +10,7 @@ import java.io.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.Predicate; import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.*; import java.util.stream.*;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
@@ -149,6 +149,31 @@ public class WebSocketEngine {
} }
} }
@Comment("更改WebSocket的userid")
CompletableFuture<Void> changeUserid(WebSocket socket, final Serializable newuserid) {
if (newuserid == null) throw new NullPointerException("newuserid is null");
final Serializable olduserid = socket._userid;
socket._userid = newuserid;
if (single) {
websockets.remove(olduserid);
websockets.put(newuserid, socket);
} else { //非线程安全, 在常规场景中无需锁
List<WebSocket> oldlist = websockets2.get(olduserid);
if (oldlist != null) {
oldlist.remove(socket);
if (oldlist.isEmpty()) websockets2.remove(olduserid);
}
List<WebSocket> newlist = websockets2.get(newuserid);
if (newlist == null) {
newlist = new CopyOnWriteArrayList<>();
websockets2.put(newuserid, newlist);
}
newlist.add(socket);
}
if (node != null) return node.changeUserid(olduserid, newuserid);
return CompletableFuture.completedFuture(null);
}
@Comment("强制关闭本地用户的WebSocket") @Comment("强制关闭本地用户的WebSocket")
public int forceCloseLocalWebSocket(Serializable userid) { public int forceCloseLocalWebSocket(Serializable userid) {
if (single) { if (single) {
@@ -298,6 +323,16 @@ public class WebSocketEngine {
return list; return list;
} }
@Comment("获取所有连接")
public void forEachLocalWebSocket(Consumer<WebSocket> consumer) {
if (consumer == null) return;
if (single) {
websockets.values().stream().forEach(consumer);
} else {
websockets2.values().forEach(x -> x.stream().forEach(consumer));
}
}
@Comment("获取当前连接总数") @Comment("获取当前连接总数")
public int getLocalWebSocketSize() { public int getLocalWebSocketSize() {
if (single) return websockets.size(); if (single) return websockets.size();

View File

@@ -86,6 +86,8 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress addr); protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress addr);
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress addr);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, InetSocketAddress addr); protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, InetSocketAddress addr);
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
@@ -99,6 +101,11 @@ public abstract class WebSocketNode {
return disconnect(userid, localSncpAddress); return disconnect(userid, localSncpAddress);
} }
final CompletableFuture<Void> changeUserid(Serializable olduserid, final Serializable newuserid) {
if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return changeUserid(olduserid, newuserid, localSncpAddress);
}
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
/** /**
* 获取目标地址 <br> * 获取目标地址 <br>
@@ -422,7 +429,7 @@ public abstract class WebSocketNode {
private CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) { private CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) {
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid)); if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid));
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + message + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + JsonConvert.root().convertTo(message) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
CompletableFuture<Integer> localFuture = null; CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid); if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid);
if (this.sncpNodeAddresses == null || this.remoteNode == null) { if (this.sncpNodeAddresses == null || this.remoteNode == null) {
@@ -435,7 +442,7 @@ public abstract class WebSocketNode {
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> { CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) { if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node "); if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
return CompletableFuture.completedFuture(0); return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
} }
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
@@ -445,7 +452,7 @@ public abstract class WebSocketNode {
future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid) future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid)
: future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b); : future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b);
} }
return future == null ? CompletableFuture.completedFuture(0) : future; return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
}); });
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
} }

View File

@@ -433,6 +433,10 @@ class WebSocketRunner implements Runnable {
return futureResult; return futureResult;
} }
public boolean isClosed() {
return closed;
}
public void closeRunner(int code) { public void closeRunner(int code) {
if (closed) return; if (closed) return;
synchronized (this) { synchronized (this) {

View File

@@ -9,6 +9,7 @@ import java.io.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.security.*; import java.security.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@@ -62,13 +63,20 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected Type messageTextType; //RestWebSocket时会被修改 protected Type messageTextType; //RestWebSocket时会被修改
//同RestWebSocket.single
protected boolean single = true; //是否单用户单连接 protected boolean single = true; //是否单用户单连接
//同RestWebSocket.liveinterval
protected int liveinterval = DEFAILT_LIVEINTERVAL; protected int liveinterval = DEFAILT_LIVEINTERVAL;
//同RestWebSocket.wsmaxconns
protected int wsmaxconns = 0; protected int wsmaxconns = 0;
protected int wsmaxbody = 0; //同RestWebSocket.wsmaxbody
protected int wsmaxbody = 16 * 1024;
//同RestWebSocket.anyuser
protected boolean anyuser = false;
@Resource(name = "jsonconvert") @Resource(name = "jsonconvert")
protected Convert jsonConvert; protected Convert jsonConvert;
@@ -170,7 +178,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
sessionFuture.whenComplete((sessionid, ex) -> { sessionFuture.whenComplete((sessionid, ex) -> {
if (sessionid == null || ex != null) { if ((sessionid == null && webSocket.delayPackets == null) || ex != null) {
if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex);
response.finish(true); response.finish(true);
return; return;
@@ -185,11 +193,15 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.setHeader("Connection", "Upgrade"); response.setHeader("Connection", "Upgrade");
response.addHeader("Upgrade", "websocket"); response.addHeader("Upgrade", "websocket");
response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes)); response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes));
response.sendBody((ByteBuffer) null, null, new AsyncHandler<Integer, Void>() { response.sendBody((ByteBuffer) null, null, new CompletionHandler<Integer, Void>() {
WebSocketRunner temprunner = null;
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
HttpContext context = response.getContext(); HttpContext context = response.getContext();
Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid(); CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) { if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request);
@@ -197,13 +209,15 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
userFuture.whenComplete((userid, ex2) -> { userFuture.whenComplete((userid, ex2) -> {
if (userid == null || ex2 != null) { if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
response.finish(true); response.finish(true);
return; return;
} }
Runnable runHandler = () -> {
temprunner = null;
webSocket._userid = userid; webSocket._userid = userid;
if (single) { if (single && !anyuser) {
WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> { WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {
if (rs) webSocket.onSingleRepeatConnect(); if (rs) webSocket.onSingleRepeatConnect();
WebSocketServlet.this.node.localEngine.add(webSocket); WebSocketServlet.this.node.localEngine.add(webSocket);
@@ -219,7 +233,55 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
} }
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((Integer v, Throwable t) -> {
if (userid == null || t != null || (temprunner != null && temprunner.isClosed())) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
response.finish(true);
} else {
runHandler.run();
}
}); });
} else {
runHandler.run();
}
});
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((Integer v, Throwable t) -> {
if (sessionid == null || t != null || (temprunner != null && temprunner.isClosed())) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
response.finish(true);
} else {
createUseridHandler.run();
}
});
} else {
createUseridHandler.run();
}
} }
@Override @Override

View File

@@ -9,6 +9,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.security.*; import java.security.*;
import java.util.*; import java.util.*;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -138,12 +139,12 @@ public abstract class Sncp {
} }
static void checkAsyncModifier(Class param, Method method) { static void checkAsyncModifier(Class param, Method method) {
if (param == AsyncHandler.class) return; if (param == CompletionHandler.class) return;
if (Modifier.isFinal(param.getModifiers())) { if (Modifier.isFinal(param.getModifiers())) {
throw new RuntimeException("AsyncHandler Type Parameter on {" + method + "} cannot final modifier"); throw new RuntimeException("CompletionHandler Type Parameter on {" + method + "} cannot final modifier");
} }
if (!Modifier.isPublic(param.getModifiers())) { if (!Modifier.isPublic(param.getModifiers())) {
throw new RuntimeException("AsyncHandler Type Parameter on {" + method + "} must be public modifier"); throw new RuntimeException("CompletionHandler Type Parameter on {" + method + "} must be public modifier");
} }
if (param.isInterface()) return; if (param.isInterface()) return;
boolean constructorflag = false; boolean constructorflag = false;
@@ -388,8 +389,8 @@ public abstract class Sncp {
int varindex = 0; int varindex = 0;
boolean handlerFuncFlag = false; boolean handlerFuncFlag = false;
for (Class pt : paramtypes) { for (Class pt : paramtypes) {
if (AsyncHandler.class.isAssignableFrom(pt)) { if (CompletionHandler.class.isAssignableFrom(pt)) {
if (handlerFuncFlag) throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); if (handlerFuncFlag) throw new RuntimeException(method + " have more than one CompletionHandler type parameter");
checkAsyncModifier(pt, method); checkAsyncModifier(pt, method);
handlerFuncFlag = true; handlerFuncFlag = true;
} }

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.net.sncp; package org.redkale.net.sncp;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.logging.Level; import java.util.logging.Level;
import jdk.internal.org.objectweb.asm.*; import jdk.internal.org.objectweb.asm.*;
@@ -26,7 +27,7 @@ import org.redkale.util.*;
* @param <V> 结果对象的泛型 * @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型 * @param <A> 附件对象的泛型
*/ */
public interface SncpAsyncHandler<V, A> extends AsyncHandler<V, A> { public interface SncpAsyncHandler<V, A> extends CompletionHandler<V, A> {
public Object[] sncp_getParams(); public Object[] sncp_getParams();
@@ -42,9 +43,9 @@ public interface SncpAsyncHandler<V, A> extends AsyncHandler<V, A> {
* <blockquote><pre> * <blockquote><pre>
* *
* 考虑点: * 考虑点:
* 1、AsyncHandler子类是接口且还有其他多个方法 * 1、CompletionHandler子类是接口且还有其他多个方法
* 2、AsyncHandler子类是类 需要继承,且必须有空参数构造函数 * 2、CompletionHandler子类是类 需要继承,且必须有空参数构造函数
* 3、AsyncHandler子类无论是接口还是类都可能存在其他泛型 * 3、CompletionHandler子类无论是接口还是类都可能存在其他泛型
* *
* public class XXXAsyncHandler_DyncSncpAsyncHandler_4323 extends XXXAsyncHandler implements SncpAsyncHandler { * public class XXXAsyncHandler_DyncSncpAsyncHandler_4323 extends XXXAsyncHandler implements SncpAsyncHandler {
* *
@@ -91,11 +92,11 @@ public interface SncpAsyncHandler<V, A> extends AsyncHandler<V, A> {
* *
* </pre></blockquote> * </pre></blockquote>
* *
* @param handlerClass AsyncHandler类型或子类 * @param handlerClass CompletionHandler类型或子类
* *
* @return Creator * @return Creator
*/ */
public static Creator<SncpAsyncHandler> createCreator(Class<? extends AsyncHandler> handlerClass) { public static Creator<SncpAsyncHandler> createCreator(Class<? extends CompletionHandler> handlerClass) {
//------------------------------------------------------------- //-------------------------------------------------------------
final boolean handlerinterface = handlerClass.isInterface(); final boolean handlerinterface = handlerClass.isInterface();
final String handlerClassName = handlerClass.getName().replace('.', '/'); final String handlerClassName = handlerClass.getName().replace('.', '/');

View File

@@ -103,6 +103,7 @@ public final class SncpClient {
this.actions = methodens.toArray(new SncpAction[methodens.size()]); this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress(); this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
this.addrPort = clientAddress == null ? 0 : clientAddress.getPort(); this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP clientAddress only support IPv4");
} }
static List<SncpAction> getSncpActions(final Class serviceClass) { static List<SncpAction> getSncpActions(final Class serviceClass) {
@@ -292,7 +293,7 @@ public final class SncpClient {
//只给远程模式调用的 //只给远程模式调用的
public <T> T remote(final int index, final Object... params) { public <T> T remote(final int index, final Object... params) {
final SncpAction action = actions[index]; final SncpAction action = actions[index];
final AsyncHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (AsyncHandler) params[action.handlerFuncParamIndex] : null; final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null;
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null;
final BsonReader reader = bsonConvert.pollBsonReader(); final BsonReader reader = bsonConvert.pollBsonReader();
CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params); CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params);
@@ -338,7 +339,7 @@ public final class SncpClient {
} }
} }
private CompletableFuture<byte[]> remote0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
final Type[] myparamtypes = action.paramTypes; final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass; final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;
@@ -346,7 +347,7 @@ public final class SncpClient {
final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER); writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]); bsonConvert.convertTo(writer, CompletionHandler.class.isAssignableFrom(myparamclass[i]) ? CompletionHandler.class : myparamtypes[i], params[i]);
} }
final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度 final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime(); final long seqid = System.nanoTime();
@@ -570,12 +571,12 @@ public final class SncpClient {
if (anns.length > 0) { if (anns.length > 0) {
Class<?>[] params = method.getParameterTypes(); Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < params.length; i++) { for (int i = 0; i < params.length; i++) {
if (AsyncHandler.class.isAssignableFrom(params[i])) { if (CompletionHandler.class.isAssignableFrom(params[i])) {
if (boolReturnTypeFuture) { if (boolReturnTypeFuture) {
throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture"); throw new RuntimeException(method + " have both CompletionHandler and CompletableFuture");
} }
if (handlerFuncIndex >= 0) { if (handlerFuncIndex >= 0) {
throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); throw new RuntimeException(method + " have more than one CompletionHandler type parameter");
} }
Sncp.checkAsyncModifier(params[i], method); Sncp.checkAsyncModifier(params[i], method);
handlerFuncIndex = i; handlerFuncIndex = i;
@@ -616,7 +617,7 @@ public final class SncpClient {
this.handlerAttachParamIndex = handlerAttachIndex; this.handlerAttachParamIndex = handlerAttachIndex;
this.paramAttrs = hasattr ? atts : null; this.paramAttrs = hasattr ? atts : null;
if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) { if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) {
throw new RuntimeException(method + " have AsyncHandler type parameter but return type is not void"); throw new RuntimeException(method + " have CompletionHandler type parameter but return type is not void");
} }
} }

View File

@@ -10,6 +10,7 @@ import java.io.*;
import java.lang.annotation.*; import java.lang.annotation.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.nio.*; import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.*; import java.util.function.*;
@@ -118,7 +119,7 @@ public final class SncpDynServlet extends SncpServlet {
SncpAsyncHandler handler = null; SncpAsyncHandler handler = null;
try { try {
if (action.handlerFuncParamIndex >= 0) { if (action.handlerFuncParamIndex >= 0) {
if (action.handlerFuncParamClass == AsyncHandler.class) { if (action.handlerFuncParamClass == CompletionHandler.class) {
handler = new DefaultSncpAsyncHandler(action, in, out, request, response); handler = new DefaultSncpAsyncHandler(action, in, out, request, response);
} else { } else {
Creator<SncpAsyncHandler> creator = action.handlerCreator; Creator<SncpAsyncHandler> creator = action.handlerCreator;
@@ -178,15 +179,15 @@ public final class SncpDynServlet extends SncpServlet {
protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type void的返回参数类型为null protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type void的返回参数类型为null
protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在AsyncHandler参数 protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在CompletionHandler参数
protected boolean boolReturnTypeFuture = false; // 返回结果类型是否为 CompletableFuture protected boolean boolReturnTypeFuture = false; // 返回结果类型是否为 CompletableFuture
protected Class handlerFuncParamClass; //AsyncHandler参数的类型 protected Class handlerFuncParamClass; //CompletionHandler参数的类型
public abstract void action(final BsonReader in, final BsonWriter out, final SncpAsyncHandler handler) throws Throwable; public abstract void action(final BsonReader in, final BsonWriter out, final SncpAsyncHandler handler) throws Throwable;
//只有同步方法才调用 (没有AsyncHandler、CompletableFuture) //只有同步方法才调用 (没有CompletionHandler、CompletableFuture)
public final void _callParameter(final BsonWriter out, final Object... params) { public final void _callParameter(final BsonWriter out, final Object... params) {
if (paramAttrs != null) { if (paramAttrs != null) {
for (int i = 1; i < paramAttrs.length; i++) { for (int i = 1; i < paramAttrs.length; i++) {
@@ -207,10 +208,10 @@ public final class SncpDynServlet extends SncpServlet {
* return false; * return false;
* } * }
* *
* public void insert(AsyncHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) { * public void insert(CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* } * }
* *
* public void update(long show, short v2, AsyncHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) { * public void update(long show, short v2, CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* } * }
* *
* public CompletableFuture&#60;String&#62; changeName(TestBean bean, String name, int id) { * public CompletableFuture&#60;String&#62; changeName(TestBean bean, String name, int id) {
@@ -241,7 +242,7 @@ public final class SncpDynServlet extends SncpServlet {
* &#064;Override * &#064;Override
* public void action(BsonReader in, BsonWriter out, SncpAsyncHandler handler) throws Throwable { * public void action(BsonReader in, BsonWriter out, SncpAsyncHandler handler) throws Throwable {
* SncpAsyncHandler arg0 = handler; * SncpAsyncHandler arg0 = handler;
* convert.convertFrom(AsyncHandler.class, in); * convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[2], in); * TestBean arg1 = convert.convertFrom(paramTypes[2], in);
* String arg2 = convert.convertFrom(paramTypes[3], in); * String arg2 = convert.convertFrom(paramTypes[3], in);
* int arg3 = convert.convertFrom(paramTypes[4], in); * int arg3 = convert.convertFrom(paramTypes[4], in);
@@ -259,7 +260,7 @@ public final class SncpDynServlet extends SncpServlet {
* long a1 = convert.convertFrom(paramTypes[1], in); * long a1 = convert.convertFrom(paramTypes[1], in);
* short a2 = convert.convertFrom(paramTypes[2], in); * short a2 = convert.convertFrom(paramTypes[2], in);
* SncpAsyncHandler a3 = handler; * SncpAsyncHandler a3 = handler;
* convert.convertFrom(AsyncHandler.class, in); * convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[4], in); * TestBean arg1 = convert.convertFrom(paramTypes[4], in);
* String arg2 = convert.convertFrom(paramTypes[5], in); * String arg2 = convert.convertFrom(paramTypes[5], in);
* int arg3 = convert.convertFrom(paramTypes[6], in); * int arg3 = convert.convertFrom(paramTypes[6], in);
@@ -353,12 +354,12 @@ public final class SncpDynServlet extends SncpServlet {
final Class[] paramClasses = method.getParameterTypes(); final Class[] paramClasses = method.getParameterTypes();
int[][] codes = new int[paramClasses.length][2]; int[][] codes = new int[paramClasses.length][2];
for (int i = 0; i < paramClasses.length; i++) { //反序列化方法的每个参数 for (int i = 0; i < paramClasses.length; i++) { //反序列化方法的每个参数
if (AsyncHandler.class.isAssignableFrom(paramClasses[i])) { if (CompletionHandler.class.isAssignableFrom(paramClasses[i])) {
if (boolReturnTypeFuture) { if (boolReturnTypeFuture) {
throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture"); throw new RuntimeException(method + " have both CompletionHandler and CompletableFuture");
} }
if (handlerFuncIndex >= 0) { if (handlerFuncIndex >= 0) {
throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); throw new RuntimeException(method + " have more than one CompletionHandler type parameter");
} }
Sncp.checkAsyncModifier(paramClasses[i], method); Sncp.checkAsyncModifier(paramClasses[i], method);
handlerFuncIndex = i; handlerFuncIndex = i;
@@ -372,7 +373,7 @@ public final class SncpDynServlet extends SncpServlet {
intconst++; intconst++;
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class)); mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
mv.visitLdcInsn(Type.getType(Type.getDescriptor(AsyncHandler.class))); mv.visitLdcInsn(Type.getType(Type.getDescriptor(CompletionHandler.class)));
mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false); mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false);
mv.visitInsn(POP); mv.visitInsn(POP);

View File

@@ -49,6 +49,7 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
super(context, request); super(context, request);
this.addrBytes = context.getServerAddress().getAddress().getAddress(); this.addrBytes = context.getServerAddress().getAddress().getAddress();
this.addrPort = context.getServerAddress().getPort(); this.addrPort = context.getServerAddress().getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4");
} }
public void finish(final int retcode, final BsonWriter out) { public void finish(final int retcode, final BsonWriter out) {

View File

@@ -124,7 +124,8 @@ public class RetResult<T> {
*/ */
public RetResult<T> attach(String key, Object value) { public RetResult<T> attach(String key, Object value) {
if (this.attach == null) this.attach = new HashMap<>(); if (this.attach == null) this.attach = new HashMap<>();
this.attach.put(key, value == null ? null : String.valueOf(value)); boolean canstr = value != null && (value instanceof CharSequence || value.getClass().isPrimitive());
this.attach.put(key, value == null ? null : (canstr ? String.valueOf(value) : JsonConvert.root().convertTo(value)));
return this; return this;
} }

View File

@@ -22,10 +22,10 @@ import org.redkale.util.*;
* <blockquote><pre> * <blockquote><pre>
* 异步方法: * 异步方法:
* Service编写异步方法 * Service编写异步方法
* 1、异步方法有且仅有一个类型为AsyncHandler的参数 返回类型必须是void。若参数类型为AsyncHandler子类必须保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。 * 1、异步方法有且仅有一个类型为CompletionHandler的参数 返回类型必须是void。若参数类型为CompletionHandler子类必须保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。
* 2、异步方法返回类型是CompletableFuture。 * 2、异步方法返回类型是CompletableFuture。
* 例如: * 例如:
* public void insertRecord(AsyncHandler&#60;Integer, Record&#62; handler, String name, &#64;RpcAttachment Record record); * public void insertRecord(CompletionHandler&#60;Integer, Record&#62; handler, String name, &#64;RpcAttachment Record record);
* *
* </pre></blockquote> * </pre></blockquote>
* *

View File

@@ -61,7 +61,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
/** /**
* 当用户连接到节点需要更新到CacheSource * 当用户连接到节点需要更新到CacheSource
* *
* @param userid String * @param userid Serializable
* @param sncpAddr InetSocketAddress * @param sncpAddr InetSocketAddress
* *
* @return 无返回值 * @return 无返回值
@@ -78,7 +78,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
/** /**
* 当用户从一个节点断掉了所有的连接需要从CacheSource中删除 * 当用户从一个节点断掉了所有的连接需要从CacheSource中删除
* *
* @param userid String * @param userid Serializable
* @param sncpAddr InetSocketAddress * @param sncpAddr InetSocketAddress
* *
* @return 无返回值 * @return 无返回值
@@ -91,10 +91,27 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
return future; return future;
} }
/**
* 更改用户ID需要更新到CacheSource
*
* @param olduserid Serializable
* @param newuserid Serializable
* @param sncpAddr InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr));
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr);
return future;
}
/** /**
* 强制关闭用户的WebSocket * 强制关闭用户的WebSocket
* *
* @param userid String * @param userid Serializable
* @param sncpAddr InetSocketAddress * @param sncpAddr InetSocketAddress
* *
* @return 无返回值 * @return 无返回值

View File

@@ -168,7 +168,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
if (remoteSource != null && !Sncp.isRemote(this)) { if (remoteSource != null && !Sncp.isRemote(this)) {
SncpClient client = Sncp.getSncpClient((Service) remoteSource); SncpClient client = Sncp.getSncpClient((Service) remoteSource);
if (client != null && client.getRemoteGroupTransport() != null) { if (client != null && client.getRemoteGroupTransport() != null
&& client.getRemoteGroupTransport().getRemoteAddresses().length > 0) {
super.runAsync(() -> { super.runAsync(() -> {
try { try {
CompletableFuture<List<CacheEntry<Object>>> listFuture = remoteSource.queryListAsync(); CompletableFuture<List<CacheEntry<Object>>> listFuture = remoteSource.queryListAsync();
@@ -190,62 +191,61 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
/** /**
public static void main(String[] args) throws Exception { * public static void main(String[] args) throws Exception {
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); * AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "127.0.0.1").addValue("port", "6379")); * conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "127.0.0.1").addValue("port", "6379"));
*
CacheMemorySource source = new CacheMemorySource(); * CacheMemorySource source = new CacheMemorySource();
source.defaultConvert = JsonFactory.root().getConvert(); * source.defaultConvert = JsonFactory.root().getConvert();
source.initValueType(String.class); //value用String类型 * source.initValueType(String.class); //value用String类型
source.initTransient(false); * source.initTransient(false);
source.init(conf); * source.init(conf);
*
System.out.println("------------------------------------"); * System.out.println("------------------------------------");
source.remove("key1"); * source.remove("key1");
source.remove("key2"); * source.remove("key2");
source.remove("300"); * source.remove("300");
source.set("key1", "value1"); * source.set("key1", "value1");
source.setString("keystr1", "strvalue1"); * source.setString("keystr1", "strvalue1");
source.setLong("keylong1", 333L); * source.setLong("keylong1", 333L);
source.set("300", "4000"); * source.set("300", "4000");
source.getAndRefresh("key1", 3500); * source.getAndRefresh("key1", 3500);
System.out.println("[有值] 300 GET : " + source.get("300")); * System.out.println("[有值] 300 GET : " + source.get("300"));
System.out.println("[有值] key1 GET : " + source.get("key1")); * System.out.println("[有值] key1 GET : " + source.get("key1"));
System.out.println("[无值] key2 GET : " + source.get("key2")); * System.out.println("[无值] key2 GET : " + source.get("key2"));
System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L)); * System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L));
System.out.println("[有值] key1 EXISTS : " + source.exists("key1")); * System.out.println("[有值] key1 EXISTS : " + source.exists("key1"));
System.out.println("[无值] key2 EXISTS : " + source.exists("key2")); * System.out.println("[无值] key2 EXISTS : " + source.exists("key2"));
*
source.remove("keys3"); * source.remove("keys3");
source.appendListItem("keys3", "vals1"); * source.appendListItem("keys3", "vals1");
source.appendListItem("keys3", "vals2"); * source.appendListItem("keys3", "vals2");
System.out.println("-------- keys3 追加了两个值 --------"); * System.out.println("-------- keys3 追加了两个值 --------");
System.out.println("[两值] keys3 VALUES : " + source.getCollection("keys3")); * System.out.println("[两值] keys3 VALUES : " + source.getCollection("keys3"));
System.out.println("[有值] keys3 EXISTS : " + source.exists("keys3")); * System.out.println("[有值] keys3 EXISTS : " + source.exists("keys3"));
source.removeListItem("keys3", "vals1"); * source.removeListItem("keys3", "vals1");
System.out.println("[一值] keys3 VALUES : " + source.getCollection("keys3")); * System.out.println("[一值] keys3 VALUES : " + source.getCollection("keys3"));
source.getCollectionAndRefresh("keys3", 3000); * source.getCollectionAndRefresh("keys3", 3000);
*
source.remove("sets3"); * source.remove("sets3");
source.appendSetItem("sets3", "setvals1"); * source.appendSetItem("sets3", "setvals1");
source.appendSetItem("sets3", "setvals2"); * source.appendSetItem("sets3", "setvals2");
source.appendSetItem("sets3", "setvals1"); * source.appendSetItem("sets3", "setvals1");
System.out.println("[两值] sets3 VALUES : " + source.getCollection("sets3")); * System.out.println("[两值] sets3 VALUES : " + source.getCollection("sets3"));
System.out.println("[有值] sets3 EXISTS : " + source.exists("sets3")); * System.out.println("[有值] sets3 EXISTS : " + source.exists("sets3"));
source.removeSetItem("sets3", "setvals1"); * source.removeSetItem("sets3", "setvals1");
System.out.println("[一值] sets3 VALUES : " + source.getCollection("sets3")); * System.out.println("[一值] sets3 VALUES : " + source.getCollection("sets3"));
System.out.println("sets3 大小 : " + source.getCollectionSize("sets3")); * System.out.println("sets3 大小 : " + source.getCollectionSize("sets3"));
System.out.println("all keys: " + source.queryKeys()); * System.out.println("all keys: " + source.queryKeys());
System.out.println("newnum 值 : " + source.incr("newnum")); * System.out.println("newnum 值 : " + source.incr("newnum"));
System.out.println("newnum 值 : " + source.decr("newnum")); * System.out.println("newnum 值 : " + source.decr("newnum"));
System.out.println("------------------------------------"); * System.out.println("------------------------------------");
source.destroy(null); * source.destroy(null);
source.init(null); * source.init(null);
System.out.println("all keys: " + source.queryKeys()); * System.out.println("all keys: " + source.queryKeys());
System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L)); * System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L));
} * }
*/ */
@Override @Override
public void close() throws Exception { //给Application 关闭时调用 public void close() throws Exception { //给Application 关闭时调用
destroy(null); destroy(null);

View File

@@ -36,15 +36,15 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName()); protected final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
protected final String name; protected String name;
protected final URL conf; protected URL conf;
protected final boolean cacheForbidden; protected boolean cacheForbidden;
protected final PoolJdbcSource readPool; protected PoolJdbcSource readPool;
protected final PoolJdbcSource writePool; protected PoolJdbcSource writePool;
@Resource(name = "$") @Resource(name = "$")
protected DataCacheListener cacheListener; protected DataCacheListener cacheListener;
@@ -52,6 +52,44 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true); protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) { public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
this.preConstruct(unitName, readprop, writeprop);
this.initByProperties(unitName, readprop, writeprop);
}
public DataJdbcSource() {
}
@Override
public void init(AnyValue config) { //通过空构造函数创建的对象需要调用init方法进行初始化
String unitName = config.getValue("name");
Properties readprop = new Properties();
Properties writeprop = new Properties();
for (AnyValue confs : config.getAnyValues("properties")) {
boolean write = confs.getValue("name", "").contains("write");
for (AnyValue conf : confs.getAnyValues("property")) {
String pn = conf.getValue("name");
String pv = conf.getValue("value");
if (pn == null || pv == null) continue;
(write ? writeprop : readprop).put(pn, pv);
}
}
for (AnyValue conf : config.getAnyValues("property")) {
String pn = conf.getValue("name");
String pv = conf.getValue("value");
if (pn == null || pv == null) continue;
readprop.put(pn, pv);
}
if (writeprop.isEmpty()) writeprop = readprop;
this.initByProperties(unitName, readprop, writeprop);
}
//构造前调用
protected void preConstruct(String unitName, Properties readprop, Properties writeprop) {
}
protected void initByProperties(String unitName, Properties readprop, Properties writeprop) {
this.name = unitName; this.name = unitName;
this.conf = null; this.conf = null;
this.readPool = new PoolJdbcSource(this, "read", readprop); this.readPool = new PoolJdbcSource(this, "read", readprop);
@@ -59,6 +97,12 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE)); this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
} }
@Local
@Override
public String getType() {
return "jdbc";
}
@Override @Override
public final String resourceName() { public final String resourceName() {
return name; return name;
@@ -70,6 +114,14 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
writePool.close(); writePool.close();
} }
public PoolJdbcSource getReadPoolJdbcSource() {
return readPool;
}
public PoolJdbcSource getWritePoolJdbcSource() {
return writePool;
}
public Connection createReadSQLConnection() { public Connection createReadSQLConnection() {
return readPool.poll(); return readPool.poll();
} }
@@ -1443,7 +1495,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
final Connection conn = createReadSQLConnection(); final Connection conn = createReadSQLConnection();
try { try {
final SelectColumn sels = selects; final SelectColumn sels = selects;
final String sql = "SELECT * FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk); final String sql = "SELECT " + info.getQueryColumns(null, selects) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(clazz.getSimpleName() + " find sql=" + sql); if (info.isLoggable(logger, Level.FINEST)) logger.finest(clazz.getSimpleName() + " find sql=" + sql);
conn.setReadOnly(true); conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
@@ -1520,7 +1572,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info); final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis); final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
final String sql = "SELECT a.* FROM " + info.getTable(node) + " a" + (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where)); final String sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
if (info.isLoggable(logger, Level.FINEST)) logger.finest(clazz.getSimpleName() + " find sql=" + sql); if (info.isLoggable(logger, Level.FINEST)) logger.finest(clazz.getSimpleName() + " find sql=" + sql);
conn.setReadOnly(true); conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
@@ -2246,7 +2298,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis(); final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info); final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis); final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
final String sql = "SELECT a.* FROM " + info.getTable(node) + " a" + (join == null ? "" : join) final String sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
+ ((where == null || where.length() == 0) ? "" : (" WHERE " + where)) + info.createSQLOrderby(flipper); + ((where == null || where.length() == 0) ? "" : (" WHERE " + where)) + info.createSQLOrderby(flipper);
if (info.isLoggable(logger, Level.FINEST)) { if (info.isLoggable(logger, Level.FINEST)) {
logger.finest(clazz.getSimpleName() + " query sql=" + sql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getOffset() + "," + flipper.getLimit()))); logger.finest(clazz.getSimpleName() + " query sql=" + sql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getOffset() + "," + flipper.getLimit())));

View File

@@ -24,6 +24,13 @@ import org.redkale.util.*;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public interface DataSource { public interface DataSource {
/**
* 获取数据源类型
*
* @return String
*/
public String getType();
//----------------------insertAsync----------------------------- //----------------------insertAsync-----------------------------
/** /**
* 新增记录, 多对象必须是同一个Entity类 <br> * 新增记录, 多对象必须是同一个Entity类 <br>

View File

@@ -92,6 +92,7 @@ public final class DataSources {
} }
} }
if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource"); if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource");
if (writeprop == null) writeprop = readprop;
String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName()); String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName());
if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, readprop, writeprop); if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, readprop, writeprop);
try { try {

View File

@@ -286,7 +286,7 @@ public final class EntityCache<T> {
rs = rs2; rs = rs2;
} else if (func == DISTINCTCOUNT) { } else if (func == DISTINCTCOUNT) {
Map rs2 = new LinkedHashMap(); Map rs2 = new LinkedHashMap();
rs.forEach((x, y) -> rs2.put(x, ((Set) y).size())); rs.forEach((x, y) -> rs2.put(x, ((Set) y).size() + 0L));
rs = rs2; rs = rs2;
} }
return rs; return rs;

View File

@@ -434,6 +434,26 @@ public final class EntityInfo<T> {
return deleteSQL.replace("${newtable}", getTable(bean)); return deleteSQL.replace("${newtable}", getTable(bean));
} }
/**
* 获取查询字段列表
*
* @param tabalis 表别名
* @param selects 过滤字段
*
* @return String
*/
public CharSequence getQueryColumns(String tabalis, SelectColumn selects) {
if (selects == null) return tabalis == null ? "*" : (tabalis + ".*");
StringBuilder sb = new StringBuilder();
for (Attribute attr : this.attributes) {
if (!selects.test(attr.field())) continue;
if (sb.length() > 0) sb.append(',');
sb.append(getSQLColumn(tabalis, attr.field()));
}
if (sb.length() == 0) sb.append('*');
return sb;
}
/** /**
* 根据主键值获取Entity的表名 * 根据主键值获取Entity的表名
* *

View File

@@ -16,10 +16,12 @@ import java.util.function.*;
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @deprecated 使用 java.nio.channels.CompletionHandler 代替
* @author zhangjx * @author zhangjx
* @param <V> 结果对象的泛型 * @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型 * @param <A> 附件对象的泛型
*/ */
@Deprecated
public interface AsyncHandler<V, A> extends CompletionHandler<V, A> { public interface AsyncHandler<V, A> extends CompletionHandler<V, A> {
/** /**

View File

@@ -19,7 +19,7 @@ import java.util.logging.*;
* @author zhangjx * @author zhangjx
* @param <T> 对象池元素的数据类型 * @param <T> 对象池元素的数据类型
*/ */
public final class ObjectPool<T> implements Supplier<T> { public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName()); private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName());
@@ -78,21 +78,27 @@ public final class ObjectPool<T> implements Supplier<T> {
return result; return result;
} }
public void offer(final T e) { @Override
public void accept(final T e) {
if (e != null && recycler.test(e)) { if (e != null && recycler.test(e)) {
if (cycleCounter != null) cycleCounter.incrementAndGet(); if (cycleCounter != null) cycleCounter.incrementAndGet();
if (debug) { // if (debug) {
for (T t : queue) { // for (T t : queue) {
if (t == e) { // if (t == e) {
logger.log(Level.WARNING, "[" + Thread.currentThread().getName() + "] repeat offer the same object(" + e + ")", new Exception()); // logger.log(Level.WARNING, "[" + Thread.currentThread().getName() + "] repeat offer the same object(" + e + ")", new Exception());
return; // return;
} // }
} // }
} // }
queue.offer(e); queue.offer(e);
} }
} }
@Deprecated
public void offer(final T e) {
accept(e);
}
public long getCreatCount() { public long getCreatCount() {
return creatCounter.longValue(); return creatCounter.longValue();
} }

View File

@@ -17,7 +17,7 @@ public final class Redkale {
} }
public static String getDotedVersion() { public static String getDotedVersion() {
return "1.8.6"; return "1.8.8";
} }
public static int getMajorVersion() { public static int getMajorVersion() {

View File

@@ -8,10 +8,11 @@ import java.io.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.*; import java.nio.charset.*;
import java.time.*; import java.time.*;
import java.util.*; import java.util.*;
import java.util.function.Predicate; import java.util.function.*;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import javax.net.ssl.*; import javax.net.ssl.*;
@@ -229,6 +230,191 @@ public final class Utility {
return news; return news;
} }
/**
* 获取int数组之和
*
* @param array 数组
*
* @return int
*/
public static int sum(final int... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
int sum = 0;
for (int i : array) {
sum += i;
}
return sum;
}
/**
* 获取long数组之和
*
* @param array 数组
*
* @return long
*/
public static long sum(final long... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
long sum = 0L;
for (long i : array) {
sum += i;
}
return sum;
}
/**
* 获取int数组最大值
*
* @param array 数组
*
* @return int
*/
public static int max(final int... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
int max = array[0];
for (int i : array) {
if (i > max) i = max;
}
return max;
}
/**
* 获取long数组最大值
*
* @param array 数组
*
* @return long
*/
public static long max(final long... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
long max = array[0];
for (long i : array) {
if (i > max) i = max;
}
return max;
}
/**
* 获取int数组最小值
*
* @param array 数组
*
* @return int
*/
public static long min(final int... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
int min = array[0];
for (int i : array) {
if (i < min) i = min;
}
return min;
}
/**
* 获取long数组最小值
*
* @param array 数组
*
* @return long
*/
public static long min(final long... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
long min = array[0];
for (long i : array) {
if (i < min) i = min;
}
return min;
}
/**
* 将int数组用分隔符拼接成字符串
*
* @param array 数组
* @param delimiter 分隔符
*
* @return String
*/
public static String joining(final int[] array, final String delimiter) {
if (array == null || array.length == 0) return "";
StringBuilder sb = new StringBuilder();
for (int i : array) {
if (sb.length() > 0) sb.append(delimiter);
sb.append(i);
}
return sb.toString();
}
/**
* 将long数组用分隔符拼接成字符串
*
* @param array 数组
* @param delimiter 分隔符
*
* @return String
*/
public static String joining(final long[] array, final String delimiter) {
if (array == null || array.length == 0) return "";
StringBuilder sb = new StringBuilder();
for (long i : array) {
if (sb.length() > 0) sb.append(delimiter);
sb.append(i);
}
return sb.toString();
}
/**
* 将对象数组用分隔符拼接成字符串
*
* @param <T> 泛型
* @param array 数组
* @param delimiter 分隔符
*
* @return String
*/
public static <T> String joining(final T[] array, final String delimiter) {
if (array == null || array.length == 0) return "";
StringBuilder sb = new StringBuilder();
for (T i : array) {
if (sb.length() > 0) sb.append(delimiter);
sb.append(i);
}
return sb.toString();
}
/**
* 将一个或多个int新元素添加到int数组结尾
*
* @param array 原数组
* @param objs 待追加数据
*
* @return 新数组
*/
public static int[] append(final int[] array, final int... objs) {
if (array == null || array.length == 0) return objs;
if (objs == null || objs.length == 0) return array;
final int[] news = new int[array.length + objs.length];
System.arraycopy(array, 0, news, 0, array.length);
System.arraycopy(objs, 0, news, array.length, objs.length);
return news;
}
/**
* 将一个或多个long新元素添加到long数组结尾
*
* @param array 原数组
* @param objs 待追加数据
*
* @return 新数组
*/
public static long[] append(final long[] array, final long... objs) {
if (array == null || array.length == 0) return objs;
if (objs == null || objs.length == 0) return array;
final long[] news = new long[array.length + objs.length];
System.arraycopy(array, 0, news, 0, array.length);
System.arraycopy(objs, 0, news, array.length, objs.length);
return news;
}
/** /**
* 将一个或多个新元素添加到数组结尾 * 将一个或多个新元素添加到数组结尾
* *
@@ -458,6 +644,76 @@ public final class Utility {
return back; return back;
} }
/**
* 创建 CompletionHandler 对象
*
* @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return CompletionHandler
*/
public static <V, A> CompletionHandler<V, A> createAsyncHandler(final BiConsumer<V, A> success, final BiConsumer<Throwable, A> fail) {
return new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
if (success != null) success.accept(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
if (fail != null) fail.accept(exc, attachment);
}
};
}
/**
* 创建没有返回结果的 CompletionHandler 对象
*
* @param <A> 附件对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return CompletionHandler
*/
public static <A> CompletionHandler<Void, A> createAsyncHandler(final Consumer<A> success, final BiConsumer<Throwable, A> fail) {
return new CompletionHandler<Void, A>() {
@Override
public void completed(Void result, A attachment) {
if (success != null) success.accept(attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
if (fail != null) fail.accept(exc, attachment);
}
};
}
/**
* 创建没有附件对象的 CompletionHandler 对象
*
* @param <V> 结果对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return CompletionHandler
*/
public static <V> CompletionHandler<V, Void> createAsyncHandler(final Consumer<V> success, final Consumer<Throwable> fail) {
return new CompletionHandler<V, Void>() {
@Override
public void completed(V result, Void attachment) {
if (success != null) success.accept(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
if (fail != null) fail.accept(exc);
}
};
}
/** /**
* 获取格式为yyyy-MM-dd HH:mm:ss的当前时间 * 获取格式为yyyy-MM-dd HH:mm:ss的当前时间
* *

View File

@@ -0,0 +1,35 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.test.asm;
import java.io.*;
import org.redkale.util.Utility;
/**
*
* @author zhangjx
*/
public class AsmCreator {
public static void main(String[] args) throws Throwable {
boolean realasm = true; //从http://forge.ow2.org/projects/asm/ 下载最新asm的src放在 srcasmroot 目录下
File srcasmroot = new File("D:/JAVA/JDK源码/JDK9源码/java.base/jdk/internal/org/objectweb/asm");
if(realasm) srcasmroot = new File("D:/JAVA/JDK源码/org/objectweb/asm");
File destasmroot = new File("D:/Java-Projects/RedkaleProject/src/org/redkale/asm");
String line = null;
LineNumberReader txtin = new LineNumberReader(new FileReader(new File(destasmroot, "asm.txt")));
while ((line = txtin.readLine()) != null) {
line = line.trim();
if (!line.endsWith(".java")) continue;
File srcfile = new File(srcasmroot, line);
File destfile = new File(destasmroot, line);
String content = Utility.readThenClose(new FileInputStream(srcfile));
FileOutputStream out = new FileOutputStream(destfile);
out.write(content.replace("jdk.internal.org.objectweb", "org.redkale").replace("org.objectweb", "org.redkale").getBytes());
out.close();
}
}
}

View File

@@ -13,6 +13,7 @@ import org.redkale.convert.bson.BsonConvert;
import java.nio.*; import java.nio.*;
import java.util.*; import java.util.*;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
import org.redkale.util.*;
/** /**
* *
@@ -31,6 +32,7 @@ public class BsonTestMain {
main3(args); main3(args);
main4(args); main4(args);
main5(args); main5(args);
main6(args);
} }
public static void main2(String[] args) throws Exception { public static void main2(String[] args) throws Exception {
@@ -98,4 +100,20 @@ public class BsonTestMain {
Object mapobj = convert.convertFrom(Object.class, bs); Object mapobj = convert.convertFrom(Object.class, bs);
System.out.println(mapobj); System.out.println(mapobj);
} }
public static void main6(String[] args) throws Exception {
final BsonConvert convert = BsonFactory.root().getConvert();
Optional<String> val = Optional.ofNullable("haha");
byte[] bs = convert.convertTo(val);
Object obj = convert.convertFrom(Optional.class, bs);
System.out.println(obj);
bs = convert.convertTo(Object.class, val);
obj = convert.convertFrom(Object.class, bs);
System.out.println(obj);
bs = convert.convertTo(new TypeToken<Optional<String>>(){}.getType(), val);
obj = convert.convertFrom(new TypeToken<Optional<String>>(){}.getType(), bs);
System.out.println(obj);
System.out.println(JsonConvert.root().convertTo(val));
}
} }

View File

@@ -9,12 +9,12 @@ import java.io.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.util.AsyncHandler;
/** /**
* *
@@ -28,11 +28,11 @@ public interface HttpResponseDesc {
//增加Cookie值 //增加Cookie值
public HttpResponse addCookie(Collection<HttpCookie> cookies); public HttpResponse addCookie(Collection<HttpCookie> cookies);
//创建AsyncHandler实例将非字符串对象以JSON格式输出字符串以文本输出 //创建CompletionHandler实例将非字符串对象以JSON格式输出字符串以文本输出
public AsyncHandler createAsyncHandler(); public CompletionHandler createAsyncHandler();
//传入的AsyncHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数 //传入的CompletionHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数
public <H extends AsyncHandler> H createAsyncHandler(Class<H> handlerClass); public <H extends CompletionHandler> H createAsyncHandler(Class<H> handlerClass);
//设置状态码 //设置状态码
public void setStatus(int status); public void setStatus(int status);
@@ -66,10 +66,10 @@ public interface HttpResponseDesc {
public HttpResponse skipHeader(); public HttpResponse skipHeader();
//异步输出指定内容 //异步输出指定内容
public <A> void sendBody(ByteBuffer buffer, A attachment, AsyncHandler<Integer, A> handler); public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler);
//异步输出指定内容 //异步输出指定内容
public <A> void sendBody(ByteBuffer[] buffers, A attachment, AsyncHandler<Integer, A> handler); public <A> void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler);
//关闭HTTP连接如果是keep-alive则不强制关闭 //关闭HTTP连接如果是keep-alive则不强制关闭
public void finish(); public void finish();

View File

@@ -5,13 +5,13 @@
*/ */
package org.redkale.test.rest; package org.redkale.test.rest;
import org.redkale.util.AsyncHandler; import java.nio.channels.CompletionHandler;
/** /**
* *
* @author zhangjx * @author zhangjx
*/ */
public class HelloAsyncHandler implements AsyncHandler { public class HelloAsyncHandler implements CompletionHandler {
@Override @Override
public void completed(Object result, Object attachment) { public void completed(Object result, Object attachment) {

View File

@@ -1,5 +1,6 @@
package org.redkale.test.rest; package org.redkale.test.rest;
import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -42,6 +43,11 @@ public class HelloService implements Service {
return new RetResult<>(entity); return new RetResult<>(entity);
} }
//
public HttpResult showHello(int id) {
return new HttpResult("a");
}
//删除记录 //删除记录
public void deleteHello(int id) { //通过 /pipes/hello/delete/1234 删除对象 public void deleteHello(int id) { //通过 /pipes/hello/delete/1234 删除对象
source.delete(HelloEntity.class, id); source.delete(HelloEntity.class, id);
@@ -68,6 +74,7 @@ public class HelloService implements Service {
//查询List列表 //查询List列表
@RestMapping(name = "list") @RestMapping(name = "list")
@RestConvert(type = HelloEntity.class, ignoreColumns = {"createtime"})
public List<HelloEntity> queryHello(HelloBean bean) { //通过 /pipes/hello/list?bean={...} 查询List列表 public List<HelloEntity> queryHello(HelloBean bean) { //通过 /pipes/hello/list?bean={...} 查询List列表
return source.queryList(HelloEntity.class, bean); return source.queryList(HelloEntity.class, bean);
} }
@@ -96,7 +103,7 @@ public class HelloService implements Service {
//异步查询单个 //异步查询单个
@RestMapping(name = "asyncfind2") @RestMapping(name = "asyncfind2")
public void asyncFindHello(AsyncHandler hander, @RestParam(name = "#") int id) { //通过 /pipes/hello/find/1234、/pipes/hello/jsfind/1234 查询对象 public void asyncFindHello(CompletionHandler hander, @RestParam(name = "#") int id) { //通过 /pipes/hello/find/1234、/pipes/hello/jsfind/1234 查询对象
if (source != null) source.findAsync(HelloEntity.class, id); if (source != null) source.findAsync(HelloEntity.class, id);
System.out.println("-----------进入asyncfind2--------" + hander); System.out.println("-----------进入asyncfind2--------" + hander);
hander.completed(new HelloEntity(id), id); hander.completed(new HelloEntity(id), id);

View File

@@ -8,7 +8,7 @@ package org.redkale.test.service;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
@@ -165,8 +165,8 @@ public class ABMainService implements Service {
} }
@RestMapping(name = "asyncabtime") @RestMapping(name = "asyncabtime")
public void abCurrentTime(final AsyncHandler<String, Void> handler, @RestParam(name = "#") final String name) { public void abCurrentTime(final CompletionHandler<String, Void> handler, @RestParam(name = "#") final String name) {
bcService.bcCurrentTime(AsyncHandler.create((v, a) -> { bcService.bcCurrentTime(Utility.createAsyncHandler((v, a) -> {
System.out.println("执行了 ABMainService.abCurrentTime----异步方法"); System.out.println("执行了 ABMainService.abCurrentTime----异步方法");
String rs = "异步abCurrentTime: " + v; String rs = "异步abCurrentTime: " + v;
if (handler != null) handler.completed(rs, a); if (handler != null) handler.completed(rs, a);

View File

@@ -5,9 +5,10 @@
*/ */
package org.redkale.test.service; package org.redkale.test.service;
import java.nio.channels.CompletionHandler;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.util.AsyncHandler; import org.redkale.util.*;
/** /**
* *
@@ -24,8 +25,8 @@ public class BCService implements Service {
return rs; return rs;
} }
public void bcCurrentTime(final AsyncHandler<String, Void> handler, final String name) { public void bcCurrentTime(final CompletionHandler<String, Void> handler, final String name) {
cService.ccCurrentTime(AsyncHandler.create((v, a) -> { cService.ccCurrentTime(Utility.createAsyncHandler((v, a) -> {
System.out.println("执行了 BCService.bcCurrentTime----异步方法"); System.out.println("执行了 BCService.bcCurrentTime----异步方法");
String rs = "异步bcCurrentTime: " + (v == null ? null : v.getResult()); String rs = "异步bcCurrentTime: " + (v == null ? null : v.getResult());
if (handler != null) handler.completed(rs, null); if (handler != null) handler.completed(rs, null);

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.test.service; package org.redkale.test.service;
import java.nio.channels.CompletionHandler;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -20,7 +21,7 @@ public class CService implements Service {
return new RetResult(rs); return new RetResult(rs);
} }
public void ccCurrentTime(final AsyncHandler<RetResult<String>, Void> handler, final String name) { public void ccCurrentTime(final CompletionHandler<RetResult<String>, Void> handler, final String name) {
String rs = "异步ccCurrentTime: " + name + ": " + Utility.formatTime(System.currentTimeMillis()); String rs = "异步ccCurrentTime: " + name + ": " + Utility.formatTime(System.currentTimeMillis());
System.out.println("执行了 CService.ccCurrentTime----异步方法"); System.out.println("执行了 CService.ccCurrentTime----异步方法");
if (handler != null) handler.completed(new RetResult(rs), null); if (handler != null) handler.completed(new RetResult(rs), null);

View File

@@ -5,13 +5,13 @@
*/ */
package org.redkale.test.service; package org.redkale.test.service;
import org.redkale.util.AsyncHandler; import java.nio.channels.CompletionHandler;
/** /**
* *
* @author zhangjx * @author zhangjx
*/ */
public abstract class MyAsyncInnerHandler<V, A> implements AsyncHandler<V, A> { public abstract class MyAsyncInnerHandler<V, A> implements CompletionHandler<V, A> {
protected abstract int id2(); protected abstract int id2();

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.test.service; package org.redkale.test.service;
import java.nio.channels.CompletionHandler;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.*; import org.redkale.util.*;
@@ -19,7 +20,7 @@ public class TestService implements Service {
// return false; // return false;
// } // }
public void change(AsyncHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) { public void change(CompletionHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) {
} }

View File

@@ -7,6 +7,7 @@ package org.redkale.test.sncp;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.redkale.net.TransportFactory; import org.redkale.net.TransportFactory;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
@@ -97,7 +98,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
return "result: " + bean; return "result: " + bean;
} }
public void queryResult(AsyncHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) { public void queryResult(CompletionHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) {
System.out.println(Thread.currentThread().getName() + " handler 运行了queryResult方法"); System.out.println(Thread.currentThread().getName() + " handler 运行了queryResult方法");
if (handler != null) handler.completed("result: " + bean, bean); if (handler != null) handler.completed("result: " + bean, bean);
} }

View File

@@ -15,7 +15,7 @@ import org.redkale.net.http.*;
* *
* @author zhangjx * @author zhangjx
*/ */
@RestWebSocket(name = "chat", catalog = "ws", comment = "文字聊天") @RestWebSocket(name = "chat", catalog = "ws", comment = "文字聊天", anyuser = true)
public class ChatWebSocket extends WebSocket<Integer, Object> { public class ChatWebSocket extends WebSocket<Integer, Object> {
protected static final AtomicInteger idcreator = new AtomicInteger(10000); protected static final AtomicInteger idcreator = new AtomicInteger(10000);