Redkale 2.6.0 结束

This commit is contained in:
Redkale
2021-12-01 09:52:52 +08:00
parent f0ac042b3c
commit 6e21fe56e9
50 changed files with 1457 additions and 413 deletions

View File

@@ -7,7 +7,7 @@
<name>RedkaleProject</name>
<url>https://redkale.org</url>
<description>redkale -- java framework</description>
<version>2.5.0</version>
<version>2.6.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

View File

@@ -11,6 +11,7 @@ import org.redkale.net.TransportGroupInfo;
import java.io.*;
import java.lang.reflect.*;
import java.net.*;
import java.net.http.HttpClient;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
@@ -509,7 +510,8 @@ public final class Application {
}
ExecutorService workExecutor0 = null;
if (executorConf != null) {
{
if (executorConf == null) executorConf = DefaultAnyValue.create();
final AtomicReference<ExecutorService> workref = new AtomicReference<>();
final int executorThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus()));
boolean executorHash = executorConf.getBoolValue("hash");
@@ -804,18 +806,36 @@ public final class Application {
}, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
//------------------------------------- 注册 HttpClient --------------------------------------------------------
//------------------------------------- 注册 java.net.http.HttpClient --------------------------------------------------------
resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
HttpClient httpClient = HttpClient.create(asyncGroup);
java.net.http.HttpClient.Builder builder = java.net.http.HttpClient.newBuilder();
if (resourceName.endsWith(".1.1")) {
builder.version(HttpClient.Version.HTTP_1_1);
} else if (resourceName.endsWith(".2")) {
builder.version(HttpClient.Version.HTTP_2);
}
java.net.http.HttpClient httpClient = builder.build();
field.set(src, httpClient);
rf.inject(httpClient, null); // 给其可能包含@Resource的字段赋值;
rf.register(resourceName, HttpClient.class, httpClient);
rf.register(resourceName, java.net.http.HttpClient.class, httpClient);
} catch (Exception e) {
logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] java.net.http.HttpClient inject error", e);
}
}, java.net.http.HttpClient.class);
//------------------------------------- 注册 HttpSimpleClient --------------------------------------------------------
resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
HttpSimpleClient httpClient = HttpSimpleClient.create(asyncGroup);
field.set(src, httpClient);
rf.inject(httpClient, null); // 给其可能包含@Resource的字段赋值;
rf.register(resourceName, HttpSimpleClient.class, httpClient);
} catch (Exception e) {
logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] HttpClient inject error", e);
}
}, HttpClient.class);
}, HttpSimpleClient.class);
//--------------------------------------------------------------------------
if (this.asyncGroup != null) {
((AsyncIOGroup) this.asyncGroup).start();
@@ -857,7 +877,7 @@ public final class Application {
rf.register(resourceName, HttpMessageClient.class, messageClient);
return;
}
HttpMessageClient messageClient = new HttpMessageClusterClient(clusterAgent);
HttpMessageClient messageClient = new HttpMessageClusterClient(application, resourceName, clusterAgent);
field.set(src, messageClient);
rf.inject(messageClient, null); // 给其可能包含@Resource的字段赋值;
rf.register(resourceName, HttpMessageClient.class, messageClient);
@@ -1066,6 +1086,11 @@ public final class Application {
}
}
/**
* 启动
*
* @throws Exception 异常
*/
public void start() throws Exception {
if (!singletonMode && !compileMode && this.clusterAgent != null) {
this.clusterAgent.register(this);
@@ -1248,10 +1273,31 @@ public final class Application {
sercdl.await();
}
/**
* 实例化单个Service
*
* @param <T> 泛型
* @param serviceClass 指定的service类
* @param extServiceClasses 需要排除的service类
*
* @return Service对象
* @throws Exception 异常
*/
public static <T extends Service> T singleton(Class<T> serviceClass, Class<? extends Service>... extServiceClasses) throws Exception {
return singleton("", serviceClass, extServiceClasses);
}
/**
* 实例化单个Service
*
* @param <T> 泛型
* @param name Service的资源名
* @param serviceClass 指定的service类
* @param extServiceClasses 需要排除的service类
*
* @return Service对象
* @throws Exception 异常
*/
public static <T extends Service> T singleton(String name, Class<T> serviceClass, Class<? extends Service>... extServiceClasses) throws Exception {
if (serviceClass == null) throw new IllegalArgumentException("serviceClass is null");
final Application application = Application.create(true);
@@ -1279,6 +1325,11 @@ public final class Application {
return new Application(singleton, false, loadAppConfig());
}
/**
* 重新加载配置信息
*
* @throws IOException 异常
*/
public void reloadConfig() throws IOException {
AnyValue newconfig = loadAppConfig();
final String confpath = this.confPath.toString();

View File

@@ -131,7 +131,7 @@ public class NodeHttpServer extends NodeServer {
} catch (Exception ex) {
logger.log(Level.WARNING, "WebSocketServlet getMessageAgent error", ex);
}
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, messageAgent, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, org.redkale.net.http.WebSocketNodeService.class, messageAgent, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
regFactory.register(resourceName, WebSocketNode.class, nodeService);
}
resourceFactory.inject(nodeService, self);

View File

@@ -438,7 +438,7 @@ public abstract class NodeServer {
rf.inject(source, self); //
if (!application.isCompileMode() && source instanceof Service) ((Service) source).init(sourceConf);
if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource
if ((src instanceof org.redkale.net.http.WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
if (source != null && source.getClass().getAnnotation(Local.class) == null) { //本地模式的Service不生成SncpServlet
sncpServer.getSncpServer().addSncpServlet((Service) source);
@@ -468,9 +468,9 @@ public abstract class NodeServer {
if (nodeService == null) {
final HashSet<String> groups = new HashSet<>();
if (groups.isEmpty() && isSNCP() && NodeServer.this.sncpGroup != null) groups.add(NodeServer.this.sncpGroup);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, Sncp.getMessageAgent((Service) src), application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, org.redkale.net.http.WebSocketNodeService.class, Sncp.getMessageAgent((Service) src), application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null);
(isSNCP() ? appResFactory : resourceFactory).register(resourceName, WebSocketNode.class, nodeService);
((WebSocketNodeService) nodeService).setName(resourceName);
((org.redkale.net.http.WebSocketNodeService) nodeService).setName(resourceName);
}
resourceFactory.inject(nodeService, self);
MessageAgent messageAgent = Sncp.getMessageAgent((Service) src);
@@ -551,8 +551,8 @@ public abstract class NodeServer {
} else {
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
}
if (service instanceof WebSocketNodeService) {
((WebSocketNodeService) service).setName(resourceName);
if (service instanceof org.redkale.net.http.WebSocketNodeService) {
((org.redkale.net.http.WebSocketNodeService) service).setName(resourceName);
if (agent != null) Sncp.setMessageAgent(service, agent);
}
final Class restype = Sncp.getResourceType(service);

View File

@@ -15,9 +15,15 @@ import org.redkale.watch.WatchService;
*/
public abstract class AbstractWatchService extends AbstractService implements WatchService {
/**
* 缺少参数
*/
@Comment("缺少参数")
public static final int RET_WATCH_PARAMS_ILLEGAL = 1600_0001;
/**
* 执行异常
*/
@Comment("执行异常")
public static final int RET_WATCH_RUN_EXCEPTION = 1600_0002;
}

View File

@@ -112,7 +112,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
updateSncpTransport(entry);
});
} catch (Exception e) {
logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e);
logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e);
}
}, Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS);
}

View File

