58 Commits
1.8.6 ... 1.8.8

Author SHA1 Message Date
Redkale
921aedaf9d 增加Optional的序列化和反序列化 2017-12-25 10:12:01 +08:00
Redkale
0f545923b2 2017-12-23 10:39:46 +08:00
Redkale
5b32a91874 2017-12-23 10:14:27 +08:00
Redkale
25b2528416 2017-12-23 10:11:38 +08:00
Redkale
d29c95c38f 2017-12-22 10:48:31 +08:00
Redkale
a661ab2ff5 Utility增加max、min方法 2017-12-21 19:14:01 +08:00
Redkale
ac08ebee75 Utility增加joining方法 2017-12-21 17:39:45 +08:00
Redkale
8e1a287ed1 2017-12-21 17:33:18 +08:00
Redkale
9b656b3970 2017-12-21 16:50:21 +08:00
Redkale
a1c0bbf413 2017-12-21 16:24:08 +08:00
Redkale
2ad8c5d425 2017-12-21 16:19:41 +08:00
Redkale
75aaa980cf 2017-12-21 16:09:17 +08:00
Redkale
7c55326d23 2017-12-20 18:49:47 +08:00
Redkale
aa3ade5912 Convert增加AtomicIntegerSimpledCoder 2017-12-20 18:45:14 +08:00
Redkale
c692deebe9 优化MapEncoder 2017-12-20 14:20:02 +08:00
Redkale
9ff161c97d 增加WebSocket在onOpen和createUserid方法里返回无效用户信息前也可以发送信息 2017-12-20 08:54:54 +08:00
Redkale
5b1f820621 2017-12-19 20:07:57 +08:00
Redkale
30b2cffcb8 2017-12-19 16:40:58 +08:00
Redkale
709439bfca 2017-12-18 11:48:36 +08:00
Redkale
34adb238f7 ObjectPool实现Consumer接口,且将offer设为过期,建议使用accept方法 2017-12-18 10:00:18 +08:00
Redkale
76c54f8d54 修复部分带有@RestConvert方法生成RestServlet时出现的BUG 2017-12-16 16:32:21 +08:00
Redkale
6196c05f12 2017-12-16 10:42:50 +08:00
Redkale
37df0af56c Redkale 1.8.8 开始 2017-12-16 09:40:30 +08:00
Redkale
8a5e1252ab 2017-12-15 09:06:26 +08:00
Redkale
ae5430af42 2017-12-14 19:25:10 +08:00
Redkale
13cf188e25 WebSocketEngine增加forEachLocalWebSocket方法 2017-12-14 19:10:49 +08:00
Redkale
41d3dea1ac 2017-12-14 18:43:23 +08:00
Redkale
be816088f5 增加对IntStream、LongStream、DoubleStream的序列化支持 2017-12-14 18:37:54 +08:00
Redkale
5bef900b76 2017-12-14 16:01:13 +08:00
Redkale
52f61b0f96 2017-12-14 15:56:31 +08:00
Redkale
b5a4646e3b 2017-12-14 15:50:48 +08:00
Redkale
d055d5c824 2017-12-14 15:09:20 +08:00
Redkale
f14ef05c88 HttpResponse增加finish(Object)系列方法 2017-12-14 15:02:16 +08:00
Redkale
426506324f 2017-12-14 10:58:27 +08:00
Redkale
7a5e58a112 删掉AsyncHandler, 采用CompletionHandler代替 2017-12-14 10:42:24 +08:00
Redkale
2e0c58cbea WebSocket增加changeUserid功能 2017-12-12 09:33:56 +08:00
Redkale
9ded3fbb9a Update README.md 2017-12-11 18:51:48 +08:00
Redkale
acbc1032e6 Update README.md 2017-12-11 18:50:59 +08:00
Redkale
d8186b00ba 启动打印java.version信息 2017-12-11 11:39:57 +08:00
Redkale
767adcbefe 2017-12-08 19:44:39 +08:00
Redkale
67df072275 HttpRequest增加getParametersToString方法 2017-12-08 19:38:54 +08:00
Redkale
dab70af4d4 2017-12-08 14:27:22 +08:00
Redkale
b42826692d 2017-12-08 11:50:12 +08:00
Redkale
e2ab4b20c9 HttpResult支持byte[]、ByteBuffer和ByteBuffer[] 2017-12-08 11:36:24 +08:00
Redkale
511ee8a6df EntityInfo增加getQueryColumns方法 2017-12-06 18:58:30 +08:00
Redkale
463269a796 增加ApplicationListener功能 2017-12-06 14:33:39 +08:00
Redkale
2d4b865432 RestWebSocket增加anyuser功能 2017-12-06 10:20:13 +08:00
Redkale
1da73429f7 对SNCP的Address进行IPv4判断 2017-12-04 09:48:52 +08:00
Redkale
e97e6b8262 2017-11-29 14:06:01 +08:00
Redkale
e6bc34d6f8 2017-11-29 14:03:53 +08:00
Redkale
f1c4ac9e67 2017-11-29 14:01:57 +08:00
Redkale
de6c6076e4 【新增功能】DataSource可以通过application.xml配置 2017-11-29 13:56:26 +08:00
Redkale
a36e3d3819 2017-11-29 12:05:18 +08:00
Redkale
043b847f05 DataSource增加getType()方法 2017-11-29 12:01:42 +08:00
Redkale
65bc8192f0 DataJdbcSource增加preConstruct方法,方便重载 2017-11-29 11:28:10 +08:00
Redkale
71e0b60200 增加manifest的MimeType 2017-11-28 20:03:29 +08:00
Redkale
99dc7ac189 2017-11-25 16:48:59 +08:00
Redkale
8605c44f14 Redkale 1.8.7 开始 2017-11-25 14:40:39 +08:00
62 changed files with 1278 additions and 413 deletions

View File

@@ -25,4 +25,5 @@
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<h5>基本文档:&nbsp;&nbsp;&nbsp;&nbsp;<a href='https://redkale.org/articles.html' target='_blank'>https://redkale.org/articles.html</a></h5>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<h5>欢迎加入Redkale QQ群: 527523235</h5>
&nbsp;

View File

@@ -71,7 +71,13 @@
<source name="redis" value="org.redkalex.cache.RedisCacheSource" xxx="16">
<node addr="127.0.0.1" port="7070"/>
</source>
<!--
Application启动的监听事件,可配置多个节点
value: 类名必须是ApplicationListener的子类
-->
<listener value="org.redkalex.xxx.XXXApplicationListener"/>
<!--
【节点全局唯一】
全局的参数配置, 可以通过@Resource(name="property.xxxxxx") 进行注入<property>的信息, 被注解的字段类型只能是String、primitive class

View File

@@ -8,6 +8,7 @@ ClassReader.java
ClassVisitor.java
ClassWriter.java
Context.java
CurrentFrame.java
Edge.java
FieldVisitor.java
FieldWriter.java
@@ -18,25 +19,8 @@ Item.java
Label.java
MethodVisitor.java
MethodWriter.java
ModuleVisitor.java
ModuleWriter.java
Opcodes.java
Type.java
TypePath.java
public static void main(String[] args) throws Throwable {
File srcasmroot = new File("D:/JAVA/JDK源码/JDK9源码/java.base/jdk/internal/org/objectweb/asm");
File destasmroot = new File("D:/Java-Projects/RedkaleProject/src/org/redkale/asm");
String line = null;
LineNumberReader txtin = new LineNumberReader(new FileReader(new File(destasmroot, "asm.txt")));
while ((line = txtin.readLine()) != null) {
line = line.trim();
if (!line.endsWith(".java")) continue;
File srcfile = new File(srcasmroot, line);
File destfile = new File(destasmroot, line);
String content = Utility.readThenClose(new FileInputStream(srcfile));
FileOutputStream out = new FileOutputStream(destfile);
out.write(content.replace("jdk.internal.org.objectweb", "org.redkale").getBytes());
out.close();
}
}

View File