@@ -14,7 +14,7 @@ import static org.redkale.convert.Reader.ValueType.MAP;
/**
* 对不明类型的对象进行反序列化。 <br>
* <b>注意: 目前只支持文本格式</b> <br>
* <p>
*
* 详情见: https://redkale.org
*
* @author zhangjx
@@ -31,12 +31,17 @@ public class AnyDecoder implements Decodeable<Reader, Object> {
private static final Creator<HashMap> mapCreator = Creator.create(HashMap.class);
protected final Decodeable<Reader, String> stringDecoder;
final Decodeable<Reader, String> stringDecoder;
protected final CollectionDecoder collectionDecoder;
final CollectionDecoder collectionDecoder;
protected final MapDecoder mapDecoder;
final MapDecoder mapDecoder;
/**
* 构造函数
*
* @param factory ConvertFactory
*/
public AnyDecoder(final ConvertFactory factory) {
this.stringDecoder = factory.loadDecoder(String.class);
this.collectionDecoder = new CollectionDecoder(factory, collectionObjectType, Object.class, collectionCreator, this);

View File

@@ -6,11 +6,10 @@
package org.redkale.convert;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
/**
* 对不明类型的对象进行序列化; BSON序列化时将对象的类名写入WriterJSON则不写入。
* <p>
*
* 详情见: https://redkale.org
*
* @author zhangjx
@@ -42,29 +41,6 @@ public final class AnyEncoder<T> implements Encodeable<Writer, T> {
}
}
@SuppressWarnings("unchecked")
public void convertMapTo(final Writer out, final Object... values) {
if (values == null) {
out.writeNull();
} else {
int count = values.length - values.length % 2;
if (out.writeMapB(count / 2, (Encodeable) this, (Encodeable) this, values) < 0) {
for (int i = 0; i < count; i += 2) {
if (i > 0) out.writeArrayMark();
this.convertTo(out, (T) values[i]);
out.writeMapMark();
Object val = values[i + 1];
if (val instanceof CompletableFuture) {
this.convertTo(out, (T) ((CompletableFuture) val).join());
} else {
this.convertTo(out, (T) val);
}
}
}
out.writeMapE();
}
}
@Override
public Type getType() {
return Object.class;

View File

@@ -6,7 +6,9 @@
package org.redkale.convert;
import java.lang.reflect.*;
import org.redkale.util.Attribute;
import javax.persistence.Column;
import org.redkale.source.FilterColumn;
import org.redkale.util.*;
/**
* 字段的反序列化操作类
@@ -26,6 +28,8 @@ public final class DeMember<R extends Reader, T, F> {
final Method method; //对应类成员的Method也可能为null
final String comment;
protected int index;
protected int position; //从1开始
@@ -40,6 +44,43 @@ public final class DeMember<R extends Reader, T, F> {
this.attribute = attribute;
this.field = field;
this.method = method;
if (field != null) {
Comment ct = field.getAnnotation(Comment.class);
if (ct == null) {
Column col = field.getAnnotation(Column.class);
if (col == null) {
FilterColumn fc = field.getAnnotation(FilterColumn.class);
if (fc == null) {
this.comment = "";
} else {
this.comment = fc.comment();
}
} else {
this.comment = col.comment();
}
} else {
this.comment = ct.value();
}
} else if (method != null) {
Comment ct = method.getAnnotation(Comment.class);
if (ct == null) {
Column col = method.getAnnotation(Column.class);
if (col == null) {
FilterColumn fc = method.getAnnotation(FilterColumn.class);
if (fc == null) {
this.comment = "";
} else {
this.comment = fc.comment();
}
} else {
this.comment = col.comment();
}
} else {
this.comment = ct.value();
}
} else {
this.comment = "";
}
}
public DeMember(Attribute<T, F> attribute, Decodeable<R, F> decoder, Field field, Method method) {
@@ -85,6 +126,10 @@ public final class DeMember<R extends Reader, T, F> {
return this.attribute;
}
public String getComment() {
return comment;
}
public Decodeable<R, F> getDecoder() {
return decoder;
}

View File

@@ -19,6 +19,13 @@ import java.lang.reflect.Type;
*/
public interface Decodeable<R extends Reader, T> {
/**
* 反序列化操作
*
* @param in R
*
* @return T
*/
public T convertFrom(final R in);
/**

View File

@@ -6,7 +6,9 @@
package org.redkale.convert;
import java.lang.reflect.*;
import org.redkale.util.Attribute;
import javax.persistence.Column;
import org.redkale.source.FilterColumn;
import org.redkale.util.*;
/**
* 字段的序列化操作类
@@ -26,6 +28,8 @@ public final class EnMember<W extends Writer, T, F> {
final Encodeable<W, F> encoder;
final String comment;
final boolean string;
//final boolean isnumber;
@@ -39,11 +43,11 @@ public final class EnMember<W extends Writer, T, F> {
final Method method; //对应类成员的Method也可能为null
protected int index;
int index;
protected int position; //从1开始
int position; //从1开始
protected int tag; //主要给protobuf使用
int tag; //主要给protobuf使用
public EnMember(Attribute<T, F> attribute, Encodeable<W, F> encoder, Field field, Method method) {
this.attribute = attribute;
@@ -55,6 +59,43 @@ public final class EnMember<W extends Writer, T, F> {
this.bool = t == Boolean.class || t == boolean.class;
this.jsonFieldNameChars = ('"' + attribute.field() + "\":").toCharArray();
this.jsonFieldNameBytes = ('"' + attribute.field() + "\":").getBytes();
if (field != null) {
Comment ct = field.getAnnotation(Comment.class);
if (ct == null) {
Column col = field.getAnnotation(Column.class);
if (col == null) {
FilterColumn fc = field.getAnnotation(FilterColumn.class);
if (fc == null) {
this.comment = "";
} else {
this.comment = fc.comment();
}
} else {
this.comment = col.comment();
}
} else {
this.comment = ct.value();
}
} else if (method != null) {
Comment ct = method.getAnnotation(Comment.class);
if (ct == null) {
Column col = method.getAnnotation(Column.class);
if (col == null) {
FilterColumn fc = method.getAnnotation(FilterColumn.class);
if (fc == null) {
this.comment = "";
} else {
this.comment = fc.comment();
}
} else {
this.comment = col.comment();
}
} else {
this.comment = ct.value();
}
} else {
this.comment = "";
}
//this.isnumber = Number.class.isAssignableFrom(t) || (!this.isbool && t.isPrimitive());
}
@@ -88,6 +129,10 @@ public final class EnMember<W extends Writer, T, F> {
return attribute;
}
public String getComment() {
return comment;
}
public char[] getJsonFieldNameChars() {
return jsonFieldNameChars;
}

View File

@@ -19,6 +19,12 @@ import java.lang.reflect.Type;
*/
public interface Encodeable<W extends Writer, T> {
/**
* 序列化操作
*
* @param out Writer
* @param value 对象
*/
public void convertTo(final W out, T value);
/**

View File

@@ -44,7 +44,7 @@ public abstract class JsonDynEncoder<T> implements Encodeable<JsonWriter, T> {
return false;
}
private static boolean checkMemberType(final JsonFactory factory, Type type, Class clazz) {
private static boolean checkMemberType(final JsonFactory factory, final Class declaringClass, Type type, Class clazz) {
if (type == String.class) return true;
if (clazz.isPrimitive()) return true;
if (clazz.isEnum()) return true;
@@ -65,6 +65,8 @@ public abstract class JsonDynEncoder<T> implements Encodeable<JsonWriter, T> {
if (type == Long[].class) return true;
if (type == Double[].class) return true;
if (type == String[].class) return true;
if (declaringClass == clazz) return false;
if (Collection.class.isAssignableFrom(clazz) && type instanceof ParameterizedType) {
Type[] ts = ((ParameterizedType) type).getActualTypeArguments();
if (ts.length == 1) {
@@ -72,15 +74,9 @@ public abstract class JsonDynEncoder<T> implements Encodeable<JsonWriter, T> {
if (t == Boolean.class || t == Byte.class || t == Short.class || t == Character.class
|| t == Integer.class || t == Float.class || t == Long.class || t == Double.class
|| t == String.class || ((t instanceof Class) && ((Class) t).isEnum())) return true;
if (factory.loadEncoder(t) instanceof JsonDynEncoder) return true;
return false;
}
}
if (type instanceof TypeVariable) return false;
try {
if (factory.loadEncoder(type) instanceof JsonDynEncoder) return true;
} catch (Exception e) {
return false;
}
return false;
}
@@ -119,7 +115,7 @@ public abstract class JsonDynEncoder<T> implements Encodeable<JsonWriter, T> {
if (factory.isConvertDisabled(field)) continue;
ref = factory.findRef(clazz, field);
if (ref != null && ref.ignore()) continue;
if (!(checkMemberType(factory, field.getGenericType(), field.getType()))) return null;
if (!(checkMemberType(factory, clazz, field.getGenericType(), field.getType()))) return null;
String name = convertFieldName(factory, clazz, field);
if (names.contains(name)) continue;
names.add(name);
@@ -139,7 +135,7 @@ public abstract class JsonDynEncoder<T> implements Encodeable<JsonWriter, T> {
if (method.getReturnType() == void.class) continue;
ref = factory.findRef(clazz, method);
if (ref != null && ref.ignore()) continue;
if (!(checkMemberType(factory, method.getGenericReturnType(), method.getReturnType()))) return null;
if (!(checkMemberType(factory, clazz, method.getGenericReturnType(), method.getReturnType()))) return null;
String name = convertFieldName(factory, clazz, method);
if (names.contains(name)) continue;
names.add(name);
@@ -404,8 +400,8 @@ public abstract class JsonDynEncoder<T> implements Encodeable<JsonWriter, T> {
} else {
mv.visitMethodInsn(INVOKEVIRTUAL, valtypeName, ((Method) element1).getName(), "()" + org.redkale.asm.Type.getDescriptor(fieldtype1), false);
}
maxLocals ++;
maxLocals++;
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, fieldname2 + "CommaFieldBytes", "[B");
@@ -415,10 +411,10 @@ public abstract class JsonDynEncoder<T> implements Encodeable<JsonWriter, T> {
} else {
mv.visitMethodInsn(INVOKEVIRTUAL, valtypeName, ((Method) element2).getName(), "()" + org.redkale.asm.Type.getDescriptor(fieldtype2), false);
}
maxLocals ++;
maxLocals++;
mv.visitMethodInsn(INVOKEVIRTUAL, writerName, "writeObjectByOnlyTwoIntFieldValue", "([BI[BI)V", false);
} else if (onlyShotIntLongLatin1MoreFieldObjectFlag && mustHadComma) {
for (AccessibleObject element : members) {
elementIndex++;

View File

@@ -8,11 +8,13 @@ package org.redkale.mq;
import java.io.Serializable;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.Level;
import javax.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.cluster.ClusterAgent;
import org.redkale.net.http.*;
import org.redkale.util.Utility;
@@ -33,27 +35,44 @@ public class HttpMessageClusterClient extends HttpMessageClient {
private static final Set<String> DISALLOWED_HEADERS_SET = Utility.ofSet("connection", "content-length",
"date", "expect", "from", "host", "origin", "referer", "upgrade", "via", "warning");
protected final HttpMessageLocalClient localClient;
protected final ConcurrentHashMap<String, Boolean> topicServletMap = new ConcurrentHashMap<>();
protected ClusterAgent clusterAgent;
@Resource(name = "cluster.httpClient")
protected HttpClient httpClient;
protected java.net.http.HttpClient httpClient;
//protected java.net.http.HttpClient httpClient;
public HttpMessageClusterClient(ClusterAgent clusterAgent) {
@Resource(name = "cluster.httpClient")
protected HttpSimpleClient httpSimpleClient;
public HttpMessageClusterClient(Application application, String resourceName, ClusterAgent clusterAgent) {
super(null);
Objects.requireNonNull(clusterAgent);
this.localClient = new HttpMessageLocalClient(application, resourceName);
this.clusterAgent = clusterAgent;
//this.httpClient = java.net.http.HttpClient.newHttpClient();
this.finest = logger.isLoggable(Level.FINEST);
this.finer = logger.isLoggable(Level.FINER);
this.fine = logger.isLoggable(Level.FINE);
}
@Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return httpAsync(userid, request);
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
return localClient.sendMessage(topic, userid, groupid, request, counter);
} else {
return httpAsync(false, userid, request);
}
}
@Override
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
httpAsync(userid, request);
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
localClient.produceMessage(topic, userid, groupid, request, counter);
} else {
httpAsync(true, userid, request);
}
}
@Override
@@ -62,14 +81,17 @@ public class HttpMessageClusterClient extends HttpMessageClient {
}
private CompletableFuture<HttpResult<byte[]>> mqtpAsync(Serializable userid, HttpSimpleRequest req) {
final boolean finest = logger.isLoggable(Level.FINEST);
String module = req.getRequestURI();
module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/'));
Map<String, String> headers = req.getHeaders();
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
final String localModule = module;
return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> {
if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture();
if (addrmap == null || addrmap.isEmpty()) {
if (fine) logger.log(Level.FINE, "mqtpAsync.broadcastMessage: module=" + localModule + ", resname=" + resname + ", addrmap is empty");
return new HttpResult<byte[]>().status(404).toFuture();
}
final Map<String, String> clientHeaders = new LinkedHashMap<>();
byte[] clientBody = null;
if (req.isRpc()) clientHeaders.put(Rest.REST_HEADER_RPC_NAME, "true");
@@ -96,6 +118,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
if (paramstr != null) clientBody = paramstr.getBytes(StandardCharsets.UTF_8);
}
List<CompletableFuture> futures = new ArrayList<>();
if (finest) logger.log(Level.FINEST, "mqtpAsync: module=" + localModule + ", resname=" + resname + ", addrmap=" + addrmap);
for (Map.Entry<String, Collection<InetSocketAddress>> en : addrmap.entrySet()) {
String realmodule = en.getKey();
Collection<InetSocketAddress> addrs = en.getValue();
@@ -110,15 +133,19 @@ public class HttpMessageClusterClient extends HttpMessageClient {
});
}
private CompletableFuture<HttpResult<byte[]>> httpAsync(Serializable userid, HttpSimpleRequest req) {
final boolean finest = logger.isLoggable(Level.FINEST);
private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) {
String module = req.getRequestURI();
module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/'));
Map<String, String> headers = req.getHeaders();
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
final String localModule = module;
if (finest) logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname);
return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture();
if (addrs == null || addrs.isEmpty()) {
if (fine) logger.log(Level.FINE, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + ": module=" + localModule + ", resname=" + resname + ", addrmap is empty");
return new HttpResult<byte[]>().status(404).toFuture();
}
final Map<String, String> clientHeaders = new LinkedHashMap<>();
byte[] clientBody = null;
if (req.isRpc()) clientHeaders.put(Rest.REST_HEADER_RPC_NAME, "true");
@@ -126,9 +153,14 @@ public class HttpMessageClusterClient extends HttpMessageClient {
if (req.getReqConvertType() != null) clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
if (req.getRespConvertType() != null) clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
if (userid != null) clientHeaders.put(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid);
if (headers != null) headers.forEach((n, v) -> {
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) clientHeaders.put(n, v);
if (headers != null) {
boolean ws = headers.containsKey("Sec-WebSocket-Key");
headers.forEach((n, v) -> {
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())
&& (!ws || (!"Connection".equals(n) && !"Sec-WebSocket-Key".equals(n)
&& !"Sec-WebSocket-Version".equals(n)))) clientHeaders.put(n, v);
});
}
clientHeaders.put("Content-Type", "x-www-form-urlencoded");
if (req.getBody() != null && req.getBody().length > 0) {
String paramstr = req.getParametersToString();
@@ -144,6 +176,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
String paramstr = req.getParametersToString();
if (paramstr != null) clientBody = paramstr.getBytes(StandardCharsets.UTF_8);
}
if (finest) logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture");
return forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), clientHeaders, clientBody, addrs.iterator());
});
}
@@ -152,7 +185,19 @@ public class HttpMessageClusterClient extends HttpMessageClient {
if (!it.hasNext()) return CompletableFuture.completedFuture(null);
InetSocketAddress addr = it.next();
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
return httpClient.postAsync(url, clientHeaders, clientBody);
if (finest) logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders);
if (httpSimpleClient != null) return httpSimpleClient.postAsync(url, clientHeaders, clientBody);
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().uri(URI.create(url))
.timeout(Duration.ofMillis(10_000))
//存在sendHeader后不发送body数据的问题 java.net.http.HttpRequest的bug?
.method("POST", clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody));
if (clientHeaders != null) clientHeaders.forEach((n, v) -> builder.header(n, v));
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray())
.thenApply((java.net.http.HttpResponse<byte[]> resp) -> {
final int rs = resp.statusCode();
if (rs != 200) return new HttpResult<byte[]>().status(rs);
return new HttpResult<byte[]>(resp.body());
});
}
// private CompletableFuture<HttpResult<byte[]>> mqtpAsync(Serializable userid, HttpSimpleRequest req) {

View File

@@ -72,53 +72,31 @@ public class HttpMessageLocalClient extends HttpMessageClient {
return (HttpPrepareServlet) httpServer().getPrepareServlet();
}
protected HttpServlet findHttpServlet(String topic) {
return prepareServlet().findServletByTopic(topic);
}
protected HttpServlet findHttpServlet(HttpSimpleRequest request) {
return prepareServlet().findServletByTopic(generateHttpReqTopic(request, request.getPath()));
}
@Override
public <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) {
HttpPrepareServlet ps = prepareServlet();
String topic = generateHttpReqTopic(request, request.getPath());
HttpServlet servlet = ps.findServletByTopic(topic);
CompletableFuture future = new CompletableFuture();
if (servlet == null) {
future.completeExceptionally(new RuntimeException("404 Not Found " + topic));
return future;
}
HttpRequest req = new HttpMessageLocalRequest(context(), request);
HttpResponse resp = new HttpMessageLocalResponse(req, future);
try {
servlet.execute(req, resp);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
return sendMessage((Serializable) null, (String) null, request, type);
}
@Override
public <T> CompletableFuture<T> sendMessage(Serializable userid, HttpSimpleRequest request, Type type) {
HttpPrepareServlet ps = prepareServlet();
String topic = generateHttpReqTopic(request, request.getPath());
HttpServlet servlet = ps.findServletByTopic(topic);
CompletableFuture future = new CompletableFuture();
if (servlet == null) {
future.completeExceptionally(new RuntimeException("404 Not Found " + topic));
return future;
}
HttpRequest req = new HttpMessageLocalRequest(context(), request);
HttpResponse resp = new HttpMessageLocalResponse(req, future);
try {
servlet.execute(req, resp);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
return sendMessage(userid, (String) null, request, type);
}
@Override
public <T> CompletableFuture<T> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) {
HttpPrepareServlet ps = prepareServlet();
String topic = generateHttpReqTopic(request, request.getPath());
HttpServlet servlet = ps.findServletByTopic(topic);
HttpServlet servlet = findHttpServlet(topic);
CompletableFuture future = new CompletableFuture();
if (servlet == null) {
if (fine) logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet");
future.completeExceptionally(new RuntimeException("404 Not Found " + topic));
return future;
}
@@ -134,9 +112,11 @@ public class HttpMessageLocalClient extends HttpMessageClient {
@Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
HttpPrepareServlet ps = prepareServlet();
HttpServlet servlet = ps.findServletByTopic(topic);
if (servlet == null) return CompletableFuture.completedFuture(new HttpResult().status(404));
HttpServlet servlet = findHttpServlet(topic);
if (servlet == null) {
if (fine) logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet");
return CompletableFuture.completedFuture(new HttpResult().status(404));
}
HttpRequest req = new HttpMessageLocalRequest(context(), request);
CompletableFuture future = new CompletableFuture();
HttpResponse resp = new HttpMessageLocalResponse(req, future);
@@ -160,7 +140,10 @@ public class HttpMessageLocalClient extends HttpMessageClient {
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
HttpPrepareServlet ps = prepareServlet();
HttpServlet servlet = ps.findServletByTopic(topic);
if (servlet == null) return;
if (servlet == null) {
if (fine) logger.log(Level.FINE, "produceMessage: request=" + request + ", not found servlet");
return;
}
HttpRequest req = new HttpMessageLocalRequest(context(), request);
HttpResponse resp = new HttpMessageLocalResponse(req, null);
try {
@@ -200,17 +183,70 @@ public class HttpMessageLocalClient extends HttpMessageClient {
this.future = future;
}
@Override
public void finishJson(final Object obj) {
finish((Convert) null, (Type) null, obj);
}
@Override
public void finishJson(final JsonConvert convert, final Object obj) {
finish(convert, (Type) null, obj);
}
@Override
public void finishJson(final Type type, final Object obj) {
finish((Convert) null, type, obj);
}
@Override
public void finishJson(final JsonConvert convert, final Type type, final Object obj) {
if (future == null) return;
future.complete(obj);
}
@Override
public void finish(Type type, org.redkale.service.RetResult ret) {
finish((Convert) null, type, ret);
}
@Override
public void finish(final Convert convert, Type type, org.redkale.service.RetResult ret) {
if (future == null) return;
future.complete(ret);
}
@Override
public void finish(final Convert convert, final Type type, Object obj) {
if (future == null) return;
future.complete(obj);
}
@Override
public void finish(String obj) {
if (future == null) return;
future.complete(obj == null ? "" : obj);
}
@Override
public void finish304() {
finish(304, null);
}
@Override
public void finish404() {
finish(404, null);
}
@Override
public void finish500() {
finish(500, null);
}
@Override
public void finish504() {
finish(504, null);
}
@Override
public void finish(int status, String msg) {
if (future == null) return;

View File

@@ -86,7 +86,7 @@ public class HttpMessageProcessor implements MessageProcessor {
@Override
public void begin(final int size, long starttime) {
this.starttime = starttime;
this.cdl = new CountDownLatch(size);
this.cdl = size > 1 ? new CountDownLatch(size) : null;
}
@Override

View File

@@ -12,6 +12,7 @@ import java.util.Arrays;
import java.util.function.*;
import java.util.logging.Level;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.*;
import org.redkale.service.RetResult;
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_RESULT;
@@ -71,6 +72,10 @@ public class HttpMessageResponse extends HttpResponse {
finishHttpResult(this.finest, ((HttpMessageRequest) this.request).getRespConvert(), type, this.message, this.callback, this.messageClient, this.producer, message.getResptopic(), result);
}
public void finishHttpResult(Type type, Convert respConvert, HttpResult result) {
finishHttpResult(this.finest, respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, type, this.message, this.callback, this.messageClient, this.producer, message.getResptopic(), result);
}
public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageProducer producer, String resptopic, HttpResult result) {
if (callback != null) callback.run();
if (resptopic == null || resptopic.isEmpty()) return;
@@ -108,6 +113,67 @@ public class HttpMessageResponse extends HttpResponse {
return rs;
}
@Override
public void finishJson(final Object obj) {
finishJson((JsonConvert) null, (Type) null, obj);
}
@Override
public void finishJson(final JsonConvert convert, final Object obj) {
if (message.isEmptyResptopic()) {
if (callback != null) callback.run();
return;
}
finishHttpResult(obj.getClass(), convert, new HttpResult(obj));
}
@Override
public void finishJson(final Type type, final Object obj) {
if (message.isEmptyResptopic()) {
if (callback != null) callback.run();
return;
}
finishHttpResult(type, new HttpResult(obj));
}
@Override
public void finishJson(final JsonConvert convert, final Type type, final Object obj) {
if (message.isEmptyResptopic()) {
if (callback != null) callback.run();
return;
}
finishHttpResult(type, convert, new HttpResult(obj));
}
@Override
public void finish(Type type, org.redkale.service.RetResult ret) {
if (message.isEmptyResptopic()) {
if (callback != null) callback.run();
return;
}
finishHttpResult(type, new HttpResult(ret));
}
@Override
public void finish(final Convert convert, Type type, org.redkale.service.RetResult ret) {
if (message.isEmptyResptopic()) {
if (callback != null) callback.run();
return;
}
finishHttpResult(type, convert, new HttpResult(ret));
}
@Override
public void finish(final Convert convert, final Type type, Object obj) {
if (message.isEmptyResptopic()) {
if (callback != null) callback.run();
return;
}
finishHttpResult(type, convert, new HttpResult(obj));
}
@Override
public void finish(String obj) {
if (message.isEmptyResptopic()) {
@@ -117,11 +183,26 @@ public class HttpMessageResponse extends HttpResponse {
finishHttpResult(String.class, new HttpResult(obj == null ? "" : obj));
}
@Override
public void finish304() {
finish(304, null);
}
@Override
public void finish404() {
finish(404, null);
}
@Override
public void finish500() {
finish(500, null);
}
@Override
public void finish504() {
finish(504, null);
}
@Override
public void finish(int status, String msg) {
if (status > 400) {
@@ -169,6 +250,16 @@ public class HttpMessageResponse extends HttpResponse {
finishHttpResult(null, new HttpResult(rs).contentType(contentType));
}
@Override
protected <A> void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length, Consumer<A> consumer, A attachment) {
if (message.isEmptyResptopic()) {
if (callback != null) callback.run();
return;
}
byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length);
finishHttpResult(null, new HttpResult(rs).contentType(contentType));
}
@Override
public void finish(boolean kill, ByteBuffer buffer) {
if (message.isEmptyResptopic()) {

View File

@@ -8,7 +8,7 @@ package org.redkale.mq;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.logging.*;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.redkale.boot.*;
@@ -165,12 +165,15 @@ public abstract class MessageAgent {
if (this.sncpProducer == null) {
synchronized (sncpProducerLock) {
if (this.sncpProducer == null) {
long s = System.currentTimeMillis();
MessageProducer[] producers = new MessageProducer[producerCount];
for (int i = 0; i < producers.length; i++) {
MessageProducer producer = createProducer("SncpProducer");
producer.startup().join();
producers[i] = producer;
}
long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms");
this.sncpProducer = new MessageProducers(producers);
}
}
@@ -182,12 +185,15 @@ public abstract class MessageAgent {
if (this.httpProducer == null) {
synchronized (httpProducerLock) {
if (this.httpProducer == null) {
long s = System.currentTimeMillis();
MessageProducer[] producers = new MessageProducer[producerCount];
for (int i = 0; i < producers.length; i++) {
MessageProducer producer = createProducer("HttpProducer");
producer.startup().join();
producers[i] = producer;
}
long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms");
this.httpProducer = new MessageProducers(producers);
}
}

View File

@@ -76,18 +76,23 @@ public abstract class MessageClient {
if (node.scheduledFuture != null) node.scheduledFuture.cancel(true);
AtomicLong ncer = node.getCounter();
if (ncer != null) ncer.decrementAndGet();
final long cha = now - msg.createtime;
if (finest) messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.receive (mq.delay = " + cha + "ms, mq.seqid = " + msg.getSeqid() + ")");
node.future.complete(msg);
long cha = now - msg.createtime;
if (cha > 1000 && fine) {
messageAgent.logger.log(Level.FINE, clazzName + ".MessageRespFutureNode.process (mqs.delays = " + cha + "ms, mqs.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
} else if (cha > 50 && finer) {
messageAgent.logger.log(Level.FINER, clazzName + ".MessageRespFutureNode.process (mq.delays = " + cha + "ms, mq.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
long cha2 = System.currentTimeMillis() - now;
if ((cha > 1000 || cha2 > 1000) && fine) {
messageAgent.logger.log(Level.FINE, clazzName + ".MessageRespFutureNode.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms, mqs.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
} else if ((cha > 50 || cha2 > 50) && finer) {
messageAgent.logger.log(Level.FINER, clazzName + ".MessageRespFutureNode.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms, mq.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
} else if (finest) {
messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.process (mq.delay = " + cha + "ms, mq.counter = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms, mq.counter = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
}
};
long ones = System.currentTimeMillis();
MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor);
one.startup().join();
long onee = System.currentTimeMillis() - ones;
if (finest) messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.startup " + onee + "ms ");
this.respConsumer = one;
}
}

View File

@@ -64,7 +64,7 @@ public class SncpMessageProcessor implements MessageProcessor {
@Override
public void begin(final int size, long starttime) {
this.starttime = starttime;
this.cdl = new CountDownLatch(size);
this.cdl = size > 1 ? new CountDownLatch(size) : null;
}
@Override

View File

@@ -28,4 +28,13 @@ public abstract class AsyncThread extends WorkThread {
return t instanceof AsyncThread ? (AsyncThread) t : null;
}
/**
* 是否IO线程
*
* @return boolean
*/
@Override
public final boolean inIO() {
return true;
}
}

View File

@@ -133,7 +133,7 @@ public class Context {
}
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.finish(true);
response.error();
}
});
} else if (workExecutor != null) {
@@ -142,7 +142,7 @@ public class Context {
servlet.execute(request, response);
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.finish(true);
response.error();
}
});
} else {
@@ -150,7 +150,7 @@ public class Context {
servlet.execute(request, response);
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.finish(true);
response.error();
}
}

View File

@@ -221,7 +221,7 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
response.nextEvent();
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.finish(true);
response.error();
}
}

View File

@@ -190,6 +190,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.finish(false);
}
protected void error() {
finish(true);
}
public void finish(boolean kill) {
if (!this.inited) return; //避免重复关闭
//System.println("耗时: " + (System.currentTimeMillis() - request.createtime));

View File

@@ -86,6 +86,16 @@ public class WorkThread extends Thread implements Executor {
return workExecutor;
}
/**
* 是否IO线程
*
* @since 2.6.0
* @return boolean
*/
public boolean inIO() {
return false;
}
public boolean inCurrThread() {
return this == Thread.currentThread();
}

View File

@@ -23,7 +23,7 @@ import org.redkale.util.ByteArray;
*/
public abstract class ClientCodec<R extends ClientRequest, P> {
private final List<ClientResult<P>> results = new ArrayList<>();
protected final List<ClientResult<P>> results = new ArrayList<>();
public ClientCodec() {
}

View File

@@ -153,17 +153,15 @@ public class ClientConnection<R extends ClientRequest, P> implements Consumer<As
request.workThread = null;
}
if (rs.exc != null) {
if (workThread == null || workThread == Thread.currentThread()
|| workThread.getState() == Thread.State.BLOCKED
|| workThread.getState() == Thread.State.WAITING) {
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
respFuture.completeExceptionally(rs.exc);
} else {
workThread.execute(() -> respFuture.completeExceptionally(rs.exc));
}
} else {
if (workThread == null || workThread == Thread.currentThread()
|| workThread.getState() == Thread.State.BLOCKED
|| workThread.getState() == Thread.State.WAITING) {
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
respFuture.complete(rs.result);
} else {
workThread.execute(() -> respFuture.complete(rs.result));

View File

@@ -61,9 +61,8 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
workThread = request.workThread;
request.workThread = null;
}
if (workThread == null || workThread == Thread.currentThread()
|| workThread.getState() == Thread.State.BLOCKED
|| workThread.getState() == Thread.State.WAITING) {
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
this.completeExceptionally(ex);
} else {
workThread.execute(() -> completeExceptionally(ex));

View File

@@ -430,27 +430,27 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param type 指定的RetResult泛型类型
* @param ret RetResult输出对象
*/
@Deprecated //@since 2.5.0
public void finishJson(Type type, org.redkale.service.RetResult ret) {
this.contentType = this.jsonContentType;
if (this.retResultHandler != null) {
ret = this.retResultHandler.apply(this.request, ret);
}
if (this.recycleListener != null) this.output = ret;
if (ret != null && !ret.isSuccess()) {
this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
this.header.addValue("retinfo", ret.getRetinfo());
}
Convert convert = ret == null ? null : ret.convert();
if (convert == null) convert = request.getRespConvert();
if (convert == jsonRootConvert) {
JsonBytesWriter writer = jsonWriter;
convert.convertTo(writer.clear(), type, ret);
finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null);
} else {
convert.convertToBytes(type, ret, convertHandler);
}
}
// @Deprecated //@since 2.5.0
// public void finishJson(Type type, org.redkale.service.RetResult ret) {
// this.contentType = this.jsonContentType;
// if (this.retResultHandler != null) {
// ret = this.retResultHandler.apply(this.request, ret);
// }
// if (this.recycleListener != null) this.output = ret;
// if (ret != null && !ret.isSuccess()) {
// this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
// this.header.addValue("retinfo", ret.getRetinfo());
// }
// Convert convert = ret == null ? null : ret.convert();
// if (convert == null) convert = request.getRespConvert();
// if (convert == jsonRootConvert) {
// JsonBytesWriter writer = jsonWriter;
// convert.convertTo(writer.clear(), type, ret);
// finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null);
// } else {
// convert.convertToBytes(type, ret, convertHandler);
// }
// }
/**
* 将RetResult对象以JSON格式输出
@@ -459,25 +459,25 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param type 指定的RetResult泛型类型
* @param ret RetResult输出对象
*/
@Deprecated //@since 2.5.0
public void finishJson(final JsonConvert convert, Type type, org.redkale.service.RetResult ret) {
this.contentType = this.jsonContentType;
if (this.retResultHandler != null) {
ret = this.retResultHandler.apply(this.request, ret);
}
if (this.recycleListener != null) this.output = ret;
if (ret != null && !ret.isSuccess()) {
this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
this.header.addValue("retinfo", ret.getRetinfo());
}
if (convert == jsonRootConvert) {
JsonBytesWriter writer = jsonWriter;
convert.convertTo(writer.clear(), type, ret);
finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null);
} else {
convert.convertToBytes(type, ret, convertHandler);
}
}
// @Deprecated //@since 2.5.0
// public void finishJson(final JsonConvert convert, Type type, org.redkale.service.RetResult ret) {
// this.contentType = this.jsonContentType;
// if (this.retResultHandler != null) {
// ret = this.retResultHandler.apply(this.request, ret);
// }
// if (this.recycleListener != null) this.output = ret;
// if (ret != null && !ret.isSuccess()) {
// this.header.addValue("retcode", String.valueOf(ret.getRetcode()));
// this.header.addValue("retinfo", ret.getRetinfo());
// }
// if (convert == jsonRootConvert) {
// JsonBytesWriter writer = jsonWriter;
// convert.convertTo(writer.clear(), type, ret);
// finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null);
// } else {
// convert.convertToBytes(type, ret, convertHandler);
// }
// }
/**
* 将CompletableFuture的结果对象以JSON格式输出
@@ -486,11 +486,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param valueType 指定CompletableFuture.value的泛型类型
* @param future 输出对象的句柄
*/
@Deprecated //@since 2.5.0
@SuppressWarnings("unchecked")
public void finishJson(final JsonConvert convert, final Type valueType, final CompletableFuture future) {
finish(convert, valueType, future);
}
// @Deprecated //@since 2.5.0
// @SuppressWarnings("unchecked")
// public void finishJson(final JsonConvert convert, final Type valueType, final CompletableFuture future) {
// finish(convert, valueType, future);
// }
/**
* 将RetResult对象输出
@@ -932,7 +932,12 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
// }
// }
}
@Override
protected void error() {
finish500();
}
/**
* 以304状态码输出
*/

View File

@@ -20,7 +20,7 @@ import org.redkale.util.*;
* 1使用HTTPS<br>
* 2上传下载文件<br>
* 3返回超大响应包<br>
* 类似JDK11的 java.net.http.HttpClient <br>
* 类似JDK11的 java.net.http.HttpSimpleClient <br>
*
* <p>
* 详情见: https://redkale.org
@@ -29,7 +29,7 @@ import org.redkale.util.*;
* @since 2.3.0
*
*/
public class HttpClient {
public class HttpSimpleClient {
protected final AsyncGroup asyncGroup;
@@ -37,12 +37,12 @@ public class HttpClient {
protected int writeTimeoutSeconds = 6;
protected HttpClient(AsyncGroup asyncGroup) {
protected HttpSimpleClient(AsyncGroup asyncGroup) {
this.asyncGroup = asyncGroup;
}
public static HttpClient create(AsyncGroup asyncGroup) {
return new HttpClient(asyncGroup);
public static HttpSimpleClient create(AsyncGroup asyncGroup) {
return new HttpSimpleClient(asyncGroup);
}
public CompletableFuture<HttpResult<byte[]>> getAsync(String url) {
@@ -103,7 +103,7 @@ public class HttpClient {
+ "Host: " + uri.getHost() + "\r\n"
+ "Content-Length: " + (body == null ? 0 : body.length) + "\r\n").getBytes(StandardCharsets.UTF_8));
if (headers == null || !headers.containsKey("User-Agent")) {
array.put(("User-Agent: redkale-httpclient/" + Redkale.getDotedVersion() + "\r\n").getBytes(StandardCharsets.UTF_8));
array.put(("User-Agent: Redkale-http-client/" + Redkale.getDotedVersion() + "\r\n").getBytes(StandardCharsets.UTF_8));
}
if (headers == null || !headers.containsKey("Connection")) {
array.put(("Connection: close\r\n").getBytes(StandardCharsets.UTF_8));
@@ -115,7 +115,6 @@ public class HttpClient {
}
array.put((byte) '\r', (byte) '\n');
if (body != null) array.put(body);
System.out.println(array);
final CompletableFuture<HttpResult<byte[]>> future = new CompletableFuture();
conn.write(array, new CompletionHandler<Integer, Void>() {
@Override
@@ -137,7 +136,7 @@ public class HttpClient {
// final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
// asyncGroup.start();
// String url = "http://redkale.org";
// HttpClient client = HttpClient.create(asyncGroup);
// HttpSimpleClient client = HttpSimpleClient.create(asyncGroup);
// System.out.println(client.getAsync(url).join());
// }
@@ -211,7 +210,9 @@ public class HttpClient {
if (buffer.hasRemaining()) array.put(buffer, buffer.remaining());
this.readState = READ_STATE_END;
}
this.responseResult.setResult(array.getBytes());
if (responseResult.getStatus() <= 200) {
this.responseResult.setResult(array.getBytes());
}
this.future.complete(this.responseResult);
conn.offerBuffer(buffer);
conn.dispose();

View File

@@ -321,11 +321,13 @@ public final class Rest {
}
final Map<String, List<String>> asmParamMap = namePresent ? null : MethodParamClassVisitor.getMethodParamNames(new HashMap<>(), webSocketType);
final Set<String> messageNames = new HashSet<>();
final List<Method> messageMethods = new ArrayList<>();
Method wildcardMethod = null;
List<Method> mmethods = new ArrayList<>();
for (Method method : webSocketType.getMethods()) {
RestOnMessage rom = method.getAnnotation(RestOnMessage.class);
if (rom == null) continue;
String name = rom.name();
if (!"*".equals(name) && !checkName(name)) throw new RuntimeException("@RestOnMessage.name contains illegal characters on (" + method + ")");
if (Modifier.isFinal(method.getModifiers())) throw new RuntimeException("@RestOnMessage method can not final but (" + method + ")");
if (Modifier.isStatic(method.getModifiers())) throw new RuntimeException("@RestOnMessage method can not static but (" + method + ")");
if (method.getReturnType() != void.class) throw new RuntimeException("@RestOnMessage method must return void but (" + method + ")");
@@ -333,8 +335,16 @@ public final class Rest {
if (name.isEmpty()) throw new RuntimeException(method + " RestOnMessage.name is empty createRestWebSocketServlet");
if (messageNames.contains(name)) throw new RuntimeException(method + " repeat RestOnMessage.name(" + name + ") createRestWebSocketServlet");
messageNames.add(name);
messageMethods.add(method);
if ("*".equals(name)) {
wildcardMethod = method;
} else {
mmethods.add(method);
}
}
final List<Method> messageMethods = new ArrayList<>();
messageMethods.addAll(mmethods);
//wildcardMethod 必须放最后, _DynRestOnMessageConsumer 是按messageMethods顺序来判断的
if (wildcardMethod != null) messageMethods.add(wildcardMethod);
//----------------------------------------------------------------------------------------
final String resDesc = Type.getDescriptor(Resource.class);
final String wsDesc = Type.getDescriptor(WebSocket.class);
@@ -364,7 +374,8 @@ public final class Rest {
for (int i = 0; i < messageMethods.size(); i++) { // _DyncXXXWebSocketMessage 子消息List
Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
msgclassToAnnotations.put(newDynMessageFullName + endfix, method.getAnnotations());
String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix);
msgclassToAnnotations.put(newDynSuperMessageFullName, method.getAnnotations());
}
clz.getField("_redkale_annotations").set(null, msgclassToAnnotations);
if (rws.cryptor() != Cryptor.class) {
@@ -436,7 +447,8 @@ public final class Rest {
for (int i = 0; i < messageMethods.size(); i++) {
Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
cw.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix);
cw.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
}
}
{ //@Resource
@@ -463,7 +475,7 @@ public final class Rest {
mv.visitMethodInsn(INVOKESPECIAL, supDynName, "<init>", "()V", false);
mv.visitVarInsn(ALOAD, 0);
mv.visitLdcInsn(Type.getObjectType(newDynName + "$" + newDynWebSokcetSimpleName + "Message"));
mv.visitFieldInsn(PUTFIELD, newDynName, "messageTextType", "Ljava/lang/reflect/Type;");
mv.visitFieldInsn(PUTFIELD, newDynName, "messageRestType", "Ljava/lang/reflect/Type;");
mv.visitVarInsn(ALOAD, 0);
MethodDebugVisitor.pushInt(mv, rws.liveinterval());
@@ -526,13 +538,14 @@ public final class Rest {
RestClassLoader newLoader = new RestClassLoader(loader);
Map<String, Annotation[]> msgclassToAnnotations = new HashMap<>();
for (int i = 0; i < messageMethods.size(); i++) { // _DyncXXXWebSocketMessage 子消息List
Method method = messageMethods.get(i);
final Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
msgclassToAnnotations.put(newDynMessageFullName + endfix, method.getAnnotations());
String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix);
msgclassToAnnotations.put(newDynSuperMessageFullName, method.getAnnotations());
ClassWriter cw2 = new ClassWriter(COMPUTE_FRAMES);
cw2.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynMessageFullName + endfix, null, "java/lang/Object", new String[]{webSocketParamName, "java/lang/Runnable"});
cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
cw2.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynSuperMessageFullName, null, "java/lang/Object", new String[]{webSocketParamName, "java/lang/Runnable"});
cw2.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
Set<String> paramnames = new HashSet<>();
String methodesc = method.getName() + ":" + Type.getMethodDescriptor(method);
List<String> names = asmParamMap == null ? null : asmParamMap.get(methodesc);
@@ -555,6 +568,17 @@ public final class Rest {
param.getType() == param.getParameterizedType() ? null : Utility.getTypeDescriptor(param.getParameterizedType()), null);
fv.visitEnd();
}
if (method == wildcardMethod) {
for (int j = 0; j < messageMethods.size(); j++) {
Method method2 = messageMethods.get(j);
if (method2 == wildcardMethod) continue;
String endfix2 = "_" + method2.getName() + "_" + (j > 9 ? j : ("0" + j));
String newDynSuperMessageFullName2 = newDynMessageFullName + (method2 == wildcardMethod ? "" : endfix2);
cw2.visitInnerClass(newDynSuperMessageFullName2, newDynName, newDynMessageSimpleName + endfix2, ACC_PUBLIC + ACC_STATIC);
fv = cw2.visitField(ACC_PUBLIC, method2.getAnnotation(RestOnMessage.class).name(), "L" + newDynSuperMessageFullName2 + ";", null, null);
fv.visitEnd();
}
}
{ //_redkale_websocket
fv = cw2.visitField(ACC_PUBLIC, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";", null, null);
av0 = fv.visitAnnotation(convertDisabledDesc, true);
@@ -571,6 +595,8 @@ public final class Rest {
}
{ //getNames
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "getNames", "()[Ljava/lang/String;", null, null));
av0 = mv.visitAnnotation(convertDisabledDesc, true);
av0.visitEnd();
MethodDebugVisitor.pushInt(mv, paramap.size());
mv.visitTypeInsn(ANEWARRAY, "java/lang/String");
int index = -1;
@@ -594,7 +620,7 @@ public final class Rest {
Label l1 = new Label();
mv.visitJumpInsn(IFEQ, l1);
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynMessageFullName + endfix, en.getKey(), Type.getDescriptor(paramType));
mv.visitFieldInsn(GETFIELD, newDynSuperMessageFullName, en.getKey(), Type.getDescriptor(paramType));
if (paramType.isPrimitive()) {
Class bigclaz = java.lang.reflect.Array.get(java.lang.reflect.Array.newInstance(paramType, 1), 0).getClass();
mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf", "(" + Type.getDescriptor(paramType) + ")" + Type.getDescriptor(bigclaz), false);
@@ -609,8 +635,10 @@ public final class Rest {
}
{ //getAnnotations
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "getAnnotations", "()[Ljava/lang/annotation/Annotation;", null, null));
av0 = mv.visitAnnotation(convertDisabledDesc, true);
av0.visitEnd();
mv.visitFieldInsn(GETSTATIC, newDynName, "_redkale_annotations", "Ljava/util/Map;");
mv.visitLdcInsn(newDynMessageFullName + endfix);
mv.visitLdcInsn(newDynSuperMessageFullName);
mv.visitMethodInsn(INVOKEINTERFACE, "java/util/Map", "get", "(Ljava/lang/Object;)Ljava/lang/Object;", true);
mv.visitTypeInsn(CHECKCAST, "[Ljava/lang/annotation/Annotation;");
mv.visitVarInsn(ASTORE, 1);
@@ -635,7 +663,7 @@ public final class Rest {
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(L" + newDynWebSokcetFullName + ";)V", null, null));
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
mv.visitFieldInsn(PUTFIELD, newDynMessageFullName + endfix, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";");
mv.visitFieldInsn(PUTFIELD, newDynSuperMessageFullName, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";");
mv.visitVarInsn(ALOAD, 1);
mv.visitLdcInsn(method.getAnnotation(RestOnMessage.class).name());
mv.visitVarInsn(ALOAD, 0);
@@ -648,11 +676,11 @@ public final class Rest {
{ //run
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "run", "()V", null, null));
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynMessageFullName + endfix, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";");
mv.visitFieldInsn(GETFIELD, newDynSuperMessageFullName, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";");
for (Map.Entry<String, Parameter> en : paramap.entrySet()) {
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, (newDynMessageFullName + endfix), en.getKey(), Type.getDescriptor(en.getValue().getType()));
mv.visitFieldInsn(GETFIELD, (newDynSuperMessageFullName), en.getKey(), Type.getDescriptor(en.getValue().getType()));
}
mv.visitMethodInsn(INVOKEVIRTUAL, newDynWebSokcetFullName, method.getName(), Type.getMethodDescriptor(method), false);
@@ -671,11 +699,11 @@ public final class Rest {
}
cw2.visitEnd();
byte[] bytes = cw2.toByteArray();
Class cz = newLoader.loadClass((newDynMessageFullName + endfix).replace('/', '.'), bytes);
RedkaleClassLoader.putDynClass((newDynMessageFullName + endfix).replace('/', '.'), bytes, cz);
Class cz = newLoader.loadClass((newDynSuperMessageFullName).replace('/', '.'), bytes);
RedkaleClassLoader.putDynClass((newDynSuperMessageFullName).replace('/', '.'), bytes, cz);
}
{ //_DynXXXWebSocketMessage class
if (wildcardMethod == null) { //_DynXXXWebSocketMessage class
ClassWriter cw2 = new ClassWriter(COMPUTE_FRAMES);
cw2.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynMessageFullName, null, "java/lang/Object", null);
@@ -684,9 +712,10 @@ public final class Rest {
for (int i = 0; i < messageMethods.size(); i++) {
Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix);
cw2.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
fv = cw2.visitField(ACC_PUBLIC, method.getAnnotation(RestOnMessage.class).name(), "L" + newDynMessageFullName + endfix + ";", null, null);
fv = cw2.visitField(ACC_PUBLIC, method.getAnnotation(RestOnMessage.class).name(), "L" + newDynSuperMessageFullName + ";", null, null);
fv.visitEnd();
}
{ //构造函数
@@ -751,7 +780,8 @@ public final class Rest {
for (int i = 0; i < messageMethods.size(); i++) {
Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix);
cw2.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
}
{ //构造函数
@@ -776,19 +806,25 @@ public final class Rest {
for (int i = 0; i < messageMethods.size(); i++) {
final Method method = messageMethods.get(i);
String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i));
String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix);
final String messagename = method.getAnnotation(RestOnMessage.class).name();
if (method == wildcardMethod) {
mv.visitVarInsn(ALOAD, 4);
mv.visitVarInsn(ALOAD, 3);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynSuperMessageFullName, "execute", "(L" + newDynWebSokcetFullName + ";)V", false);
} else {
mv.visitVarInsn(ALOAD, 4);
mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + newDynSuperMessageFullName + ";");
Label ifLabel = new Label();
mv.visitJumpInsn(IFNULL, ifLabel);
mv.visitVarInsn(ALOAD, 4);
mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + (newDynMessageFullName + endfix) + ";");
Label ifLabel = new Label();
mv.visitJumpInsn(IFNULL, ifLabel);
mv.visitVarInsn(ALOAD, 4);
mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + (newDynMessageFullName + endfix) + ";");
mv.visitVarInsn(ALOAD, 3);
mv.visitMethodInsn(INVOKEVIRTUAL, (newDynMessageFullName + endfix), "execute", "(L" + newDynWebSokcetFullName + ";)V", false);
mv.visitInsn(RETURN);
mv.visitLabel(ifLabel);
mv.visitVarInsn(ALOAD, 4);
mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + newDynSuperMessageFullName + ";");
mv.visitVarInsn(ALOAD, 3);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynSuperMessageFullName, "execute", "(L" + newDynWebSokcetFullName + ";)V", false);
mv.visitInsn(RETURN);
mv.visitLabel(ifLabel);
}
}
mv.visitInsn(RETURN);
mv.visitMaxs(3, 3 + messageMethods.size());
@@ -2380,7 +2416,7 @@ public final class Rest {
MethodDebugVisitor.pushInt(mv, entry.methodidx);//方法下标
mv.visitInsn(AALOAD);
mv.visitMethodInsn(INVOKESTATIC, retInternalName, "success", "()" + retDesc, false);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + typeDesc + retDesc + ")V", false);
mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + typeDesc + "Ljava/lang/Object;)V", false);
mv.visitInsn(RETURN);
} else if (returnType == boolean.class) {
mv.visitVarInsn(ISTORE, maxLocals);
@@ -2905,7 +2941,7 @@ public final class Rest {
return t;
}
private static boolean checkName(String name) { //不能含特殊字符
private static boolean checkName(String name) { //只能是字母、数字和下划线,且不能以数字开头
if (name.isEmpty()) return true;
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') return false;
for (char ch : name.toCharArray()) {
@@ -2916,7 +2952,7 @@ public final class Rest {
return true;
}
private static boolean checkName2(String name) { //不能含特殊字符
private static boolean checkName2(String name) { //只能是字母、数字、短横、点和下划线,且不能以数字开头
if (name.isEmpty()) return true;
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') return false;
for (char ch : name.toCharArray()) {

View File

@@ -29,7 +29,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
public @interface RestOnMessage {
/**
* 请求的方法名, 不能含特殊字符,不能以数字开头(能作为变量名)
* 请求的方法名, 不能含特殊字符,不能以数字开头(能作为变量名) <br>
* 值"*" 表示参数中不包含方法名
*
* @return String
*/

View File

@@ -106,7 +106,7 @@ public abstract class WebSocket<G extends Serializable, T> {
Convert _sendConvert; //不可能为空
java.lang.reflect.Type _messageTextType; //不可能为空
java.lang.reflect.Type _messageRestType; //不可能为空
Deflater deflater; //压缩

View File

@@ -0,0 +1,163 @@
package org.redkale.net.http;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import static org.redkale.net.http.WebSocketNode.WS_SOURCE_KEY_USERID_PREFIX;
import org.redkale.service.*;
import org.redkale.util.*;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@AutoLoad(false)
@ResourceType(WebSocketNode.class)
public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public void init(AnyValue conf) {
super.init(conf);
}
@Override
public void destroy(AnyValue conf) {
super.destroy(conf);
}
public final void setName(String name) {
this.name = name;
}
@Override
public CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid);
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
final List<String> rs = new ArrayList<>();
this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
return CompletableFuture.completedFuture(rs);
}
@Override
public CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.sendLocalMessage(message, last, userids);
}
@Override
public CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastLocalMessage(wsrange, message, last);
}
@Override
public CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.sendLocalAction(action, userids);
}
@Override
public CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastLocalAction(action);
}
@Override
public CompletableFuture<Integer> getUserSize(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
if (this.localEngine == null) return CompletableFuture.completedFuture(0);
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
/**
* 当用户连接到节点需要更新到CacheSource
*
* @param userid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> connect(Serializable userid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr);
return future;
}
/**
* 当用户从一个节点断掉了所有的连接需要从CacheSource中删除
*
* @param userid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> disconnect(Serializable userid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Integer> future = source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr);
return future.thenApply(v -> null);
}
/**
* 更改用户ID需要更新到CacheSource
*
* @param olduserid Serializable
* @param newuserid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr);
future = future.thenAccept((a) -> source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr);
return future;
}
/**
* 判断用户是否有WebSocket
*
* @param userid Serializable
* @param topic RpcTargetTopic
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
}
/**
* 强制关闭用户的WebSocket
*
* @param userid Serializable
* @param topic RpcTargetTopic
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
//不能从sncpNodeAddresses中移除因为engine.forceCloseWebSocket 会调用到disconnect
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(0);
return CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid));
}
}

View File

@@ -297,7 +297,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
try {
Convert convert = webSocket.getTextConvert();
if (restMessageConsumer != null) { //主要供RestWebSocket使用
restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageTextType, packet.getPayload()));
restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload()));
} else {
webSocket.onMessage(packet.getPayload() == null ? null : new String(packet.getPayload(), StandardCharsets.UTF_8), packet.last);
}
@@ -308,7 +308,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
try {
Convert convert = webSocket.getBinaryConvert();
if (restMessageConsumer != null) { //主要供RestWebSocket使用
restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageTextType, packet.getPayload()));
restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload()));
} else {
webSocket.onMessage(packet.getPayload(), packet.last);
}

View File

@@ -20,8 +20,7 @@ import javax.annotation.*;
import org.redkale.boot.Application;
import org.redkale.convert.Convert;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Cryptor;
import org.redkale.service.*;
import org.redkale.net.*;
import org.redkale.util.*;
/**
@@ -74,7 +73,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
private final BiConsumer<WebSocket, Object> restMessageConsumer = createRestOnMessageConsumer();
protected Type messageTextType; //RestWebSocket时会被修改
protected Type messageRestType; //RestWebSocket时会被修改
//同RestWebSocket.single
protected boolean single = true; //是否单用户单连接
@@ -138,7 +137,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} catch (Exception e) {
logger.warning(this.getClass().getName() + " not designate text message type on createWebSocket Method");
}
this.messageTextType = msgtype;
this.messageRestType = msgtype;
}
@Override
@@ -214,7 +213,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine;
webSocket._channel = response.getChannel();
webSocket._messageTextType = this.messageTextType;
webSocket._messageRestType = this.messageRestType;
webSocket._textConvert = textConvert;
webSocket._binaryConvert = binaryConvert;
webSocket._sendConvert = sendConvert;
@@ -232,7 +231,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.finish(true);
return;
}
sessionFuture.whenComplete((sessionid, ex) -> {
BiConsumer<String, Throwable> sessionConsumer = (sessionid, ex) -> {
if ((sessionid == null && webSocket.delayPackets == null) || ex != null) {
if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex);
response.finish(true);
@@ -360,6 +359,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.finish(true);
}
});
};
WorkThread workThread = WorkThread.currWorkThread();
sessionFuture.whenComplete((sessionid, ex) -> {
if (workThread == null || workThread == Thread.currentThread()) {
sessionConsumer.accept(sessionid, ex);
} else {
workThread.execute(() -> sessionConsumer.accept(sessionid, ex));
}
});
}

View File

@@ -21,10 +21,21 @@ public abstract class AbstractService implements Service {
@Resource(name = Application.RESNAME_APP_EXECUTOR)
private ExecutorService workExecutor;
/**
* 当前Service类的原始Service类型 由于Service会动态重载所以getClass()得到的不是原始Service类型
*
* @return Class
*/
protected Class serviceType() {
return Sncp.getServiceType(this);
}
/**
* 异步执行任务
*
*
* @param command 任务
*/
protected void runAsync(Runnable command) {
if (workExecutor != null) {
workExecutor.execute(command);
@@ -38,6 +49,12 @@ public abstract class AbstractService implements Service {
}
}
/**
* 异步执行任务
*
* @param hash hash值
* @param command 任务
*/
protected void runAsync(int hash, Runnable command) {
if (workExecutor != null) {
if (workExecutor instanceof ThreadHashExecutor) {
@@ -60,6 +77,11 @@ public abstract class AbstractService implements Service {
}
}
/**
* 获取线程池
*
* @return ExecutorService
*/
protected ExecutorService getExecutor() {
if (workExecutor != null) return workExecutor;
Thread thread = Thread.currentThread();

View File

@@ -5,162 +5,21 @@
*/
package org.redkale.service;
import static org.redkale.net.http.WebSocket.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import org.redkale.net.http.*;
import org.redkale.util.*;
/**
* 由 org.redkale.net.http.WebSocketNodeService 代替
*
* <p>
* 详情见: https://redkale.org
*
* @deprecated 2.6.0
* @author zhangjx
*/
@Deprecated
@AutoLoad(false)
@ResourceType(WebSocketNode.class)
public class WebSocketNodeService extends WebSocketNode implements Service {
public class WebSocketNodeService extends org.redkale.net.http.WebSocketNodeService {
@Override
public void init(AnyValue conf) {
super.init(conf);
}
@Override
public void destroy(AnyValue conf) {
super.destroy(conf);
}
public final void setName(String name) {
this.name = name;
}
@Override
public CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid);
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
final List<String> rs = new ArrayList<>();
this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
return CompletableFuture.completedFuture(rs);
}
@Override
public CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.sendLocalMessage(message, last, userids);
}
@Override
public CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastLocalMessage(wsrange, message, last);
}
@Override
public CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.sendLocalAction(action, userids);
}
@Override
public CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastLocalAction(action);
}
@Override
public CompletableFuture<Integer> getUserSize(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
if (this.localEngine == null) return CompletableFuture.completedFuture(0);
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
/**
* 当用户连接到节点需要更新到CacheSource
*
* @param userid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> connect(Serializable userid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr);
return future;
}
/**
* 当用户从一个节点断掉了所有的连接需要从CacheSource中删除
*
* @param userid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> disconnect(Serializable userid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Integer> future = source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr);
return future.thenApply(v -> null);
}
/**
* 更改用户ID需要更新到CacheSource
*
* @param olduserid Serializable
* @param newuserid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr);
future = future.thenAccept((a) -> source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr);
return future;
}
/**
* 判断用户是否有WebSocket
*
* @param userid Serializable
* @param topic RpcTargetTopic
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
}
/**
* 强制关闭用户的WebSocket
*
* @param userid Serializable
* @param topic RpcTargetTopic
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
//不能从sncpNodeAddresses中移除因为engine.forceCloseWebSocket 会调用到disconnect
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(0);
return CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid));
}
}

View File

@@ -18,7 +18,7 @@ import org.redkale.util.*;
* DataSource的S抽象实现类 <br>
* 注意: 所有的操作只能作用在一张表上,不能同时变更多张表
*
* <p>
*
* 详情见: https://redkale.org
*
* @author zhangjx
@@ -30,39 +30,121 @@ import org.redkale.util.*;
@ResourceType(DataSource.class)
public abstract class AbstractDataSource extends AbstractService implements DataSource, AutoCloseable, Resourcable {
/**
* 是否虚拟化的持久对象
*
* @param info EntityInfo
*
* @return boolean
*/
protected boolean isOnlyCache(EntityInfo info) {
return info.isVirtualEntity();
}
/**
* 是否可以使用缓存,一般包含关联查询就不使用缓存
*
* @param node 过滤条件
* @param entityApplyer 函数
*
* @return boolean
*/
protected boolean isCacheUseable(FilterNode node, Function<Class, EntityInfo> entityApplyer) {
return node.isCacheUseable(entityApplyer);
}
/**
* 生成过滤函数
*
* @param <T> 泛型
* @param <E> 泛型
* @param node 过滤条件
* @param cache 缓存
*
* @return Predicate
*/
protected <T, E> Predicate<T> createPredicate(FilterNode node, EntityCache<T> cache) {
return node.createPredicate(cache);
}
/**
* 根据ResultSet获取对象
*
* @param <T> 泛型
* @param info EntityInfo
* @param sels 过滤字段
* @param row ResultSet
*
* @return 对象
*/
protected <T> T getEntityValue(EntityInfo<T> info, final SelectColumn sels, final EntityInfo.DataResultSetRow row) {
return sels == null ? info.getFullEntityValue(row) : info.getEntityValue(sels, row);
}
/**
* 根据ResultSet获取对象
*
* @param <T> 泛型
* @param info EntityInfo
* @param constructorAttrs 构造函数字段
* @param unconstructorAttrs 非构造函数字段
* @param row ResultSet
*
* @return 对象
*/
protected <T> T getEntityValue(EntityInfo<T> info, final Attribute<T, Serializable>[] constructorAttrs, final Attribute<T, Serializable>[] unconstructorAttrs, final EntityInfo.DataResultSetRow row) {
return info.getEntityValue(constructorAttrs, unconstructorAttrs, row);
}
/**
* 根据翻页参数构建排序SQL
*
* @param <T> 泛型
* @param info EntityInfo
* @param flipper 翻页参数
*
* @return SQL
*/
protected <T> String createSQLOrderby(EntityInfo<T> info, Flipper flipper) {
return info.createSQLOrderby(flipper);
}
/**
* 根据过滤条件生成关联表与别名的映射关系
*
* @param node 过滤条件
*
* @return Map
*/
protected Map<Class, String> getJoinTabalis(FilterNode node) {
return node == null ? null : node.getJoinTabalis();
}
/**
* 加载指定类的EntityInfo
*
* @param <T> 泛型
* @param clazz 类
* @param cacheForbidden 是否屏蔽缓存
* @param props 配置信息
* @param fullloader 加载器
*
* @return EntityInfo
*/
protected <T> EntityInfo<T> loadEntityInfo(Class<T> clazz, final boolean cacheForbidden, final Properties props, BiFunction<DataSource, EntityInfo, CompletableFuture<List>> fullloader) {
return EntityInfo.load(clazz, cacheForbidden, props, this, fullloader);
}
//检查对象是否都是同一个Entity类
/**
* 检查对象是否都是同一个Entity类
*
* @param <T> 泛型
* @param action 操作
* @param async 是否异步
* @param entitys 对象集合
*
* @return CompletableFuture
*/
protected <T> CompletableFuture checkEntity(String action, boolean async, T... entitys) {
if (entitys.length < 1) return null;
Class clazz = null;
@@ -487,7 +569,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
/**
* 根据指定参数查询对象某个字段的集合
* <p>
*
* @param <T> Entity类的泛型
* @param <V> 字段值的类型
* @param selectedColumn 字段名

View File

@@ -7,6 +7,7 @@ package org.redkale.source;
import java.io.Serializable;
import java.math.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Attribute;
@@ -141,6 +142,14 @@ public interface DataResultSet extends EntityInfo.DataResultSetRow {
o = new BigInteger(o.toString());
}
}
} else if (t == String.class) {
if (o == null) {
o = "";
} else if (o instanceof byte[]) {
o = new String((byte[]) o, StandardCharsets.UTF_8);
} else {
o = o.toString();
}
} else if (o != null && !t.isAssignableFrom(o.getClass()) && o instanceof CharSequence) {
o = ((CharSequence) o).length() == 0 ? null : JsonConvert.root().convertFrom(attr.genericType(), o.toString());
}

View File

@@ -300,17 +300,32 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
}
public static FilterNode create(String column, Serializable value) {
return create(column, null, value);
return filter(column, null, value);
}
public static FilterNode create(String column, FilterExpress express, Serializable value) {
return create(column, express, true, value);
return filter(column, express, true, value);
}
public static FilterNode create(String column, FilterExpress express, boolean itemand, Serializable value) {
return new FilterNode(column, express, itemand, value);
}
//@since 2.6.0 create不利于import static
public static FilterNode filter(String column, Serializable value) {
return filter(column, null, value);
}
//@since 2.6.0 create不利于import static
public static FilterNode filter(String column, FilterExpress express, Serializable value) {
return filter(column, express, true, value);
}
//@since 2.6.0 create不利于import static
public static FilterNode filter(String column, FilterExpress express, boolean itemand, Serializable value) {
return new FilterNode(column, express, itemand, value);
}
private boolean needSplit(final Object val0) {
return needSplit(express, val0);
}

View File

@@ -135,7 +135,13 @@ public abstract class AnyValue {
return rs;
}
//去重
/**
* 合并两个AnyValue对象 会去重, 没有的才增加
*
* @param av AnyValue
*
* @return DefaultAnyValue
*/
public DefaultAnyValue addAllStringSet(final AnyValue av) {
if (av == null) return this;
final Entry<String>[] strings = av.getStringEntrys();
@@ -146,6 +152,13 @@ public abstract class AnyValue {
return this;
}
/**
* 合并两个AnyValue对象 不去重
*
* @param av AnyValue
*
* @return DefaultAnyValue
*/
public DefaultAnyValue addAll(final AnyValue av) {
if (av == null) return this;
if (av instanceof DefaultAnyValue) {
@@ -177,6 +190,13 @@ public abstract class AnyValue {
return this;
}
/**
* 合并两个AnyValue对象 会去重
*
* @param av AnyValue
*
* @return DefaultAnyValue
*/
public DefaultAnyValue setAll(final AnyValue av) {
if (av == null) return this;
if (av instanceof DefaultAnyValue) {
@@ -401,8 +421,16 @@ public abstract class AnyValue {
}
/**
* 字段名和值的组合对象
*
* @param <T> 泛型
*/
public static final class Entry<T> {
/**
* 字段名
*/
public final String name;
T value;
@@ -413,10 +441,20 @@ public abstract class AnyValue {
this.value = value0;
}
/**
* 获取字段名
*
* @return 字段名
*/
public String getName() {
return name;
}
/**
* 获取字段值
*
* @return 字段值
*/
public T getValue() {
return value;
}
@@ -506,34 +544,99 @@ public abstract class AnyValue {
}
}
/**
* 创建DefaultAnyValue
*
* @return DefaultAnyValue
*/
public static DefaultAnyValue create() {
return new DefaultAnyValue();
}
/**
* 文本内容转换成AnyValue对象
*
* @param text 文本内容
*
* @return AnyValue
* @throws IOException 异常
*/
public static AnyValue loadFromXml(String text) throws IOException {
return new XmlReader(text).read();
}
/**
* 内容流转换成AnyValue对象
*
* @param in 内容流
*
* @return AnyValue
* @throws IOException 异常
*/
public static AnyValue loadFromXml(InputStream in) throws IOException {
return loadFromXml(in, StandardCharsets.UTF_8);
}
/**
* 内容流转换成AnyValue对象
*
* @param in 内容流
* @param charset 字符编码
*
* @return AnyValue
* @throws IOException 异常
*/
public static AnyValue loadFromXml(InputStream in, Charset charset) throws IOException {
return new XmlReader(Utility.read(in, charset)).read();
}
/**
* 内容流转换成AnyValue对象
*
* @param text 文本内容
* @param attrFunc 字段回调函数
*
* @return AnyValue
* @throws IOException 异常
*/
public static AnyValue loadFromXml(String text, BiFunction<String, String, String> attrFunc) throws IOException {
return new XmlReader(text).attrFunc(attrFunc).read();
}
/**
* 内容流转换成AnyValue对象
*
* @param in 内容流
* @param attrFunc 字段回调函数
*
* @return AnyValue
* @throws IOException 异常
*/
public static AnyValue loadFromXml(InputStream in, BiFunction<String, String, String> attrFunc) throws IOException {
return loadFromXml(in, StandardCharsets.UTF_8, attrFunc);
}
/**
* 内容流转换成AnyValue对象
*
* @param in 内容流
* @param charset 字符编码
* @param attrFunc 字段回调函数
*
* @return AnyValue
* @throws IOException 异常
*/
public static AnyValue loadFromXml(InputStream in, Charset charset, BiFunction<String, String, String> attrFunc) throws IOException {
return new XmlReader(Utility.read(in, charset)).attrFunc(attrFunc).read();
}
/**
* 当前AnyValue对象字符串化
*
* @param indent 缩进长度
*
* @return String
*/
public String toString(int indent) { //indent: 缩进长度
if (indent < 0) indent = 0;
char[] chars = new char[indent];
@@ -551,43 +654,148 @@ public abstract class AnyValue {
return sb.toString();
}
/**
* 回调子节点
*
* @param stringConsumer 字符串字段的回调函数
*/
public abstract void forEach(BiConsumer<String, String> stringConsumer);
/**
* 回调子节点
*
* @param stringConsumer 字符串字段的回调函数
* @param anyConsumer 字符串对象的回调函数
*/
public abstract void forEach(BiConsumer<String, String> stringConsumer, BiConsumer<String, AnyValue> anyConsumer);
/**
* 获取所有字符串子节点
*
* @return Entry[]
*/
public abstract Entry<String>[] getStringEntrys();
/**
* 获取所有复合子节点
*
* @return Entry[]
*/
public abstract Entry<AnyValue>[] getAnyEntrys();
/**
* 获取字段名集合
*
* @return String[]
*/
public abstract String[] getNames();
/**
* 获取同级下同一字段名下所有的String对象
*
* @param name 字段名
*
* @return String[]
*/
public abstract String[] getValues(String name);
/**
* 根据字段名集合获取String类型的字段值集合
*
* @param names 字段名集合
*
* @return String[]
*/
public abstract String[] getValues(String... names);
/**
* 获取同级下同一字段名下所有的AnyValue对象
*
* @param name 字段名
*
* @return AnyValue[]
*/
public abstract AnyValue[] getAnyValues(String name);
/**
* 根据字段名集合获取AnyValue类型的字段值集合
*
* @param names 字段名集合
*
* @return AnyValue[]
*/
public abstract AnyValue[] getAnyValues(String... names);
/**
* 根据字段名获取AnyValue类型的字段值
*
* @param name 字段名
*
* @return AnyValue
*/
public abstract AnyValue getAnyValue(String name);
/**
* 根据字段名获取String类型的字段值
*
* @param name 字段名
*
* @return String
*/
public abstract String getValue(String name);
/**
* 根据字段名获取String类型的字段值
*
* @param name 字段名
*
* @return String
*/
public abstract String get(String name);
/**
* 获取字段值
*
* @param name 字段名
*
* @return 字段值
*/
public boolean getBoolValue(String name) {
return Boolean.parseBoolean(getValue(name));
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public boolean getBoolValue(String name, boolean defaultValue) {
String value = getValue(name);
return value == null || value.length() == 0 ? defaultValue : Boolean.parseBoolean(value);
}
/**
* 获取字段值
*
* @param name 字段名
*
* @return 字段值
*/
public byte getByteValue(String name) {
return Byte.parseByte(getValue(name));
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public byte getByteValue(String name, byte defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -598,6 +806,15 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param radix 进制,默认十进制
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public byte getByteValue(int radix, String name, byte defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -608,19 +825,49 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param name 字段名
*
* @return 字段值
*/
public char getCharValue(String name) {
return getValue(name).charAt(0);
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public char getCharValue(String name, char defaultValue) {
String value = getValue(name);
return value == null || value.length() == 0 ? defaultValue : value.charAt(0);
}
/**
* 获取字段值
*
* @param name 字段名
*
* @return String
*/
public short getShortValue(String name) {
return Short.decode(getValue(name));
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public short getShortValue(String name, short defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -631,6 +878,15 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param radix 进制,默认十进制
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public short getShortValue(int radix, String name, short defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -641,10 +897,25 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param name 字段名
*
* @return 字段值
*/
public int getIntValue(String name) {
return Integer.decode(getValue(name));
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return String
*/
public int getIntValue(String name, int defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -655,6 +926,15 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param radix 进制,默认十进制
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public int getIntValue(int radix, String name, int defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -665,10 +945,25 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param name 字段名
*
* @return 字段值
*/
public long getLongValue(String name) {
return Long.decode(getValue(name));
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public long getLongValue(String name, long defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -679,6 +974,15 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param radix 进制,默认十进制
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public long getLongValue(int radix, String name, long defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -689,10 +993,25 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param name 字段名
*
* @return String
*/
public float getFloatValue(String name) {
return Float.parseFloat(getValue(name));
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public float getFloatValue(String name, float defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -703,10 +1022,25 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param name 字段名
*
* @return 字段值
*/
public double getDoubleValue(String name) {
return Double.parseDouble(getValue(name));
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public double getDoubleValue(String name, double defaultValue) {
String value = getValue(name);
if (value == null || value.length() == 0) return defaultValue;
@@ -717,11 +1051,27 @@ public abstract class AnyValue {
}
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public String getValue(String name, String defaultValue) {
String value = getValue(name);
return value == null ? defaultValue : value;
}
/**
* 获取字段值
*
* @param name 字段名
* @param defaultValue 默认值
*
* @return 字段值
*/
public String getOrDefault(String name, String defaultValue) {
String value = getValue(name);
return value == null ? defaultValue : value;
@@ -753,10 +1103,27 @@ public abstract class AnyValue {
return hash;
}
/**
* xml化当前AnyValue对象
*
* @param rootName root名称
*
* @return String
*/
public String toXML(String rootName) {
return toXMLString(new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n\r\n"), rootName, this, 0).toString();
}
/**
* xml化AnyValue对象
*
* @param sb StringBuilder
* @param nodeName 字段名
* @param conf AnyValue
* @param indent 缩进长度
*
* @return StringBuilder
*/
protected static StringBuilder toXMLString(StringBuilder sb, String nodeName, AnyValue conf, int indent) { //indent: 缩进长度
if (indent < 0) indent = 0;
char[] chars = new char[indent];

View File

@@ -763,8 +763,13 @@ public interface Attribute<T, F> {
final String columnName = column.getName().replace('.', '/');
final String interDesc = Type.getDescriptor(TypeToken.typeToClass(subclass));
final String columnDesc = Type.getDescriptor(column);
final ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class realclz = TypeToken.typeToClass(subclass);
ClassLoader loader = Thread.currentThread().getContextClassLoader();
try {
loader.loadClass(realclz.getName());
} catch (ClassNotFoundException e) {
loader = realclz.getClassLoader();
}
String pkgname = "";
String clzname = newsubname.toString();
if (realclz != null) {

View File

@@ -177,6 +177,7 @@ public class MpscChunkedArrayQueue<E> extends AbstractQueue<E> {
/**
* An ordered store of an element to a given offset
*
* @param <E> E
* @param buffer this.buffer
* @param offset computed via
* @param e an orderly kitty

View File

@@ -248,11 +248,13 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
this(MIN_SIZE);
}
/** Create a new NonBlockingHashMap with initial room for the given number of
/** * Create a new NonBlockingHashMap with initial room for the given number of
* elements, thus avoiding internal resizing operations to reach an
* appropriate size. Large numbers here when used with a small count of
* appropriate size.Large numbers here when used with a small count of
* elements will sacrifice space for a small amount of time gained. The
* initial size will be rounded up internally to the next larger power of 2. */
* initial size will be rounded up internally to the next larger power of 2.
* @param initial_sz int
*/
public NonBlockingHashMap(final int initial_sz) {
initialize(initial_sz);
}
@@ -399,10 +401,13 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
return res == TOMBSTONE ? null : res;
}
/** Atomically replace newVal for oldVal, returning the value that existed
* there before. If the oldVal matches the returned value, then newVal was
/** * Atomically replace newVal for oldVal, returning the value that existed
* there before.If the oldVal matches the returned value, then newVal was
* inserted, otherwise not.
*
* @param key key
* @param newVal newVal
* @param oldVal oldVal
* @return the previous value associated with the specified key,
* or &#60;tt&#62;null&#60;/tt&#62; if there was no mapping for the key
* @throws NullPointerException if the key or either value is null
@@ -544,7 +549,7 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
}
// --- get -----------------------------------------------------------------
/** Returns the value to which the specified key is mapped, or {@code null}
/** * Returns the value to which the specified key is mapped, or {@code null}
* if this map contains no mapping for the key.
* <p>
* More formally, if this map contains a mapping from a key {@code k} to
@@ -552,6 +557,8 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
* returns {@code v}; otherwise it returns {@code null}. (There can be at
* most one such mapping.)
*
* @param key key
* @return Type
* @throws NullPointerException if the specified key is null */
// Never returns a Prime nor a Tombstone.
@Override
@@ -615,6 +622,8 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
/** Returns the Key to which the specified key is mapped, or {@code null}
* if this map contains no mapping for the key.
*
* @param key TypeK
* @return TypeK
* @throws NullPointerException if the specified key is null */
// Never returns a Prime nor a Tombstone.
public TypeK getk(TypeK key) {
@@ -1690,11 +1699,11 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
// --- public interface ---
/**
* Add the given value to current counter value. Concurrent updates will
* Add the given value to current counter value.Concurrent updates will
* not be lost, but addAndGet or getAndAdd are not implemented because the
* total counter value (i.e., {@link #get}) is not atomically updated.
* Updates are striped across an array of counters to avoid cache contention
* total counter value (i.e., {@link #get}) is not atomically updated. Updates are striped across an array of counters to avoid cache contention
* and has been tested with performance scaling linearly up to 768 CPUs.
* @param x long
*/
public void add(long x) {
add_if(x);
@@ -1710,8 +1719,8 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
add_if(1L);
}
/** Atomically set the sum of the striped counters to specified value.
* Rather more expensive than a simple store, in order to remain atomic.
/** * Atomically set the sum of the striped counters to specified value.Rather more expensive than a simple store, in order to remain atomic.
* @param x long
*/
public void set(long x) {
CAT newcat = new CAT(null, 4, x);
@@ -1721,27 +1730,30 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
}
/**
* Current value of the counter. Since other threads are updating furiously
* Current value of the counter.Since other threads are updating furiously
* the value is only approximate, but it includes all counts made by the
* current thread. Requires a pass over the internally striped counters.
* @return long
*/
public long get() {
return _cat.sum();
}
/** Same as {@link #get}, included for completeness. */
/** Same as {@link #get}, included for completeness.
* @return int */
public int intValue() {
return (int) _cat.sum();
}
/** Same as {@link #get}, included for completeness. */
/** Same as {@link #get}, included for completeness.
* @return long */
public long longValue() {
return _cat.sum();
}
/**
* A cheaper {@link #get}. Updated only once/millisecond, but as fast as a
* simple load instruction when not updating.
* A cheaper {@link #get}.Updated only once/millisecond, but as fast as a simple load instruction when not updating.
* @return long
*/
public long estimate_get() {
return _cat.estimate_sum();
@@ -1749,7 +1761,9 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
/**
* Return the counter's {@code long} value converted to a string.
* @return String
*/
@Override
public String toString() {
return _cat.toString();
}
@@ -1763,8 +1777,8 @@ public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV>
}
/**
* Return the internal counter striping factor. Useful for diagnosing
* performance problems.
* Return the internal counter striping factor.Useful for diagnosing performance problems.
* @return int
*/
public int internal_size() {
return _cat._t.length;

View File

@@ -16,7 +16,7 @@ public final class Redkale {
private static final String rootPackage = "org.redkale";
private Redkale() {
private Redkale() {
}
public static String getRootPackage() {
@@ -24,7 +24,7 @@ public final class Redkale {
}
public static String getDotedVersion() {
return "2.5.0";
return "2.6.0";
}
public static int getMajorVersion() {
@@ -32,6 +32,6 @@ public final class Redkale {
}
public static int getMinorVersion() {
return 5;
return 6;
}
}

View File

@@ -0,0 +1,57 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.test.convert;
import java.util.*;
import org.junit.jupiter.api.Test;
import org.redkale.convert.json.JsonConvert;
/**
*
* @author zhangjx
*/
public class DyncJsonTest {
public static void main(String[] args) throws Throwable {
new DyncJsonTest().run();
}
@Test
public void run() throws Exception {
SimpleDyncBean bean = new SimpleDyncBean();
bean.name = "haha";
System.out.println(JsonConvert.root().convertTo(bean));
SimpleDyncBean2 bean2 = new SimpleDyncBean2();
bean2.name = "haha";
System.out.println(JsonConvert.root().convertTo(bean2));
SimpleDyncBean3 bean3 = new SimpleDyncBean3();
bean3.name = "haha";
System.out.println(JsonConvert.root().convertTo(bean3));
}
public static class SimpleDyncBean {
public String name;
public List<SimpleDyncBean> beans;
}
public static class SimpleDyncBean2 {
public String name;
public SimpleDyncBean2 bean2;
}
public static class SimpleDyncBean3 {
public String name;
public Map<String, SimpleDyncBean3> beanmap;
}
}

View File

@@ -46,4 +46,10 @@ public class ChatService implements Service {
public void chatMessage(ChatMessage message) {
wsnode.broadcastMessage(message);
}
@Comment("其他操作")
public void other(int roomid, String name) {
System.out.println("其他操作: roomid: " + roomid + ", name: " + name);
}
}

View File

@@ -91,4 +91,21 @@ public class ChatWebSocket extends WebSocket<Integer, Object> {
service.joinRoom(getUserid(), roomid);
}
/**
* 浏览器WebSocket请求
* <pre>
* websocket.send(JSON.stringify({
* roomid: 10212
* name: "haha"
* }));
* </pre>
*
* @param roomid 参数1
* @param name 参数2
*/
@RestOnMessage(name = "*") //*为特殊值表示参数中不包含方法名
public void other(int roomid, String name) {
service.other(roomid, name);
}
}

View File

@@ -31,7 +31,7 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
public _DyncChatWebSocketServlet() {
super();
this.messageTextType = _DyncChatWebSocketMessage.class;
this.messageRestType = _DyncChatWebSocketMessage.class;
}
@Override
@@ -52,12 +52,48 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
}
}
public static class _DyncChatWebSocketMessage {
public static class _DyncChatWebSocketMessage implements WebSocketParam, Runnable {
public _DyncChatWebSocketMessage_sendmessagee_00 sendmessage;
public _DyncChatWebSocketMessage_joinroom_01 joinroom;
@ConvertDisabled
public _DyncChatWebSocket _redkale_websocket;
public int roomid;
public String name;
@Override
public String[] getNames() {
return new String[]{"roomid", "name"};
}
@Override
public <T> T getValue(String name) {
if ("roomid".equals(name)) return (T) (Integer) roomid;
if ("name".equals(name)) return (T) (String) name;
return null;
}
@Override
public Annotation[] getAnnotations() {
Annotation[] annotations = _redkale_annotations.get("org/redkale/test/wsdync/_DyncChatWebSocketServlet$_DyncChatWebSocketMessage");
if (annotations == null) return new Annotation[0];
return Arrays.copyOf(annotations, annotations.length);
}
public void execute(_DyncChatWebSocket websocket) {
this._redkale_websocket = websocket;
websocket.preOnMessage("*", this, this);
}
@Override
public void run() {
_redkale_websocket.other(this.roomid, this.name);
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
@@ -163,6 +199,7 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet {
message.joinroom.execute(websocket);
return;
}
message.execute(websocket);
}
}