@@ -131,6 +131,9 @@ public final class Application {
//日志
private final Logger logger;
//监听事件
private final List<ApplicationListener> listeners = new CopyOnWriteArrayList<>();
//服务启动时间
private final long startTime = System.currentTimeMillis();
@@ -233,7 +236,7 @@ public final class Application {
this.logger = Logger.getLogger(this.getClass().getSimpleName());
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------");
logger.log(Level.INFO, "------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------");
//------------------配置 <transport> 节点 ------------------
ObjectPool<ByteBuffer> transportPool = null;
ExecutorService transportExec = null;
@@ -354,7 +357,7 @@ public final class Application {
File persist = new File(this.home, "conf/persistence.xml");
final String homepath = this.home.getCanonicalPath();
if (persist.isFile()) System.setProperty(DataSources.DATASOURCE_CONFPATH, persist.getCanonicalPath());
logger.log(Level.INFO, RESNAME_APP_HOME + "= " + homepath + "\r\n" + RESNAME_APP_ADDR + "= " + this.localAddress.getHostAddress());
logger.log(Level.INFO, "APP_JAVA = " + System.getProperty("java.version") + "\r\n" + RESNAME_APP_ADDR + " = " + this.localAddress.getHostAddress() + "\r\n" + RESNAME_APP_HOME + " = " + homepath);
String lib = config.getValue("lib", "${APP_HOME}/libs/*").trim().replace("${APP_HOME}", homepath);
lib = lib.isEmpty() ? (homepath + "/conf") : (lib + ";" + homepath + "/conf");
Server.loadLib(classLoader, logger, lib);
@@ -487,6 +490,15 @@ public final class Application {
}
sncpTransportFactory.addGroupInfo(ginfo);
}
for (AnyValue conf : resources.getAnyValues("listener")) {
final String listenClass = conf.getValue("value", "");
if (listenClass.isEmpty()) continue;
Class clazz = Class.forName(listenClass);
if (!ApplicationListener.class.isAssignableFrom(clazz)) continue;
ApplicationListener listener = (ApplicationListener) clazz.newInstance();
listener.init(config);
this.listeners.add(listener);
}
}
//------------------------------------------------------------------------
}
@@ -783,6 +795,9 @@ public final class Application {
application.init();
application.startSelfServer();
try {
for (ApplicationListener listener : application.listeners) {
listener.preStart(application);
}
application.start();
} catch (Exception e) {
application.logger.log(Level.SEVERE, "Application start error", e);
@@ -801,6 +816,14 @@ public final class Application {
}
private void shutdown() throws Exception {
for (ApplicationListener listener : this.listeners) {
try {
listener.preShutdown(this);
} catch (Exception e) {
logger.log(Level.WARNING, listener.getClass() + " preShutdown erroneous", e);
}
}
servers.stream().forEach((server) -> {
try {
server.shutdown();

View File

@@ -0,0 +1,45 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot;
import org.redkale.util.AnyValue;
/**
* Application启动和关闭时的监听事件 <br>
* 只能通过application.xml配置
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public interface ApplicationListener {
/**
* 初始化方法
*
* @param config 配置参数
*/
default void init(AnyValue config) {
}
/**
* Application 在运行start前调用
*
* @param application Application
*/
default void preStart(Application application) {
}
/**
* Application 在运行shutdown前调用
*
* @param application Application
*/
default void preShutdown(Application application) {
}
}

View File

@@ -221,7 +221,25 @@ public abstract class NodeServer {
try {
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 DataSource
DataSource source = DataSources.createDataSource(resourceName);
AnyValue sourceConf = dataResources.get(resourceName);
DataSource source = null;
boolean needinit = true;
if (sourceConf != null) {
final Class sourceType = serverClassLoader.loadClass(sourceConf.getValue("value"));
if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource
final Service srcService = (Service) src;
SncpClient client = Sncp.getSncpClient(srcService);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
}
}
if (source == null) {
source = DataSources.createDataSource(resourceName); //从persistence.xml配置中创建
needinit = false;
}
application.dataSources.add(source);
appResFactory.register(resourceName, DataSource.class, source);
@@ -242,7 +260,7 @@ public abstract class NodeServer {
field.set(src, source);
rf.inject(source, self); // 给其可能包含@Resource的字段赋值;
//NodeServer.this.watchFactory.inject(src);
if (source instanceof Service) ((Service) source).init(null);
if (source instanceof Service && needinit) ((Service) source).init(sourceConf);
} catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e);
}
@@ -264,12 +282,8 @@ public abstract class NodeServer {
AnyValue sourceConf = cacheResource.get(resourceName);
if (sourceConf == null) sourceConf = dataResources.get(resourceName);
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value"));
Object source;
if (DataSource.class.isAssignableFrom(sourceType)) { // DataSource
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
application.dataSources.add((DataSource) source);
appResFactory.register(resourceName, DataSource.class, source);
} else { // CacheSource
Object source = null;
if (CacheSource.class.isAssignableFrom(sourceType)) { // CacheSource
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;

View File

@@ -50,7 +50,7 @@ public final class CollectionDecoder<T> implements Decodeable<Reader, Collection
factory.register(type, this);
this.decoder = factory.loadDecoder(this.componentType);
} else {
throw new ConvertException("collectiondecoder not support the type (" + type + ")");
throw new ConvertException("CollectionDecoder not support the type (" + type + ")");
}
} finally {
inited = true;

View File

@@ -13,10 +13,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.*;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.redkale.convert.ext.InetAddressSimpledCoder.InetSocketAddressSimpledCoder;
import java.util.stream.*;
import org.redkale.convert.ext.*;
import org.redkale.util.*;
@@ -89,17 +88,17 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
this.register(String.class, StringSimpledCoder.instance);
this.register(CharSequence.class, CharSequenceSimpledCoder.instance);
this.register(java.util.Date.class, DateSimpledCoder.instance);
this.register(AtomicInteger.class, AtomicIntegerSimpledCoder.instance);
this.register(AtomicLong.class, AtomicLongSimpledCoder.instance);
this.register(BigInteger.class, BigIntegerSimpledCoder.instance);
this.register(BigDecimal.class, BigDecimalSimpledCoder.instance);
this.register(InetAddress.class, InetAddressSimpledCoder.instance);
this.register(DLong.class, DLongSimpledCoder.instance);
this.register(Class.class, TypeSimpledCoder.instance);
this.register(InetSocketAddress.class, InetSocketAddressSimpledCoder.instance);
this.register(InetSocketAddress.class, InetAddressSimpledCoder.InetSocketAddressSimpledCoder.instance);
this.register(Pattern.class, PatternSimpledCoder.instance);
this.register(File.class, FileSimpledCoder.instance);
this.register(CompletionHandler.class, CompletionHandlerSimpledCoder.instance);
this.register(AsyncHandler.class, AsyncHandlerSimpledCoder.instance);
this.register(URL.class, URLSimpledCoder.instance);
this.register(URI.class, URISimpledCoder.instance);
//---------------------------------------------------------
@@ -109,9 +108,12 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
this.register(short[].class, ShortArraySimpledCoder.instance);
this.register(char[].class, CharArraySimpledCoder.instance);
this.register(int[].class, IntArraySimpledCoder.instance);
this.register(IntStream.class, IntArraySimpledCoder.IntStreamSimpledCoder.instance);
this.register(long[].class, LongArraySimpledCoder.instance);
this.register(LongStream.class, LongArraySimpledCoder.LongStreamSimpledCoder.instance);
this.register(float[].class, FloatArraySimpledCoder.instance);
this.register(double[].class, DoubleArraySimpledCoder.instance);
this.register(DoubleStream.class, DoubleArraySimpledCoder.DoubleStreamSimpledCoder.instance);
this.register(String[].class, StringArraySimpledCoder.instance);
//---------------------------------------------------------
this.register(AnyValue.class, Creator.create(AnyValue.DefaultAnyValue.class));
@@ -375,12 +377,17 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
encoders.put(clazz, coder);
}
public final <E> void register(final Type clazz, final Decodeable<R, E> decoder, final Encodeable<W, E> encoder) {
decoders.put(clazz, decoder);
encoders.put(clazz, encoder);
}
public final <E> void register(final Type clazz, final Decodeable<R, E> decoder) {
decoders.put(clazz, decoder);
}
public final <E> void register(final Type clazz, final Encodeable<W, E> printer) {
encoders.put(clazz, printer);
public final <E> void register(final Type clazz, final Encodeable<W, E> encoder) {
encoders.put(clazz, encoder);
}
public final <E> Decodeable<R, E> findDecoder(final Type type) {
@@ -461,6 +468,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
decoder = new StreamDecoder(this, type);
} else if (Map.class.isAssignableFrom(clazz)) {
decoder = new MapDecoder(this, type);
} else if (Optional.class == clazz) {
decoder = new OptionalCoder(this, type);
} else if (clazz == Object.class) {
od = new ObjectDecoder(type);
decoder = od;
@@ -546,6 +555,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
encoder = new StreamEncoder(this, type);
} else if (Map.class.isAssignableFrom(clazz)) {
encoder = new MapEncoder(this, type);
} else if (Optional.class == clazz) {
encoder = new OptionalCoder(this, type);
} else if (clazz == Object.class) {
return (Encodeable<W, E>) this.anyEncoder;
} else if (!clazz.getName().startsWith("java.") || java.net.HttpCookie.class == clazz) {

View File

@@ -0,0 +1,107 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert;
import java.lang.reflect.*;
import java.util.*;
/**
* Optional 的SimpledCoder实现
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
*/
public class OptionalCoder<R extends Reader, W extends Writer, T> extends SimpledCoder<R, W, Optional<T>> {
private final Type type;
private final Type componentType;
protected final Class componentClass;
protected final Decodeable<Reader, T> decoder;
protected final Encodeable<Writer, T> encoder;
private boolean inited = false;
private final Object lock = new Object();
public OptionalCoder(final ConvertFactory factory, final Type type) {
this.type = type;
try {
if (type instanceof ParameterizedType) {
final ParameterizedType pt = (ParameterizedType) type;
this.componentType = pt.getActualTypeArguments()[0];
factory.register(type, this);
this.decoder = factory.loadDecoder(this.componentType);
if (this.componentType instanceof TypeVariable) {
this.encoder = factory.getAnyEncoder();
this.componentClass = Object.class;
} else {
if (componentType instanceof ParameterizedType) {
final ParameterizedType pt2 = (ParameterizedType) componentType;
this.componentClass = (Class) pt2.getRawType();
} else {
this.componentClass = (Class) componentType;
}
this.encoder = factory.loadEncoder(this.componentType);
}
} else {
this.componentType = Object.class;
this.componentClass = Object.class;
this.decoder = factory.loadDecoder(this.componentType);
this.encoder = factory.getAnyEncoder();
}
} finally {
inited = true;
synchronized (lock) {
lock.notifyAll();
}
}
}
@Override
public void convertTo(W out, Optional<T> value) {
if (value == null || !value.isPresent()) {
out.writeObjectNull(null);
return;
}
if (this.encoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
this.encoder.convertTo(out, value.get());
}
@Override
public Optional<T> convertFrom(R in) {
if (this.decoder == null) {
if (!this.inited) {
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return Optional.ofNullable(this.decoder.convertFrom(in));
}
}

View File

@@ -73,7 +73,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
}
public void offerBsonReader(final BsonReader in) {
if (in != null) readerPool.offer(in);
if (in != null) readerPool.accept(in);
}
//------------------------------ writer -----------------------------------------------------------
@@ -90,7 +90,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
}
public void offerBsonWriter(final BsonWriter out) {
if (out != null) writerPool.offer(out);
if (out != null) writerPool.accept(out);
}
//------------------------------ convertFrom -----------------------------------------------------------
@@ -106,7 +106,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
in.setBytes(bytes, start, len);
@SuppressWarnings("unchecked")
T rs = (T) factory.loadDecoder(type).convertFrom(in);
readerPool.offer(in);
readerPool.accept(in);
return rs;
}
@@ -145,7 +145,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
final BsonWriter out = writerPool.get().tiny(tiny);
out.writeNull();
byte[] result = out.toArray();
writerPool.offer(out);
writerPool.accept(out);
return result;
}
return convertTo(value.getClass(), value);
@@ -157,7 +157,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
final BsonWriter out = writerPool.get().tiny(tiny);
factory.loadEncoder(type).convertTo(out, value);
byte[] result = out.toArray();
writerPool.offer(out);
writerPool.accept(out);
return result;
}
@@ -167,7 +167,7 @@ public final class BsonConvert extends BinaryConvert<BsonReader, BsonWriter> {
final BsonWriter out = writerPool.get().tiny(tiny);
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
byte[] result = out.toArray();
writerPool.offer(out);
writerPool.accept(out);
return result;
}

View File

@@ -1,36 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert.ext;
import org.redkale.convert.*;
import org.redkale.util.AsyncHandler;
/**
* AsyncHandlerSimpledCoder 的SimpledCoder实现, 只输出null
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
*/
public final class AsyncHandlerSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, AsyncHandler> {
public static final AsyncHandlerSimpledCoder instance = new AsyncHandlerSimpledCoder();
@Override
public void convertTo(W out, AsyncHandler value) {
out.writeObjectNull(AsyncHandler.class);
}
@Override
public AsyncHandler convertFrom(R in) {
in.readObjectB(AsyncHandler.class);
return null;
}
}

View File

@@ -0,0 +1,35 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert.ext;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.convert.*;
/**
* AtomicInteger 的SimpledCoder实现
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
*/
public class AtomicIntegerSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, AtomicInteger> {
public static final AtomicIntegerSimpledCoder instance = new AtomicIntegerSimpledCoder();
@Override
public void convertTo(W out, AtomicInteger value) {
out.writeInt(value == null ? 0 : value.get());
}
@Override
public AtomicInteger convertFrom(R in) {
return new AtomicInteger(in.readInt());
}
}

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.convert.ext;
import java.util.stream.DoubleStream;
import org.redkale.convert.Reader;
import org.redkale.convert.SimpledCoder;
import org.redkale.convert.Writer;
@@ -12,7 +13,9 @@ import org.redkale.convert.Writer;
/**
* double[] 的SimpledCoder实现
*
* <p> 详情见: https://redkale.org
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
@@ -66,4 +69,24 @@ public final class DoubleArraySimpledCoder<R extends Reader, W extends Writer> e
}
}
public final static class DoubleStreamSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, DoubleStream> {
public static final DoubleStreamSimpledCoder instance = new DoubleStreamSimpledCoder();
@Override
public void convertTo(W out, DoubleStream values) {
if (values == null) {
out.writeNull();
return;
}
DoubleArraySimpledCoder.instance.convertTo(out, values.toArray());
}
@Override
public DoubleStream convertFrom(R in) {
double[] value = DoubleArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : DoubleStream.of(value);
}
}
}

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.convert.ext;
import java.util.stream.IntStream;
import org.redkale.convert.Reader;
import org.redkale.convert.SimpledCoder;
import org.redkale.convert.Writer;
@@ -12,7 +13,9 @@ import org.redkale.convert.Writer;
/**
* int[] 的SimpledCoder实现
*
* <p> 详情见: https://redkale.org
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
@@ -66,4 +69,24 @@ public final class IntArraySimpledCoder<R extends Reader, W extends Writer> exte
}
}
public final static class IntStreamSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, IntStream> {
public static final IntStreamSimpledCoder instance = new IntStreamSimpledCoder();
@Override
public void convertTo(W out, IntStream values) {
if (values == null) {
out.writeNull();
return;
}
IntArraySimpledCoder.instance.convertTo(out, values.toArray());
}
@Override
public IntStream convertFrom(R in) {
int[] value = IntArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : IntStream.of(value);
}
}
}

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.convert.ext;
import java.util.stream.LongStream;
import org.redkale.convert.Reader;
import org.redkale.convert.SimpledCoder;
import org.redkale.convert.Writer;
@@ -68,4 +69,24 @@ public final class LongArraySimpledCoder<R extends Reader, W extends Writer> ext
}
}
public final static class LongStreamSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, LongStream> {
public static final LongStreamSimpledCoder instance = new LongStreamSimpledCoder();
@Override
public void convertTo(W out, LongStream values) {
if (values == null) {
out.writeNull();
return;
}
LongArraySimpledCoder.instance.convertTo(out, values.toArray());
}
@Override
public LongStream convertFrom(R in) {
long[] value = LongArraySimpledCoder.instance.convertFrom(in);
return value == null ? null : LongStream.of(value);
}
}
}

View File

@@ -23,7 +23,7 @@ import org.redkale.util.*;
@SuppressWarnings("unchecked")
public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
public static final Type TYPE_MAP_STRING_STRING = new TypeToken<java.util.LinkedHashMap<String, String>>() {
public static final Type TYPE_MAP_STRING_STRING = new TypeToken<java.util.HashMap<String, String>>() {
}.getType();
private static final ObjectPool<JsonReader> readerPool = JsonReader.createPool(Integer.getInteger("convert.json.pool.size", 16));
@@ -60,7 +60,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
}
public void offerJsonReader(final JsonReader in) {
if (in != null) readerPool.offer(in);
if (in != null) readerPool.accept(in);
}
//------------------------------ writer -----------------------------------------------------------
@@ -81,7 +81,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
}
public void offerJsonWriter(final JsonWriter out) {
if (out != null) writerPool.offer(out);
if (out != null) writerPool.accept(out);
}
//------------------------------ convertFrom -----------------------------------------------------------
@@ -100,7 +100,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
final JsonReader in = readerPool.get();
in.setText(text, start, len);
T rs = (T) factory.loadDecoder(type).convertFrom(in);
readerPool.offer(in);
readerPool.accept(in);
return rs;
}
@@ -142,7 +142,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
final JsonWriter out = writerPool.get().tiny(tiny);
factory.loadEncoder(type).convertTo(out, value);
String result = out.toString();
writerPool.offer(out);
writerPool.accept(out);
return result;
}
@@ -152,7 +152,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
final JsonWriter out = writerPool.get().tiny(tiny);
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(out, values);
String result = out.toString();
writerPool.offer(out);
writerPool.accept(out);
return result;
}
@@ -160,7 +160,7 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
if (value == null) {
new JsonStreamWriter(tiny, out).writeNull();
} else {
factory.loadEncoder(value.getClass()).convertTo(new JsonStreamWriter(tiny, out), value);
convertTo(out, value.getClass(), value);
}
}
@@ -169,7 +169,15 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
if (value == null) {
new JsonStreamWriter(tiny, out).writeNull();
} else {
factory.loadEncoder(type).convertTo(new JsonStreamWriter(tiny, out), value);
final JsonWriter writer = writerPool.get().tiny(tiny);
factory.loadEncoder(type).convertTo(writer, value);
byte[] bs = writer.toBytes();
writerPool.accept(writer);
try {
out.write(bs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@@ -177,7 +185,15 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
if (values == null) {
new JsonStreamWriter(tiny, out).writeNull();
} else {
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(new JsonStreamWriter(tiny, out), values);
final JsonWriter writer = writerPool.get().tiny(tiny);
((AnyEncoder) factory.getAnyEncoder()).convertMapTo(writer, values);
byte[] bs = writer.toBytes();
writerPool.accept(writer);
try {
out.write(bs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -59,6 +59,7 @@ public class JsonWriter extends Writer {
* 返回指定至少指定长度的缓冲区
*
* @param len
*
* @return
*/
private char[] expand(int len) {
@@ -108,6 +109,10 @@ public class JsonWriter extends Writer {
return new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(content, 0, count))};
}
public byte[] toBytes() {
return Utility.encodeUTF8(content, 0, count);
}
public int count() {
return this.count;
}

View File

@@ -130,13 +130,13 @@ public class Context {
}
public void offerBuffer(ByteBuffer buffer) {
bufferPool.offer(buffer);
bufferPool.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.offer(buffer);
bufferPool.accept(buffer);
}
}

View File

@@ -195,7 +195,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (!this.inited) return; //避免重复关闭
//System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
if (kill) refuseAlive();
this.context.responsePool.offer(this);
this.context.responsePool.accept(this);
}
public void finish(final byte[] bs) {

View File

@@ -178,7 +178,7 @@ public final class Transport {
}
public void offerBuffer(ByteBuffer buffer) {
bufferPool.offer(buffer);
bufferPool.accept(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {

View File

@@ -308,7 +308,7 @@ public class TransportFactory {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (counter > 3) {
bufferPool.offer(attachment);
bufferPool.accept(attachment);
localconn.dispose();
return;
}
@@ -317,7 +317,7 @@ public class TransportFactory {
localconn.read(pongBuffer, pongBuffer, this);
return;
}
bufferPool.offer(attachment);
bufferPool.accept(attachment);
localqueue.offer(localconn);
}

View File

@@ -7,6 +7,7 @@ package org.redkale.net.http;
import java.net.*;
import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.nio.charset.*;
import java.security.*;
import java.util.concurrent.*;
@@ -54,7 +55,7 @@ public class HttpContext extends Context {
}
@SuppressWarnings("unchecked")
protected <H extends AsyncHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
protected <H extends CompletionHandler> Creator<H> loadAsyncHandlerCreator(Class<H> handlerClass) {
Creator<H> creator = asyncHandlerCreators.get(handlerClass);
if (creator == null) {
creator = createAsyncHandlerCreator(handlerClass);
@@ -64,14 +65,14 @@ public class HttpContext extends Context {
}
@SuppressWarnings("unchecked")
private <H extends AsyncHandler> Creator<H> createAsyncHandlerCreator(Class<H> handlerClass) {
private <H extends CompletionHandler> Creator<H> createAsyncHandlerCreator(Class<H> handlerClass) {
//生成规则与SncpAsyncHandler.Factory 很类似
//-------------------------------------------------------------
final boolean handlerinterface = handlerClass.isInterface();
final String handlerClassName = handlerClass.getName().replace('.', '/');
final String handlerName = AsyncHandler.class.getName().replace('.', '/');
final String handlerDesc = Type.getDescriptor(AsyncHandler.class);
final String newDynName = handlerClass.getName().replace('.', '/') + "_Dync" + AsyncHandler.class.getSimpleName() + "_" + (System.currentTimeMillis() % 10000);
final String handlerName = CompletionHandler.class.getName().replace('.', '/');
final String handlerDesc = Type.getDescriptor(CompletionHandler.class);
final String newDynName = handlerClass.getName().replace('.', '/') + "_DyncAsyncHandler_" + (System.currentTimeMillis() % 10000);
ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
FieldVisitor fv;
@@ -157,7 +158,7 @@ public class HttpContext extends Context {
}
cw.visitEnd();
byte[] bytes = cw.toByteArray();
Class<AsyncHandler> newHandlerClazz = (Class<AsyncHandler>) new ClassLoader(handlerClass.getClassLoader()) {
Class<CompletionHandler> newHandlerClazz = (Class<CompletionHandler>) new ClassLoader(handlerClass.getClassLoader()) {
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}

View File

@@ -519,8 +519,8 @@ public class HttpRequest extends Request<HttpContext> {
* @return cookie值
*/
public String getCookie(String name, String dfvalue) {
for (HttpCookie cookie : getCookies()) {
if (name.equals(cookie.getName())) return cookie.getValue();
for (HttpCookie c : getCookies()) {
if (name.equals(c.getName())) return c.getValue();
}
return dfvalue;
}
@@ -1134,6 +1134,38 @@ public class HttpRequest extends Request<HttpContext> {
return map0;
}
/**
* 将请求参数转换成String, 字符串格式为: bean1={}&amp;id=13&amp;name=xxx <br>
* 不会返回null没有参数返回空字符串
*
*
* @return String
*/
public String getParametersToString() {
return getParametersToString(null);
}
/**
* 将请求参数转换成String, 字符串格式为: bean1={}&amp;id=13&amp;name=xxx <br>
* 不会返回null没有参数返回空字符串
*
* @param prefix 拼接前缀, 如果无参数,返回的字符串不会含有拼接前缀
*
* @return String
*/
public String getParametersToString(String prefix) {
final StringBuilder sb = new StringBuilder();
getParameters().forEach((k, v) -> {
if (sb.length() > 0) sb.append('&');
try {
sb.append(k).append('=').append(URLEncoder.encode(v, "UTF-8"));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
return (sb.length() > 0 && prefix != null) ? (prefix + sb) : sb.toString();
}
/**
* 获取所有参数名
*

View File

@@ -16,6 +16,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*;
import org.redkale.util.AnyValue.DefaultAnyValue;
@@ -147,6 +148,10 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return super.removeChannel();
}
protected AsyncConnection getChannel() {
return channel;
}
@Override
protected boolean recycle() {
boolean rs = super.recycle();
@@ -220,38 +225,32 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
}
/**
* 创建AsyncHandler实例
* 创建CompletionHandler实例
*
* @return AsyncHandler
* @return CompletionHandler
*/
public AsyncHandler createAsyncHandler() {
return AsyncHandler.create((v, a) -> {
if (v instanceof org.redkale.service.RetResult) {
finishJson((org.redkale.service.RetResult) v);
} else if (v instanceof CharSequence) {
finish(String.valueOf(v));
} else {
finishJson(v);
}
public CompletionHandler createAsyncHandler() {
return Utility.createAsyncHandler((v, a) -> {
finish(v);
}, (t, a) -> {
request.getContext().getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, t);
request.getContext().getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request + ", result is CompletionHandler", (Throwable) t);
finish(500, null);
});
}
/**
* 创建AsyncHandler子类的实例 <br>
* 创建CompletionHandler子类的实例 <br>
*
* 传入的AsyncHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。
* 传入的CompletionHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。
*
* @param <H> 泛型
* @param handlerClass AsyncHandler子类
* @param handlerClass CompletionHandler子类
*
* @return AsyncHandler AsyncHandler
* @return CompletionHandler
*/
@SuppressWarnings("unchecked")
public <H extends AsyncHandler> H createAsyncHandler(Class<H> handlerClass) {
if (handlerClass == null || handlerClass == AsyncHandler.class) return (H) createAsyncHandler();
public <H extends CompletionHandler> H createAsyncHandler(Class<H> handlerClass) {
if (handlerClass == null || handlerClass == CompletionHandler.class) return (H) createAsyncHandler();
return context.loadAsyncHandlerCreator(handlerClass).create(createAsyncHandler());
}
@@ -376,7 +375,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param future 输出对象的句柄
*/
public void finishJson(final CompletableFuture future) {
finishJson(request.getJsonConvert(), future);
finish(request.getJsonConvert(), (Type) null, future);
}
/**
@@ -387,20 +386,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/
@SuppressWarnings("unchecked")
public void finishJson(final JsonConvert convert, final CompletableFuture future) {
future.whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, e);
finish(500, null);
return;
}
if (v instanceof CharSequence) {
finish(v.toString());
} else if (v instanceof org.redkale.service.RetResult) {
finishJson(convert, (org.redkale.service.RetResult) v);
} else {
finishJson(convert, v);
}
});
finish(convert, (Type) null, future);
}
/**
@@ -412,60 +398,86 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/
@SuppressWarnings("unchecked")
public void finishJson(final JsonConvert convert, final Type type, final CompletableFuture future) {
future.whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, e);
finish(500, null);
return;
}
if (v instanceof CharSequence) {
finish(v.toString());
} else if (v instanceof HttpResult) {
finishJson(convert, (HttpResult) v);
} else if (v instanceof org.redkale.service.RetResult) {
finishJson(convert, (org.redkale.service.RetResult) v);
} else {
finishJson(convert, type, v);
}
});
finish(convert, type, future);
}
/**
* 将HttpResult的结果对象以JSON格式输出
* 将结果对象输出
*
* @param result HttpResult对象
*/
public void finishJson(final HttpResult result) {
finishJson(request.getJsonConvert(), result);
}
/**
* 将HttpResult的结果对象以JSON格式输出
*
* @param convert 指定的JsonConvert
* @param result HttpResult对象
* @param obj 输出对象
*/
@SuppressWarnings("unchecked")
public void finishJson(final JsonConvert convert, final HttpResult result) {
if (output == null) {
finish("");
return;
}
if (result.getContentType() != null) setContentType(result.getContentType());
addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus());
if (result.getResult() instanceof File) {
public void finish(final Object obj) {
finish(request.getJsonConvert(), (Type) null, obj);
}
/**
* 将结果对象输出
*
* @param convert 指定的Convert
* @param obj 输出对象
*/
@SuppressWarnings("unchecked")
public void finish(final Convert convert, final Object obj) {
finish(convert, (Type) null, obj);
}
/**
* 将结果对象输出
*
* @param convert 指定的Convert
* @param type 指定的类型
* @param obj 输出对象
*/
@SuppressWarnings("unchecked")
public void finish(final Convert convert, final Type type, final Object obj) {
if (obj == null) {
finish("null");
} else if (obj instanceof CompletableFuture) {
((CompletableFuture) obj).whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request + ", result is CompletableFuture", (Throwable) e);
finish(500, null);
return;
}
finish(convert, type, v);
});
} else if (obj instanceof CharSequence) {
finish((String) obj.toString());
} else if (obj instanceof byte[]) {
finish((byte[]) obj);
} else if (obj instanceof ByteBuffer) {
finish((ByteBuffer) obj);
} else if (obj instanceof ByteBuffer[]) {
finish((ByteBuffer[]) obj);
} else if (obj instanceof File) {
try {
finish((File) result.getResult());
finish((File) obj);
} catch (IOException e) {
getContext().getLogger().log(Level.WARNING, "HttpServlet finishJson HttpResult File occur, forece to close channel. request = " + getRequest(), e);
getContext().getLogger().log(Level.WARNING, "HttpServlet finish File occur, forece to close channel. request = " + getRequest(), e);
finish(500, null);
}
} else if (result.getResult() instanceof String) {
finish((String) result.getResult());
} else if (result.getResult() == null) {
finish(result.getMessage());
} else if (obj instanceof HttpResult) {
HttpResult result = (HttpResult) obj;
if (result.getContentType() != null) setContentType(result.getContentType());
addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus());
if (result.getResult() == null) {
finish("");
} else {
finish(convert, result.getResult());
}
} else {
finishJson(result.getResult());
if (convert instanceof TextConvert) this.contentType = "text/plain; charset=utf-8";
if (this.recycleListener != null) this.output = obj;
if (obj instanceof org.redkale.service.RetResult) {
org.redkale.service.RetResult ret = (org.redkale.service.RetResult) obj;
if (!ret.isSuccess()) {
this.header.addValue("retcode", String.valueOf(ret.getRetcode())).addValue("retinfo", ret.getRetinfo());
}
}
ByteBuffer[] buffers = type == null ? convert.convertTo(context.getBufferSupplier(), obj)
: convert.convertTo(context.getBufferSupplier(), type, obj);
finish(buffers);
}
}
@@ -642,7 +654,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param attachment 异步回调参数
* @param handler 异步回调函数
*/
public <A> void sendBody(ByteBuffer buffer, A attachment, AsyncHandler<Integer, A> handler) {
public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) {
if (this.contentLength < 0) this.contentLength = buffer == null ? 0 : buffer.remaining();
ByteBuffer headbuf = createHeader();
@@ -665,7 +677,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param attachment 异步回调参数
* @param handler 异步回调函数
*/
public <A> void sendBody(ByteBuffer[] buffers, A attachment, AsyncHandler<Integer, A> handler) {
public <A> void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) {
if (this.contentLength < 0) {
int len = 0;
@@ -1016,7 +1028,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.bufferHandler = bufferHandler;
}
protected final class TransferFileHandler implements AsyncHandler<Integer, ByteBuffer> {
protected final class TransferFileHandler implements CompletionHandler<Integer, ByteBuffer> {
private final File file;

View File

@@ -76,6 +76,7 @@ public class MimeType {
contentTypes.put("m3u", "audio/x-mpegurl");
contentTypes.put("mac", "image/x-macpaint");
contentTypes.put("man", "application/x-troff-man");
contentTypes.put("manifest", "text/cache-manifest");
contentTypes.put("mathml", "application/mathml+xml");
contentTypes.put("me", "application/x-troff-me");
contentTypes.put("mid", "audio/x-midi");

View File

@@ -10,6 +10,7 @@ import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.reflect.*;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Resource;
@@ -17,6 +18,7 @@ import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
import static jdk.internal.org.objectweb.asm.Opcodes.*;
import jdk.internal.org.objectweb.asm.Type;
import org.redkale.convert.Convert;
import org.redkale.convert.json.*;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -141,13 +143,20 @@ public final class Rest {
return childFactory.getConvert();
}
static String getWebModuleName(Class<? extends Service> serviceType) {
static String getWebModuleNameLowerCase(Class<? extends Service> serviceType) {
final RestService controller = serviceType.getAnnotation(RestService.class);
if (controller == null) return serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase();
if (controller.ignore()) return null;
return (!controller.name().isEmpty()) ? controller.name() : serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase();
}
static String getWebModuleName(Class<? extends Service> serviceType) {
final RestService controller = serviceType.getAnnotation(RestService.class);
if (controller == null) return serviceType.getSimpleName().replaceAll("Service.*$", "");
if (controller.ignore()) return null;
return (!controller.name().isEmpty()) ? controller.name() : serviceType.getSimpleName().replaceAll("Service.*$", "");
}
static boolean isRestDyn(HttpServlet servlet) {
return servlet.getClass().getAnnotation(RestDyn.class) != null;
}
@@ -323,11 +332,15 @@ public final class Rest {
mv.visitVarInsn(ALOAD, 0);
pushInt(mv, rws.wsmaxbody());
mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.anyuser() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "anyuser", "Z");
mv.visitInsn(RETURN);
mv.visitMaxs(3, 1);
mv.visitEnd();
@@ -574,7 +587,9 @@ public final class Rest {
final String webServletDesc = Type.getDescriptor(WebServlet.class);
final String reqDesc = Type.getDescriptor(HttpRequest.class);
final String respDesc = Type.getDescriptor(HttpResponse.class);
final String convertDesc = Type.getDescriptor(JsonConvert.class);
final String convertDesc = Type.getDescriptor(Convert.class);
final String typeDesc = Type.getDescriptor(java.lang.reflect.Type.class);
final String jsonConvertDesc = Type.getDescriptor(JsonConvert.class);
final String retDesc = Type.getDescriptor(RetResult.class);
final String futureDesc = Type.getDescriptor(CompletableFuture.class);
final String flipperDesc = Type.getDescriptor(Flipper.class);
@@ -601,7 +616,8 @@ public final class Rest {
String newDynName = serviceTypeInternalName.substring(0, serviceTypeInternalName.lastIndexOf('/') + 1) + "_Dyn" + serviceType.getSimpleName().replaceAll("Service.*$", "") + "RestServlet";
//------------------------------------------------------------------------------
final String defmodulename = getWebModuleName(serviceType);
final String defmodulename = getWebModuleNameLowerCase(serviceType);
final String bigmodulename = getWebModuleName(serviceType);
final String catalog = controller == null ? "" : controller.catalog();
if (!checkName(catalog)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(defmodulename)) throw new RuntimeException(serviceType.getName() + " have illeal " + RestService.class.getSimpleName() + ".value, only 0-9 a-z A-Z _ cannot begin 0-9");
@@ -704,7 +720,7 @@ public final class Rest {
if (ignore) continue;
paramtypes.add(method.getGenericParameterTypes());
if (mappings.length == 0) { //没有Mapping设置一个默认值
MappingEntry entry = new MappingEntry(methodidex, null, defmodulename, method);
MappingEntry entry = new MappingEntry(methodidex, null, bigmodulename, method);
if (entrys.contains(entry)) throw new RuntimeException(serviceType.getName() + " on " + method.getName() + " 's mapping(" + entry.name + ") is repeat");
entrys.add(entry);
} else {
@@ -717,7 +733,6 @@ public final class Rest {
methodidex++;
}
if (entrys.isEmpty()) return null; //没有可HttpMapping的方法
//将每个Service可转换的方法生成HttpServlet对应的HttpMapping方法
final Map<String, List<String>> asmParamMap = MethodParamClassVisitor.getMethodParamNames(serviceType);
final Map<String, java.lang.reflect.Type> bodyTypes = new HashMap<>();
@@ -864,7 +879,7 @@ public final class Rest {
if (ptype.isPrimitive() || ptype == String.class) n = "#";
}
if (annhead == null && anncookie == null && annsid == null && annaddr == null && annbody == null && annfile == null
&& !ptype.isPrimitive() && ptype != String.class && ptype != Flipper.class && !AsyncHandler.class.isAssignableFrom(ptype)
&& !ptype.isPrimitive() && ptype != String.class && ptype != Flipper.class && !CompletionHandler.class.isAssignableFrom(ptype)
&& !ptype.getName().startsWith("java") && n.charAt(0) != '#' && !"&".equals(n)) { //判断Json对象是否包含@RestUploadFile
Class loop = ptype;
do {
@@ -1001,17 +1016,17 @@ public final class Rest {
paramMap.put("name", pname);
paramMap.put("type", ptype.getName());
if (AsyncHandler.class.isAssignableFrom(ptype)) { //HttpResponse.createAsyncHandler() or HttpResponse.createAsyncHandler(Class)
if (ptype == AsyncHandler.class) {
if (CompletionHandler.class.isAssignableFrom(ptype)) { //HttpResponse.createAsyncHandler() or HttpResponse.createAsyncHandler(Class)
if (ptype == CompletionHandler.class) {
mv.visitVarInsn(ALOAD, 2);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "()Lorg/redkale/util/AsyncHandler;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "()Ljava/nio/channels/CompletionHandler;", false);
mv.visitVarInsn(ASTORE, maxLocals);
varInsns.add(new int[]{ALOAD, maxLocals});
} else {
mv.visitVarInsn(ALOAD, 3);
mv.visitVarInsn(ALOAD, 2);
mv.visitLdcInsn(Type.getType(Type.getDescriptor(ptype)));
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "(Ljava/lang/Class;)Lorg/redkale/util/AsyncHandler;", false);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "createAsyncHandler", "(Ljava/lang/Class;)Ljava/nio/channels/CompletionHandler;", false);
mv.visitTypeInsn(CHECKCAST, ptype.getName().replace('.', '/'));
mv.visitVarInsn(ASTORE, maxLocals);
varInsns.add(new int[]{ALOAD, maxLocals});
@@ -1532,34 +1547,6 @@ public final class Rest {
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/io/File;)V", false);
mv.visitInsn(RETURN);
maxLocals++;
} else if (RetResult.class.isAssignableFrom(returnType)) {
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response
if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc);
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + retDesc + ")V", false);
} else {
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + retDesc + ")V", false);
}
mv.visitInsn(RETURN);
maxLocals++;
} else if (HttpResult.class.isAssignableFrom(returnType)) {
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response
if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc);
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + httprsDesc + ")V", false);
} else {
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + httprsDesc + ")V", false);
}
mv.visitInsn(RETURN);
maxLocals++;
} else if (Number.class.isAssignableFrom(returnType) || CharSequence.class.isAssignableFrom(returnType)) { //returnType == String.class 必须放在前面
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response
@@ -1568,31 +1555,17 @@ public final class Rest {
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/lang/String;)V", false);
mv.visitInsn(RETURN);
maxLocals++;
} else if (CompletableFuture.class.isAssignableFrom(returnType)) {
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2);//response
if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc);
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + futureDesc + ")V", false);
} else {
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + futureDesc + ")V", false);
}
mv.visitInsn(RETURN);
maxLocals++;
} else {
mv.visitVarInsn(ASTORE, maxLocals);
mv.visitVarInsn(ALOAD, 2); //response
if (rcs != null && rcs.length > 0) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), convertDesc);
mv.visitFieldInsn(GETFIELD, newDynName, REST_JSONCONVERT_FIELD_PREFIX + restConverts.size(), jsonConvertDesc);
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + convertDesc + "Ljava/lang/Object;)V", false);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(" + convertDesc + "Ljava/lang/Object;)V", false);
} else {
mv.visitVarInsn(ALOAD, maxLocals);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(Ljava/lang/Object;)V", false);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finish", "(Ljava/lang/Object;)V", false);
}
mv.visitInsn(RETURN);
maxLocals++;
@@ -1613,7 +1586,7 @@ public final class Rest {
}
for (int i = 1; i <= restConverts.size(); i++) {
fv = cw.visitField(ACC_PRIVATE, REST_JSONCONVERT_FIELD_PREFIX + i, convertDesc, null, null);
fv = cw.visitField(ACC_PRIVATE, REST_JSONCONVERT_FIELD_PREFIX + i, jsonConvertDesc, null, null);
fv.visitEnd();
}

View File

@@ -52,6 +52,13 @@ public @interface RestWebSocket {
*/
boolean single() default true;
/**
* WebSocket.createUserid返回的值是否不能表示户登录态
*
* @return 默认false
*/
boolean anyuser() default false;
/**
* WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值15秒
*

View File

@@ -64,6 +64,9 @@ public abstract class WebSocket<G extends Serializable, T> {
@Comment("WebSocket已离线")
public static final int RETCODE_WSOFFLINE = 1 << 8; //256
@Comment("WebSocket将延迟发送")
public static final int RETCODE_DEAYSEND = 1 << 9; //512
WebSocketRunner _runner; //不可能为空
WebSocketEngine _engine; //不可能为空
@@ -90,6 +93,8 @@ public abstract class WebSocket<G extends Serializable, T> {
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
List<WebSocketPacket> delayPackets;
protected WebSocket() {
}
@@ -225,6 +230,11 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 0表示成功 非0表示错误码
*/
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
if (this._runner == null) {
if (delayPackets == null) delayPackets = new ArrayList<>();
delayPackets.add(packet);
return CompletableFuture.completedFuture(RETCODE_DEAYSEND);
}
CompletableFuture<Integer> rs = this._runner.sendMessage(packet);
if (_engine.logger.isLoggable(Level.FINEST) && packet != WebSocketPacket.DEFAULT_PING_PACKET) {
_engine.logger.finest("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this);
@@ -429,6 +439,18 @@ public abstract class WebSocket<G extends Serializable, T> {
return _engine.node.getRpcNodeWebSocketAddresses(userid);
}
/**
* 更改本WebSocket的userid
*
* @param newuserid 新用户ID不能为null
*
* @return CompletableFuture
*/
public CompletableFuture<Void> changeUserid(final G newuserid) {
if (newuserid == null) throw new NullPointerException("newuserid is null");
return _engine.changeUserid(this, newuserid);
}
/**
* 强制关闭用户的所有WebSocket
*

View File

@@ -10,7 +10,7 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Predicate;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.*;
import org.redkale.convert.Convert;
@@ -149,6 +149,31 @@ public class WebSocketEngine {
}
}
@Comment("更改WebSocket的userid")
CompletableFuture<Void> changeUserid(WebSocket socket, final Serializable newuserid) {
if (newuserid == null) throw new NullPointerException("newuserid is null");
final Serializable olduserid = socket._userid;
socket._userid = newuserid;
if (single) {
websockets.remove(olduserid);
websockets.put(newuserid, socket);
} else { //非线程安全, 在常规场景中无需锁
List<WebSocket> oldlist = websockets2.get(olduserid);
if (oldlist != null) {
oldlist.remove(socket);
if (oldlist.isEmpty()) websockets2.remove(olduserid);
}
List<WebSocket> newlist = websockets2.get(newuserid);
if (newlist == null) {
newlist = new CopyOnWriteArrayList<>();
websockets2.put(newuserid, newlist);
}
newlist.add(socket);
}
if (node != null) return node.changeUserid(olduserid, newuserid);
return CompletableFuture.completedFuture(null);
}
@Comment("强制关闭本地用户的WebSocket")
public int forceCloseLocalWebSocket(Serializable userid) {
if (single) {
@@ -298,6 +323,16 @@ public class WebSocketEngine {
return list;
}
@Comment("获取所有连接")
public void forEachLocalWebSocket(Consumer<WebSocket> consumer) {
if (consumer == null) return;
if (single) {
websockets.values().stream().forEach(consumer);
} else {
websockets2.values().forEach(x -> x.stream().forEach(consumer));
}
}
@Comment("获取当前连接总数")
public int getLocalWebSocketSize() {
if (single) return websockets.size();

View File

@@ -61,7 +61,7 @@ public abstract class WebSocketNode {
protected WebSocketEngine localEngine;
public void init(AnyValue conf) {
if(sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class);
if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class);
}
public void destroy(AnyValue conf) {
@@ -86,6 +86,8 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress addr);
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress addr);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, InetSocketAddress addr);
//--------------------------------------------------------------------------------
@@ -99,6 +101,11 @@ public abstract class WebSocketNode {
return disconnect(userid, localSncpAddress);
}
final CompletableFuture<Void> changeUserid(Serializable olduserid, final Serializable newuserid) {
if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return changeUserid(olduserid, newuserid, localSncpAddress);
}
//--------------------------------------------------------------------------------
/**
* 获取目标地址 <br>
@@ -422,7 +429,7 @@ public abstract class WebSocketNode {
private CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) {
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid));
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + message + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + JsonConvert.root().convertTo(message) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid);
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
@@ -435,7 +442,7 @@ public abstract class WebSocketNode {
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
return CompletableFuture.completedFuture(0);
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
CompletableFuture<Integer> future = null;
@@ -445,7 +452,7 @@ public abstract class WebSocketNode {
future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid)
: future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}

View File

@@ -433,6 +433,10 @@ class WebSocketRunner implements Runnable {
return futureResult;
}
public boolean isClosed() {
return closed;
}
public void closeRunner(int code) {
if (closed) return;
synchronized (this) {

View File

@@ -9,6 +9,7 @@ import java.io.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.security.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -62,13 +63,20 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected Type messageTextType; //RestWebSocket时会被修改
//同RestWebSocket.single
protected boolean single = true; //是否单用户单连接
//同RestWebSocket.liveinterval
protected int liveinterval = DEFAILT_LIVEINTERVAL;
//同RestWebSocket.wsmaxconns
protected int wsmaxconns = 0;
protected int wsmaxbody = 0;
//同RestWebSocket.wsmaxbody
protected int wsmaxbody = 16 * 1024;
//同RestWebSocket.anyuser
protected boolean anyuser = false;
@Resource(name = "jsonconvert")
protected Convert jsonConvert;
@@ -170,7 +178,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return;
}
sessionFuture.whenComplete((sessionid, ex) -> {
if (sessionid == null || ex != null) {
if ((sessionid == null && webSocket.delayPackets == null) || ex != null) {
if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex);
response.finish(true);
return;
@@ -185,41 +193,95 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.setHeader("Connection", "Upgrade");
response.addHeader("Upgrade", "websocket");
response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes));
response.sendBody((ByteBuffer) null, null, new AsyncHandler<Integer, Void>() {
response.sendBody((ByteBuffer) null, null, new CompletionHandler<Integer, Void>() {
WebSocketRunner temprunner = null;
@Override
public void completed(Integer result, Void attachment) {
HttpContext context = response.getContext();
CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request);
response.finish(true);
return;
}
userFuture.whenComplete((userid, ex2) -> {
if (userid == null || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request);
response.finish(true);
return;
}
webSocket._userid = userid;
if (single) {
WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {
if (rs) webSocket.onSingleRepeatConnect();
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
userFuture.whenComplete((userid, ex2) -> {
if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
response.finish(true);
});
} else {
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
return;
}
Runnable runHandler = () -> {
temprunner = null;
webSocket._userid = userid;
if (single && !anyuser) {
WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {
if (rs) webSocket.onSingleRepeatConnect();
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
});
} else {
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
}
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((Integer v, Throwable t) -> {
if (userid == null || t != null || (temprunner != null && temprunner.isClosed())) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
response.finish(true);
} else {
runHandler.run();
}
});
} else {
runHandler.run();
}
});
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
});
cf.whenComplete((Integer v, Throwable t) -> {
if (sessionid == null || t != null || (temprunner != null && temprunner.isClosed())) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
response.finish(true);
} else {
createUseridHandler.run();
}
});
} else {
createUseridHandler.run();
}
}
@Override

View File

@@ -9,6 +9,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.security.*;
import java.util.*;
import javax.annotation.Resource;
@@ -138,12 +139,12 @@ public abstract class Sncp {
}
static void checkAsyncModifier(Class param, Method method) {
if (param == AsyncHandler.class) return;
if (param == CompletionHandler.class) return;
if (Modifier.isFinal(param.getModifiers())) {
throw new RuntimeException("AsyncHandler Type Parameter on {" + method + "} cannot final modifier");
throw new RuntimeException("CompletionHandler Type Parameter on {" + method + "} cannot final modifier");
}
if (!Modifier.isPublic(param.getModifiers())) {
throw new RuntimeException("AsyncHandler Type Parameter on {" + method + "} must be public modifier");
throw new RuntimeException("CompletionHandler Type Parameter on {" + method + "} must be public modifier");
}
if (param.isInterface()) return;
boolean constructorflag = false;
@@ -388,8 +389,8 @@ public abstract class Sncp {
int varindex = 0;
boolean handlerFuncFlag = false;
for (Class pt : paramtypes) {
if (AsyncHandler.class.isAssignableFrom(pt)) {
if (handlerFuncFlag) throw new RuntimeException(method + " have more than one AsyncHandler type parameter");
if (CompletionHandler.class.isAssignableFrom(pt)) {
if (handlerFuncFlag) throw new RuntimeException(method + " have more than one CompletionHandler type parameter");
checkAsyncModifier(pt, method);
handlerFuncFlag = true;
}

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.net.sncp;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import jdk.internal.org.objectweb.asm.*;
@@ -26,7 +27,7 @@ import org.redkale.util.*;
* @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型
*/
public interface SncpAsyncHandler<V, A> extends AsyncHandler<V, A> {
public interface SncpAsyncHandler<V, A> extends CompletionHandler<V, A> {
public Object[] sncp_getParams();
@@ -42,9 +43,9 @@ public interface SncpAsyncHandler<V, A> extends AsyncHandler<V, A> {
* <blockquote><pre>
*
* 考虑点:
* 1、AsyncHandler子类是接口且还有其他多个方法
* 2、AsyncHandler子类是类 需要继承,且必须有空参数构造函数
* 3、AsyncHandler子类无论是接口还是类都可能存在其他泛型
* 1、CompletionHandler子类是接口且还有其他多个方法
* 2、CompletionHandler子类是类 需要继承,且必须有空参数构造函数
* 3、CompletionHandler子类无论是接口还是类都可能存在其他泛型
*
* public class XXXAsyncHandler_DyncSncpAsyncHandler_4323 extends XXXAsyncHandler implements SncpAsyncHandler {
*
@@ -91,11 +92,11 @@ public interface SncpAsyncHandler<V, A> extends AsyncHandler<V, A> {
*
* </pre></blockquote>
*
* @param handlerClass AsyncHandler类型或子类
* @param handlerClass CompletionHandler类型或子类
*
* @return Creator
*/
public static Creator<SncpAsyncHandler> createCreator(Class<? extends AsyncHandler> handlerClass) {
public static Creator<SncpAsyncHandler> createCreator(Class<? extends CompletionHandler> handlerClass) {
//-------------------------------------------------------------
final boolean handlerinterface = handlerClass.isInterface();
final String handlerClassName = handlerClass.getName().replace('.', '/');

View File

@@ -103,6 +103,7 @@ public final class SncpClient {
this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP clientAddress only support IPv4");
}
static List<SncpAction> getSncpActions(final Class serviceClass) {
@@ -292,7 +293,7 @@ public final class SncpClient {
//只给远程模式调用的
public <T> T remote(final int index, final Object... params) {
final SncpAction action = actions[index];
final AsyncHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (AsyncHandler) params[action.handlerFuncParamIndex] : null;
final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null;
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null;
final BsonReader reader = bsonConvert.pollBsonReader();
CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params);
@@ -338,15 +339,15 @@ public final class SncpClient {
}
}
private CompletableFuture<byte[]> remote0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
private CompletableFuture<byte[]> remote0(final CompletionHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;
if(bsonConvert == null) bsonConvert = BsonConvert.root();
if (bsonConvert == null) bsonConvert = BsonConvert.root();
final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]);
bsonConvert.convertTo(writer, CompletionHandler.class.isAssignableFrom(myparamclass[i]) ? CompletionHandler.class : myparamtypes[i], params[i]);
}
final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime();
@@ -570,12 +571,12 @@ public final class SncpClient {
if (anns.length > 0) {
Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < params.length; i++) {
if (AsyncHandler.class.isAssignableFrom(params[i])) {
if (CompletionHandler.class.isAssignableFrom(params[i])) {
if (boolReturnTypeFuture) {
throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture");
throw new RuntimeException(method + " have both CompletionHandler and CompletableFuture");
}
if (handlerFuncIndex >= 0) {
throw new RuntimeException(method + " have more than one AsyncHandler type parameter");
throw new RuntimeException(method + " have more than one CompletionHandler type parameter");
}
Sncp.checkAsyncModifier(params[i], method);
handlerFuncIndex = i;
@@ -616,7 +617,7 @@ public final class SncpClient {
this.handlerAttachParamIndex = handlerAttachIndex;
this.paramAttrs = hasattr ? atts : null;
if (this.handlerFuncParamIndex >= 0 && method.getReturnType() != void.class) {
throw new RuntimeException(method + " have AsyncHandler type parameter but return type is not void");
throw new RuntimeException(method + " have CompletionHandler type parameter but return type is not void");
}
}

View File

@@ -10,6 +10,7 @@ import java.io.*;
import java.lang.annotation.*;
import java.lang.reflect.*;
import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.*;
@@ -118,7 +119,7 @@ public final class SncpDynServlet extends SncpServlet {
SncpAsyncHandler handler = null;
try {
if (action.handlerFuncParamIndex >= 0) {
if (action.handlerFuncParamClass == AsyncHandler.class) {
if (action.handlerFuncParamClass == CompletionHandler.class) {
handler = new DefaultSncpAsyncHandler(action, in, out, request, response);
} else {
Creator<SncpAsyncHandler> creator = action.handlerCreator;
@@ -178,15 +179,15 @@ public final class SncpDynServlet extends SncpServlet {
protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type void的返回参数类型为null
protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在AsyncHandler参数
protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在CompletionHandler参数
protected boolean boolReturnTypeFuture = false; // 返回结果类型是否为 CompletableFuture
protected Class handlerFuncParamClass; //AsyncHandler参数的类型
protected Class handlerFuncParamClass; //CompletionHandler参数的类型
public abstract void action(final BsonReader in, final BsonWriter out, final SncpAsyncHandler handler) throws Throwable;
//只有同步方法才调用 (没有AsyncHandler、CompletableFuture)
//只有同步方法才调用 (没有CompletionHandler、CompletableFuture)
public final void _callParameter(final BsonWriter out, final Object... params) {
if (paramAttrs != null) {
for (int i = 1; i < paramAttrs.length; i++) {
@@ -207,10 +208,10 @@ public final class SncpDynServlet extends SncpServlet {
* return false;
* }
*
* public void insert(AsyncHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* public void insert(CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* }
*
* public void update(long show, short v2, AsyncHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* public void update(long show, short v2, CompletionHandler&#60;Boolean, TestBean&#62; handler, TestBean bean, String name, int id) {
* }
*
* public CompletableFuture&#60;String&#62; changeName(TestBean bean, String name, int id) {
@@ -241,7 +242,7 @@ public final class SncpDynServlet extends SncpServlet {
* &#064;Override
* public void action(BsonReader in, BsonWriter out, SncpAsyncHandler handler) throws Throwable {
* SncpAsyncHandler arg0 = handler;
* convert.convertFrom(AsyncHandler.class, in);
* convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[2], in);
* String arg2 = convert.convertFrom(paramTypes[3], in);
* int arg3 = convert.convertFrom(paramTypes[4], in);
@@ -259,7 +260,7 @@ public final class SncpDynServlet extends SncpServlet {
* long a1 = convert.convertFrom(paramTypes[1], in);
* short a2 = convert.convertFrom(paramTypes[2], in);
* SncpAsyncHandler a3 = handler;
* convert.convertFrom(AsyncHandler.class, in);
* convert.convertFrom(CompletionHandler.class, in);
* TestBean arg1 = convert.convertFrom(paramTypes[4], in);
* String arg2 = convert.convertFrom(paramTypes[5], in);
* int arg3 = convert.convertFrom(paramTypes[6], in);
@@ -353,12 +354,12 @@ public final class SncpDynServlet extends SncpServlet {
final Class[] paramClasses = method.getParameterTypes();
int[][] codes = new int[paramClasses.length][2];
for (int i = 0; i < paramClasses.length; i++) { //反序列化方法的每个参数
if (AsyncHandler.class.isAssignableFrom(paramClasses[i])) {
if (CompletionHandler.class.isAssignableFrom(paramClasses[i])) {
if (boolReturnTypeFuture) {
throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture");
throw new RuntimeException(method + " have both CompletionHandler and CompletableFuture");
}
if (handlerFuncIndex >= 0) {
throw new RuntimeException(method + " have more than one AsyncHandler type parameter");
throw new RuntimeException(method + " have more than one CompletionHandler type parameter");
}
Sncp.checkAsyncModifier(paramClasses[i], method);
handlerFuncIndex = i;
@@ -372,7 +373,7 @@ public final class SncpDynServlet extends SncpServlet {
intconst++;
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
mv.visitLdcInsn(Type.getType(Type.getDescriptor(AsyncHandler.class)));
mv.visitLdcInsn(Type.getType(Type.getDescriptor(CompletionHandler.class)));
mv.visitVarInsn(ALOAD, 1);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false);
mv.visitInsn(POP);

View File

@@ -49,6 +49,7 @@ public final class SncpResponse extends Response<SncpContext, SncpRequest> {
super(context, request);
this.addrBytes = context.getServerAddress().getAddress().getAddress();
this.addrPort = context.getServerAddress().getPort();
if (this.addrBytes.length != 4) throw new RuntimeException("SNCP serverAddress only support IPv4");
}
public void finish(final int retcode, final BsonWriter out) {

View File

@@ -124,7 +124,8 @@ public class RetResult<T> {
*/
public RetResult<T> attach(String key, Object value) {
if (this.attach == null) this.attach = new HashMap<>();
this.attach.put(key, value == null ? null : String.valueOf(value));
boolean canstr = value != null && (value instanceof CharSequence || value.getClass().isPrimitive());
this.attach.put(key, value == null ? null : (canstr ? String.valueOf(value) : JsonConvert.root().convertTo(value)));
return this;
}

View File

@@ -22,10 +22,10 @@ import org.redkale.util.*;
* <blockquote><pre>
* 异步方法:
* Service编写异步方法
* 1、异步方法有且仅有一个类型为AsyncHandler的参数 返回类型必须是void。若参数类型为AsyncHandler子类必须保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。
* 1、异步方法有且仅有一个类型为CompletionHandler的参数 返回类型必须是void。若参数类型为CompletionHandler子类必须保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数。
* 2、异步方法返回类型是CompletableFuture。
* 例如:
* public void insertRecord(AsyncHandler&#60;Integer, Record&#62; handler, String name, &#64;RpcAttachment Record record);
* public void insertRecord(CompletionHandler&#60;Integer, Record&#62; handler, String name, &#64;RpcAttachment Record record);
*
* </pre></blockquote>
*

View File

@@ -61,7 +61,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
/**
* 当用户连接到节点需要更新到CacheSource
*
* @param userid String
* @param userid Serializable
* @param sncpAddr InetSocketAddress
*
* @return 无返回值
@@ -78,7 +78,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
/**
* 当用户从一个节点断掉了所有的连接需要从CacheSource中删除
*
* @param userid String
* @param userid Serializable
* @param sncpAddr InetSocketAddress
*
* @return 无返回值
@@ -91,10 +91,27 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
return future;
}
/**
* 更改用户ID需要更新到CacheSource
*
* @param olduserid Serializable
* @param newuserid Serializable
* @param sncpAddr InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr));
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr);
return future;
}
/**
* 强制关闭用户的WebSocket
*
* @param userid String
* @param userid Serializable
* @param sncpAddr InetSocketAddress
*
* @return 无返回值

View File

@@ -168,7 +168,8 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
if (remoteSource != null && !Sncp.isRemote(this)) {
SncpClient client = Sncp.getSncpClient((Service) remoteSource);
if (client != null && client.getRemoteGroupTransport() != null) {
if (client != null && client.getRemoteGroupTransport() != null
&& client.getRemoteGroupTransport().getRemoteAddresses().length > 0) {
super.runAsync(() -> {
try {
CompletableFuture<List<CacheEntry<Object>>> listFuture = remoteSource.queryListAsync();
@@ -188,64 +189,63 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
}
}
}
/**
public static void main(String[] args) throws Exception {
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "127.0.0.1").addValue("port", "6379"));
CacheMemorySource source = new CacheMemorySource();
source.defaultConvert = JsonFactory.root().getConvert();
source.initValueType(String.class); //value用String类型
source.initTransient(false);
source.init(conf);
System.out.println("------------------------------------");
source.remove("key1");
source.remove("key2");
source.remove("300");
source.set("key1", "value1");
source.setString("keystr1", "strvalue1");
source.setLong("keylong1", 333L);
source.set("300", "4000");
source.getAndRefresh("key1", 3500);
System.out.println("[有值] 300 GET : " + source.get("300"));
System.out.println("[有值] key1 GET : " + source.get("key1"));
System.out.println("[无值] key2 GET : " + source.get("key2"));
System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L));
System.out.println("[有值] key1 EXISTS : " + source.exists("key1"));
System.out.println("[无值] key2 EXISTS : " + source.exists("key2"));
source.remove("keys3");
source.appendListItem("keys3", "vals1");
source.appendListItem("keys3", "vals2");
System.out.println("-------- keys3 追加了两个值 --------");
System.out.println("[两值] keys3 VALUES : " + source.getCollection("keys3"));
System.out.println("[有值] keys3 EXISTS : " + source.exists("keys3"));
source.removeListItem("keys3", "vals1");
System.out.println("[一值] keys3 VALUES : " + source.getCollection("keys3"));
source.getCollectionAndRefresh("keys3", 3000);
source.remove("sets3");
source.appendSetItem("sets3", "setvals1");
source.appendSetItem("sets3", "setvals2");
source.appendSetItem("sets3", "setvals1");
System.out.println("[两值] sets3 VALUES : " + source.getCollection("sets3"));
System.out.println("[有值] sets3 EXISTS : " + source.exists("sets3"));
source.removeSetItem("sets3", "setvals1");
System.out.println("[一值] sets3 VALUES : " + source.getCollection("sets3"));
System.out.println("sets3 大小 : " + source.getCollectionSize("sets3"));
System.out.println("all keys: " + source.queryKeys());
System.out.println("newnum 值 : " + source.incr("newnum"));
System.out.println("newnum 值 : " + source.decr("newnum"));
System.out.println("------------------------------------");
source.destroy(null);
source.init(null);
System.out.println("all keys: " + source.queryKeys());
System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L));
}
*/
* public static void main(String[] args) throws Exception {
* AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
* conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "127.0.0.1").addValue("port", "6379"));
*
* CacheMemorySource source = new CacheMemorySource();
* source.defaultConvert = JsonFactory.root().getConvert();
* source.initValueType(String.class); //value用String类型
* source.initTransient(false);
* source.init(conf);
*
* System.out.println("------------------------------------");
* source.remove("key1");
* source.remove("key2");
* source.remove("300");
* source.set("key1", "value1");
* source.setString("keystr1", "strvalue1");
* source.setLong("keylong1", 333L);
* source.set("300", "4000");
* source.getAndRefresh("key1", 3500);
* System.out.println("[有值] 300 GET : " + source.get("300"));
* System.out.println("[有值] key1 GET : " + source.get("key1"));
* System.out.println("[无值] key2 GET : " + source.get("key2"));
* System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L));
* System.out.println("[有值] key1 EXISTS : " + source.exists("key1"));
* System.out.println("[无值] key2 EXISTS : " + source.exists("key2"));
*
* source.remove("keys3");
* source.appendListItem("keys3", "vals1");
* source.appendListItem("keys3", "vals2");
* System.out.println("-------- keys3 追加了两个值 --------");
* System.out.println("[两值] keys3 VALUES : " + source.getCollection("keys3"));
* System.out.println("[有值] keys3 EXISTS : " + source.exists("keys3"));
* source.removeListItem("keys3", "vals1");
* System.out.println("[一值] keys3 VALUES : " + source.getCollection("keys3"));
* source.getCollectionAndRefresh("keys3", 3000);
*
* source.remove("sets3");
* source.appendSetItem("sets3", "setvals1");
* source.appendSetItem("sets3", "setvals2");
* source.appendSetItem("sets3", "setvals1");
* System.out.println("[两值] sets3 VALUES : " + source.getCollection("sets3"));
* System.out.println("[有值] sets3 EXISTS : " + source.exists("sets3"));
* source.removeSetItem("sets3", "setvals1");
* System.out.println("[一值] sets3 VALUES : " + source.getCollection("sets3"));
* System.out.println("sets3 大小 : " + source.getCollectionSize("sets3"));
* System.out.println("all keys: " + source.queryKeys());
* System.out.println("newnum 值 : " + source.incr("newnum"));
* System.out.println("newnum 值 : " + source.decr("newnum"));
* System.out.println("------------------------------------");
* source.destroy(null);
* source.init(null);
* System.out.println("all keys: " + source.queryKeys());
* System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L));
* }
*/
@Override
public void close() throws Exception { //给Application 关闭时调用
destroy(null);

View File

@@ -36,15 +36,15 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
protected final String name;
protected String name;
protected final URL conf;
protected URL conf;
protected final boolean cacheForbidden;
protected boolean cacheForbidden;
protected final PoolJdbcSource readPool;
protected PoolJdbcSource readPool;
protected final PoolJdbcSource writePool;
protected PoolJdbcSource writePool;
@Resource(name = "$")
protected DataCacheListener cacheListener;
@@ -52,6 +52,44 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
this.preConstruct(unitName, readprop, writeprop);
this.initByProperties(unitName, readprop, writeprop);
}
public DataJdbcSource() {
}
@Override
public void init(AnyValue config) { //通过空构造函数创建的对象需要调用init方法进行初始化
String unitName = config.getValue("name");
Properties readprop = new Properties();
Properties writeprop = new Properties();
for (AnyValue confs : config.getAnyValues("properties")) {
boolean write = confs.getValue("name", "").contains("write");
for (AnyValue conf : confs.getAnyValues("property")) {
String pn = conf.getValue("name");
String pv = conf.getValue("value");
if (pn == null || pv == null) continue;
(write ? writeprop : readprop).put(pn, pv);
}
}
for (AnyValue conf : config.getAnyValues("property")) {
String pn = conf.getValue("name");
String pv = conf.getValue("value");
if (pn == null || pv == null) continue;
readprop.put(pn, pv);
}
if (writeprop.isEmpty()) writeprop = readprop;
this.initByProperties(unitName, readprop, writeprop);
}
//构造前调用
protected void preConstruct(String unitName, Properties readprop, Properties writeprop) {
}
protected void initByProperties(String unitName, Properties readprop, Properties writeprop) {
this.name = unitName;
this.conf = null;
this.readPool = new PoolJdbcSource(this, "read", readprop);
@@ -59,6 +97,12 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
}
@Local
@Override
public String getType() {
return "jdbc";
}
@Override
public final String resourceName() {
return name;
@@ -70,6 +114,14 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
writePool.close();
}
public PoolJdbcSource getReadPoolJdbcSource() {
return readPool;
}
public PoolJdbcSource getWritePoolJdbcSource() {
return writePool;
}
public Connection createReadSQLConnection() {
return readPool.poll();
}
@@ -1443,7 +1495,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
final Connection conn = createReadSQLConnection();
try {
final SelectColumn sels = selects;
final String sql = "SELECT * FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk);
final String sql = "SELECT " + info.getQueryColumns(null, selects) + " FROM " + info.getTable(pk) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(pk);
if (info.isLoggable(logger, Level.FINEST)) logger.finest(clazz.getSimpleName() + " find sql=" + sql);
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
@@ -1520,7 +1572,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
final String sql = "SELECT a.* FROM " + info.getTable(node) + " a" + (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
final String sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
if (info.isLoggable(logger, Level.FINEST)) logger.finest(clazz.getSimpleName() + " find sql=" + sql);
conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
@@ -2246,7 +2298,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
final String sql = "SELECT a.* FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
final String sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
+ ((where == null || where.length() == 0) ? "" : (" WHERE " + where)) + info.createSQLOrderby(flipper);
if (info.isLoggable(logger, Level.FINEST)) {
logger.finest(clazz.getSimpleName() + " query sql=" + sql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getOffset() + "," + flipper.getLimit())));
@@ -2341,7 +2393,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, DataC
final Statement statement = conn.createStatement();
//final PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
final ResultSet set = statement.executeQuery(sql);// ps.executeQuery();
consumer.accept(set);
consumer.accept(set);
set.close();
statement.close();
} catch (Exception ex) {

View File

@@ -24,6 +24,13 @@ import org.redkale.util.*;
@SuppressWarnings("unchecked")
public interface DataSource {
/**
* 获取数据源类型
*
* @return String
*/
public String getType();
//----------------------insertAsync-----------------------------
/**
* 新增记录, 多对象必须是同一个Entity类 <br>

View File

@@ -92,6 +92,7 @@ public final class DataSources {
}
}
if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource");
if (writeprop == null) writeprop = readprop;
String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName());
if (DataJdbcSource.class.getName().equals(impl)) return new DataJdbcSource(unitName, readprop, writeprop);
try {

View File

@@ -286,7 +286,7 @@ public final class EntityCache<T> {
rs = rs2;
} else if (func == DISTINCTCOUNT) {
Map rs2 = new LinkedHashMap();
rs.forEach((x, y) -> rs2.put(x, ((Set) y).size()));
rs.forEach((x, y) -> rs2.put(x, ((Set) y).size() + 0L));
rs = rs2;
}
return rs;

View File

@@ -434,6 +434,26 @@ public final class EntityInfo<T> {
return deleteSQL.replace("${newtable}", getTable(bean));
}
/**
* 获取查询字段列表
*
* @param tabalis 表别名
* @param selects 过滤字段
*
* @return String
*/
public CharSequence getQueryColumns(String tabalis, SelectColumn selects) {
if (selects == null) return tabalis == null ? "*" : (tabalis + ".*");
StringBuilder sb = new StringBuilder();
for (Attribute attr : this.attributes) {
if (!selects.test(attr.field())) continue;
if (sb.length() > 0) sb.append(',');
sb.append(getSQLColumn(tabalis, attr.field()));
}
if (sb.length() == 0) sb.append('*');
return sb;
}
/**
* 根据主键值获取Entity的表名
*

View File

@@ -16,10 +16,12 @@ import java.util.function.*;
* <p>
* 详情见: https://redkale.org
*
* @deprecated 使用 java.nio.channels.CompletionHandler 代替
* @author zhangjx
* @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型
*/
@Deprecated
public interface AsyncHandler<V, A> extends CompletionHandler<V, A> {
/**

View File

@@ -19,7 +19,7 @@ import java.util.logging.*;
* @author zhangjx
* @param <T> 对象池元素的数据类型
*/
public final class ObjectPool<T> implements Supplier<T> {
public final class ObjectPool<T> implements Supplier<T>, Consumer<T> {
private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName());
@@ -78,21 +78,27 @@ public final class ObjectPool<T> implements Supplier<T> {
return result;
}
public void offer(final T e) {
@Override
public void accept(final T e) {
if (e != null && recycler.test(e)) {
if (cycleCounter != null) cycleCounter.incrementAndGet();
if (debug) {
for (T t : queue) {
if (t == e) {
logger.log(Level.WARNING, "[" + Thread.currentThread().getName() + "] repeat offer the same object(" + e + ")", new Exception());
return;
}
}
}
// if (debug) {
// for (T t : queue) {
// if (t == e) {
// logger.log(Level.WARNING, "[" + Thread.currentThread().getName() + "] repeat offer the same object(" + e + ")", new Exception());
// return;
// }
// }
// }
queue.offer(e);
}
}
@Deprecated
public void offer(final T e) {
accept(e);
}
public long getCreatCount() {
return creatCounter.longValue();
}

View File

@@ -17,7 +17,7 @@ public final class Redkale {
}
public static String getDotedVersion() {
return "1.8.6";
return "1.8.8";
}
public static int getMajorVersion() {

View File

@@ -8,10 +8,11 @@ import java.io.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.*;
import java.time.*;
import java.util.*;
import java.util.function.Predicate;
import java.util.function.*;
import java.util.zip.GZIPInputStream;
import javax.net.ssl.*;
@@ -229,6 +230,191 @@ public final class Utility {
return news;
}
/**
* 获取int数组之和
*
* @param array 数组
*
* @return int
*/
public static int sum(final int... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
int sum = 0;
for (int i : array) {
sum += i;
}
return sum;
}
/**
* 获取long数组之和
*
* @param array 数组
*
* @return long
*/
public static long sum(final long... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
long sum = 0L;
for (long i : array) {
sum += i;
}
return sum;
}
/**
* 获取int数组最大值
*
* @param array 数组
*
* @return int
*/
public static int max(final int... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
int max = array[0];
for (int i : array) {
if (i > max) i = max;
}
return max;
}
/**
* 获取long数组最大值
*
* @param array 数组
*
* @return long
*/
public static long max(final long... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
long max = array[0];
for (long i : array) {
if (i > max) i = max;
}
return max;
}
/**
* 获取int数组最小值
*
* @param array 数组
*
* @return int
*/
public static long min(final int... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
int min = array[0];
for (int i : array) {
if (i < min) i = min;
}
return min;
}
/**
* 获取long数组最小值
*
* @param array 数组
*
* @return long
*/
public static long min(final long... array) {
if (array == null || array.length == 0) throw new NullPointerException("array is null or empty");
long min = array[0];
for (long i : array) {
if (i < min) i = min;
}
return min;
}
/**
* 将int数组用分隔符拼接成字符串
*
* @param array 数组
* @param delimiter 分隔符
*
* @return String
*/
public static String joining(final int[] array, final String delimiter) {
if (array == null || array.length == 0) return "";
StringBuilder sb = new StringBuilder();
for (int i : array) {
if (sb.length() > 0) sb.append(delimiter);
sb.append(i);
}
return sb.toString();
}
/**
* 将long数组用分隔符拼接成字符串
*
* @param array 数组
* @param delimiter 分隔符
*
* @return String
*/
public static String joining(final long[] array, final String delimiter) {
if (array == null || array.length == 0) return "";
StringBuilder sb = new StringBuilder();
for (long i : array) {
if (sb.length() > 0) sb.append(delimiter);
sb.append(i);
}
return sb.toString();
}
/**
* 将对象数组用分隔符拼接成字符串
*
* @param <T> 泛型
* @param array 数组
* @param delimiter 分隔符
*
* @return String
*/
public static <T> String joining(final T[] array, final String delimiter) {
if (array == null || array.length == 0) return "";
StringBuilder sb = new StringBuilder();
for (T i : array) {
if (sb.length() > 0) sb.append(delimiter);
sb.append(i);
}
return sb.toString();
}
/**
* 将一个或多个int新元素添加到int数组结尾
*
* @param array 原数组
* @param objs 待追加数据
*
* @return 新数组
*/
public static int[] append(final int[] array, final int... objs) {
if (array == null || array.length == 0) return objs;
if (objs == null || objs.length == 0) return array;
final int[] news = new int[array.length + objs.length];
System.arraycopy(array, 0, news, 0, array.length);
System.arraycopy(objs, 0, news, array.length, objs.length);
return news;
}
/**
* 将一个或多个long新元素添加到long数组结尾
*
* @param array 原数组
* @param objs 待追加数据
*
* @return 新数组
*/
public static long[] append(final long[] array, final long... objs) {
if (array == null || array.length == 0) return objs;
if (objs == null || objs.length == 0) return array;
final long[] news = new long[array.length + objs.length];
System.arraycopy(array, 0, news, 0, array.length);
System.arraycopy(objs, 0, news, array.length, objs.length);
return news;
}
/**
* 将一个或多个新元素添加到数组结尾
*
@@ -458,6 +644,76 @@ public final class Utility {
return back;
}
/**
* 创建 CompletionHandler 对象
*
* @param <V> 结果对象的泛型
* @param <A> 附件对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return CompletionHandler
*/
public static <V, A> CompletionHandler<V, A> createAsyncHandler(final BiConsumer<V, A> success, final BiConsumer<Throwable, A> fail) {
return new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
if (success != null) success.accept(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
if (fail != null) fail.accept(exc, attachment);
}
};
}
/**
* 创建没有返回结果的 CompletionHandler 对象
*
* @param <A> 附件对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return CompletionHandler
*/
public static <A> CompletionHandler<Void, A> createAsyncHandler(final Consumer<A> success, final BiConsumer<Throwable, A> fail) {
return new CompletionHandler<Void, A>() {
@Override
public void completed(Void result, A attachment) {
if (success != null) success.accept(attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
if (fail != null) fail.accept(exc, attachment);
}
};
}
/**
* 创建没有附件对象的 CompletionHandler 对象
*
* @param <V> 结果对象的泛型
* @param success 成功的回调函数
* @param fail 失败的回调函数
*
* @return CompletionHandler
*/
public static <V> CompletionHandler<V, Void> createAsyncHandler(final Consumer<V> success, final Consumer<Throwable> fail) {
return new CompletionHandler<V, Void>() {
@Override
public void completed(V result, Void attachment) {
if (success != null) success.accept(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
if (fail != null) fail.accept(exc);
}
};
}
/**
* 获取格式为yyyy-MM-dd HH:mm:ss的当前时间
*

View File

@@ -0,0 +1,35 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.test.asm;
import java.io.*;
import org.redkale.util.Utility;
/**
*
* @author zhangjx
*/
public class AsmCreator {
public static void main(String[] args) throws Throwable {
boolean realasm = true; //从http://forge.ow2.org/projects/asm/ 下载最新asm的src放在 srcasmroot 目录下
File srcasmroot = new File("D:/JAVA/JDK源码/JDK9源码/java.base/jdk/internal/org/objectweb/asm");
if(realasm) srcasmroot = new File("D:/JAVA/JDK源码/org/objectweb/asm");
File destasmroot = new File("D:/Java-Projects/RedkaleProject/src/org/redkale/asm");
String line = null;
LineNumberReader txtin = new LineNumberReader(new FileReader(new File(destasmroot, "asm.txt")));
while ((line = txtin.readLine()) != null) {
line = line.trim();
if (!line.endsWith(".java")) continue;
File srcfile = new File(srcasmroot, line);
File destfile = new File(destasmroot, line);
String content = Utility.readThenClose(new FileInputStream(srcfile));
FileOutputStream out = new FileOutputStream(destfile);
out.write(content.replace("jdk.internal.org.objectweb", "org.redkale").replace("org.objectweb", "org.redkale").getBytes());
out.close();
}
}
}

View File

@@ -13,6 +13,7 @@ import org.redkale.convert.bson.BsonConvert;
import java.nio.*;
import java.util.*;
import org.redkale.convert.json.*;
import org.redkale.util.*;
/**
*
@@ -31,6 +32,7 @@ public class BsonTestMain {
main3(args);
main4(args);
main5(args);
main6(args);
}
public static void main2(String[] args) throws Exception {
@@ -98,4 +100,20 @@ public class BsonTestMain {
Object mapobj = convert.convertFrom(Object.class, bs);
System.out.println(mapobj);
}
public static void main6(String[] args) throws Exception {
final BsonConvert convert = BsonFactory.root().getConvert();
Optional<String> val = Optional.ofNullable("haha");
byte[] bs = convert.convertTo(val);
Object obj = convert.convertFrom(Optional.class, bs);
System.out.println(obj);
bs = convert.convertTo(Object.class, val);
obj = convert.convertFrom(Object.class, bs);
System.out.println(obj);
bs = convert.convertTo(new TypeToken<Optional<String>>(){}.getType(), val);
obj = convert.convertFrom(new TypeToken<Optional<String>>(){}.getType(), bs);
System.out.println(obj);
System.out.println(JsonConvert.root().convertTo(val));
}
}

View File

@@ -9,12 +9,12 @@ import java.io.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.redkale.convert.json.*;
import org.redkale.net.http.*;
import org.redkale.util.AsyncHandler;
/**
*
@@ -28,11 +28,11 @@ public interface HttpResponseDesc {
//增加Cookie值
public HttpResponse addCookie(Collection<HttpCookie> cookies);
//创建AsyncHandler实例将非字符串对象以JSON格式输出字符串以文本输出
public AsyncHandler createAsyncHandler();
//创建CompletionHandler实例将非字符串对象以JSON格式输出字符串以文本输出
public CompletionHandler createAsyncHandler();
//传入的AsyncHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数
public <H extends AsyncHandler> H createAsyncHandler(Class<H> handlerClass);
//传入的CompletionHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数
public <H extends CompletionHandler> H createAsyncHandler(Class<H> handlerClass);
//设置状态码
public void setStatus(int status);
@@ -66,10 +66,10 @@ public interface HttpResponseDesc {
public HttpResponse skipHeader();
//异步输出指定内容
public <A> void sendBody(ByteBuffer buffer, A attachment, AsyncHandler<Integer, A> handler);
public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler);
//异步输出指定内容
public <A> void sendBody(ByteBuffer[] buffers, A attachment, AsyncHandler<Integer, A> handler);
public <A> void sendBody(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler);
//关闭HTTP连接如果是keep-alive则不强制关闭
public void finish();

View File

@@ -5,13 +5,13 @@
*/
package org.redkale.test.rest;
import org.redkale.util.AsyncHandler;
import java.nio.channels.CompletionHandler;
/**
*
* @author zhangjx
*/
public class HelloAsyncHandler implements AsyncHandler {
public class HelloAsyncHandler implements CompletionHandler {
@Override
public void completed(Object result, Object attachment) {

View File

@@ -1,5 +1,6 @@
package org.redkale.test.rest;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Resource;
@@ -42,6 +43,11 @@ public class HelloService implements Service {
return new RetResult<>(entity);
}
//
public HttpResult showHello(int id) {
return new HttpResult("a");
}
//删除记录
public void deleteHello(int id) { //通过 /pipes/hello/delete/1234 删除对象
source.delete(HelloEntity.class, id);
@@ -68,6 +74,7 @@ public class HelloService implements Service {
//查询List列表
@RestMapping(name = "list")
@RestConvert(type = HelloEntity.class, ignoreColumns = {"createtime"})
public List<HelloEntity> queryHello(HelloBean bean) { //通过 /pipes/hello/list?bean={...} 查询List列表
return source.queryList(HelloEntity.class, bean);
}
@@ -96,7 +103,7 @@ public class HelloService implements Service {
//异步查询单个
@RestMapping(name = "asyncfind2")
public void asyncFindHello(AsyncHandler hander, @RestParam(name = "#") int id) { //通过 /pipes/hello/find/1234、/pipes/hello/jsfind/1234 查询对象
public void asyncFindHello(CompletionHandler hander, @RestParam(name = "#") int id) { //通过 /pipes/hello/find/1234、/pipes/hello/jsfind/1234 查询对象
if (source != null) source.findAsync(HelloEntity.class, id);
System.out.println("-----------进入asyncfind2--------" + hander);
hander.completed(new HelloEntity(id), id);

View File

@@ -8,7 +8,7 @@ package org.redkale.test.service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;
@@ -165,8 +165,8 @@ public class ABMainService implements Service {
}
@RestMapping(name = "asyncabtime")
public void abCurrentTime(final AsyncHandler<String, Void> handler, @RestParam(name = "#") final String name) {
bcService.bcCurrentTime(AsyncHandler.create((v, a) -> {
public void abCurrentTime(final CompletionHandler<String, Void> handler, @RestParam(name = "#") final String name) {
bcService.bcCurrentTime(Utility.createAsyncHandler((v, a) -> {
System.out.println("执行了 ABMainService.abCurrentTime----异步方法");
String rs = "异步abCurrentTime: " + v;
if (handler != null) handler.completed(rs, a);

View File

@@ -5,9 +5,10 @@
*/
package org.redkale.test.service;
import java.nio.channels.CompletionHandler;
import javax.annotation.Resource;
import org.redkale.service.*;
import org.redkale.util.AsyncHandler;
import org.redkale.util.*;
/**
*
@@ -24,8 +25,8 @@ public class BCService implements Service {
return rs;
}
public void bcCurrentTime(final AsyncHandler<String, Void> handler, final String name) {
cService.ccCurrentTime(AsyncHandler.create((v, a) -> {
public void bcCurrentTime(final CompletionHandler<String, Void> handler, final String name) {
cService.ccCurrentTime(Utility.createAsyncHandler((v, a) -> {
System.out.println("执行了 BCService.bcCurrentTime----异步方法");
String rs = "异步bcCurrentTime: " + (v == null ? null : v.getResult());
if (handler != null) handler.completed(rs, null);

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.test.service;
import java.nio.channels.CompletionHandler;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -20,7 +21,7 @@ public class CService implements Service {
return new RetResult(rs);
}
public void ccCurrentTime(final AsyncHandler<RetResult<String>, Void> handler, final String name) {
public void ccCurrentTime(final CompletionHandler<RetResult<String>, Void> handler, final String name) {
String rs = "异步ccCurrentTime: " + name + ": " + Utility.formatTime(System.currentTimeMillis());
System.out.println("执行了 CService.ccCurrentTime----异步方法");
if (handler != null) handler.completed(new RetResult(rs), null);

View File

@@ -5,13 +5,13 @@
*/
package org.redkale.test.service;
import org.redkale.util.AsyncHandler;
import java.nio.channels.CompletionHandler;
/**
*
* @author zhangjx
*/
public abstract class MyAsyncInnerHandler<V, A> implements AsyncHandler<V, A> {
public abstract class MyAsyncInnerHandler<V, A> implements CompletionHandler<V, A> {
protected abstract int id2();

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.test.service;
import java.nio.channels.CompletionHandler;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
import org.redkale.util.*;
@@ -19,7 +20,7 @@ public class TestService implements Service {
// return false;
// }
public void change(AsyncHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) {
public void change(CompletionHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) {
}

View File

@@ -7,6 +7,7 @@ package org.redkale.test.sncp;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.*;
import org.redkale.net.TransportFactory;
import org.redkale.net.sncp.*;
@@ -97,7 +98,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
return "result: " + bean;
}
public void queryResult(AsyncHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) {
public void queryResult(CompletionHandler<String, SncpTestBean> handler, @RpcAttachment SncpTestBean bean) {
System.out.println(Thread.currentThread().getName() + " handler 运行了queryResult方法");
if (handler != null) handler.completed("result: " + bean, bean);
}

View File

@@ -15,7 +15,7 @@ import org.redkale.net.http.*;
*
* @author zhangjx
*/
@RestWebSocket(name = "chat", catalog = "ws", comment = "文字聊天")
@RestWebSocket(name = "chat", catalog = "ws", comment = "文字聊天", anyuser = true)
public class ChatWebSocket extends WebSocket<Integer, Object> {
protected static final AtomicInteger idcreator = new AtomicInteger(10000);