diff --git a/pom.xml b/pom.xml index 8ddb83b8a..fb344002c 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ RedkaleProject https://redkale.org redkale -- java framework - 2.5.0 + 2.6.0 UTF-8 diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index f212df1fa..6376a6179 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -11,6 +11,7 @@ import org.redkale.net.TransportGroupInfo; import java.io.*; import java.lang.reflect.*; import java.net.*; +import java.net.http.HttpClient; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.StandardCharsets; @@ -509,7 +510,8 @@ public final class Application { } ExecutorService workExecutor0 = null; - if (executorConf != null) { + { + if (executorConf == null) executorConf = DefaultAnyValue.create(); final AtomicReference workref = new AtomicReference<>(); final int executorThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); boolean executorHash = executorConf.getBoolValue("hash"); @@ -804,18 +806,36 @@ public final class Application { }, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); - //------------------------------------- 注册 HttpClient -------------------------------------------------------- + //------------------------------------- 注册 java.net.http.HttpClient -------------------------------------------------------- resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> { try { if (field.getAnnotation(Resource.class) == null) return; - HttpClient httpClient = HttpClient.create(asyncGroup); + java.net.http.HttpClient.Builder builder = java.net.http.HttpClient.newBuilder(); + if (resourceName.endsWith(".1.1")) { + builder.version(HttpClient.Version.HTTP_1_1); + } else if (resourceName.endsWith(".2")) { + builder.version(HttpClient.Version.HTTP_2); + } + java.net.http.HttpClient httpClient = builder.build(); field.set(src, httpClient); rf.inject(httpClient, null); // 给其可能包含@Resource的字段赋值; - rf.register(resourceName, HttpClient.class, httpClient); + rf.register(resourceName, java.net.http.HttpClient.class, httpClient); + } catch (Exception e) { + logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] java.net.http.HttpClient inject error", e); + } + }, java.net.http.HttpClient.class); + //------------------------------------- 注册 HttpSimpleClient -------------------------------------------------------- + resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> { + try { + if (field.getAnnotation(Resource.class) == null) return; + HttpSimpleClient httpClient = HttpSimpleClient.create(asyncGroup); + field.set(src, httpClient); + rf.inject(httpClient, null); // 给其可能包含@Resource的字段赋值; + rf.register(resourceName, HttpSimpleClient.class, httpClient); } catch (Exception e) { logger.log(Level.SEVERE, "[" + Thread.currentThread().getName() + "] HttpClient inject error", e); } - }, HttpClient.class); + }, HttpSimpleClient.class); //-------------------------------------------------------------------------- if (this.asyncGroup != null) { ((AsyncIOGroup) this.asyncGroup).start(); @@ -857,7 +877,7 @@ public final class Application { rf.register(resourceName, HttpMessageClient.class, messageClient); return; } - HttpMessageClient messageClient = new HttpMessageClusterClient(clusterAgent); + HttpMessageClient messageClient = new HttpMessageClusterClient(application, resourceName, clusterAgent); field.set(src, messageClient); rf.inject(messageClient, null); // 给其可能包含@Resource的字段赋值; rf.register(resourceName, HttpMessageClient.class, messageClient); @@ -1066,6 +1086,11 @@ public final class Application { } } + /** + * 启动 + * + * @throws Exception 异常 + */ public void start() throws Exception { if (!singletonMode && !compileMode && this.clusterAgent != null) { this.clusterAgent.register(this); @@ -1248,10 +1273,31 @@ public final class Application { sercdl.await(); } + /** + * 实例化单个Service + * + * @param 泛型 + * @param serviceClass 指定的service类 + * @param extServiceClasses 需要排除的service类 + * + * @return Service对象 + * @throws Exception 异常 + */ public static T singleton(Class serviceClass, Class... extServiceClasses) throws Exception { return singleton("", serviceClass, extServiceClasses); } + /** + * 实例化单个Service + * + * @param 泛型 + * @param name Service的资源名 + * @param serviceClass 指定的service类 + * @param extServiceClasses 需要排除的service类 + * + * @return Service对象 + * @throws Exception 异常 + */ public static T singleton(String name, Class serviceClass, Class... extServiceClasses) throws Exception { if (serviceClass == null) throw new IllegalArgumentException("serviceClass is null"); final Application application = Application.create(true); @@ -1279,6 +1325,11 @@ public final class Application { return new Application(singleton, false, loadAppConfig()); } + /** + * 重新加载配置信息 + * + * @throws IOException 异常 + */ public void reloadConfig() throws IOException { AnyValue newconfig = loadAppConfig(); final String confpath = this.confPath.toString(); diff --git a/src/main/java/org/redkale/boot/NodeHttpServer.java b/src/main/java/org/redkale/boot/NodeHttpServer.java index 27f547d1b..f7778bdae 100644 --- a/src/main/java/org/redkale/boot/NodeHttpServer.java +++ b/src/main/java/org/redkale/boot/NodeHttpServer.java @@ -131,7 +131,7 @@ public class NodeHttpServer extends NodeServer { } catch (Exception ex) { logger.log(Level.WARNING, "WebSocketServlet getMessageAgent error", ex); } - nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, messageAgent, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); + nodeService = Sncp.createLocalService(serverClassLoader, resourceName, org.redkale.net.http.WebSocketNodeService.class, messageAgent, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); } resourceFactory.inject(nodeService, self); diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index e413dcbd7..6646f4d25 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -438,7 +438,7 @@ public abstract class NodeServer { rf.inject(source, self); // if (!application.isCompileMode() && source instanceof Service) ((Service) source).init(sourceConf); - if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource + if ((src instanceof org.redkale.net.http.WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); if (source != null && source.getClass().getAnnotation(Local.class) == null) { //本地模式的Service不生成SncpServlet sncpServer.getSncpServer().addSncpServlet((Service) source); @@ -468,9 +468,9 @@ public abstract class NodeServer { if (nodeService == null) { final HashSet groups = new HashSet<>(); if (groups.isEmpty() && isSNCP() && NodeServer.this.sncpGroup != null) groups.add(NodeServer.this.sncpGroup); - nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, Sncp.getMessageAgent((Service) src), application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null); + nodeService = Sncp.createLocalService(serverClassLoader, resourceName, org.redkale.net.http.WebSocketNodeService.class, Sncp.getMessageAgent((Service) src), application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null); (isSNCP() ? appResFactory : resourceFactory).register(resourceName, WebSocketNode.class, nodeService); - ((WebSocketNodeService) nodeService).setName(resourceName); + ((org.redkale.net.http.WebSocketNodeService) nodeService).setName(resourceName); } resourceFactory.inject(nodeService, self); MessageAgent messageAgent = Sncp.getMessageAgent((Service) src); @@ -551,8 +551,8 @@ public abstract class NodeServer { } else { service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } - if (service instanceof WebSocketNodeService) { - ((WebSocketNodeService) service).setName(resourceName); + if (service instanceof org.redkale.net.http.WebSocketNodeService) { + ((org.redkale.net.http.WebSocketNodeService) service).setName(resourceName); if (agent != null) Sncp.setMessageAgent(service, agent); } final Class restype = Sncp.getResourceType(service); diff --git a/src/main/java/org/redkale/boot/watch/AbstractWatchService.java b/src/main/java/org/redkale/boot/watch/AbstractWatchService.java index 29d385c2e..26a3482f4 100644 --- a/src/main/java/org/redkale/boot/watch/AbstractWatchService.java +++ b/src/main/java/org/redkale/boot/watch/AbstractWatchService.java @@ -15,9 +15,15 @@ import org.redkale.watch.WatchService; */ public abstract class AbstractWatchService extends AbstractService implements WatchService { + /** + * 缺少参数 + */ @Comment("缺少参数") public static final int RET_WATCH_PARAMS_ILLEGAL = 1600_0001; + /** + * 执行异常 + */ @Comment("执行异常") public static final int RET_WATCH_RUN_EXCEPTION = 1600_0002; } diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index 9dcc2bfb9..4a158f75b 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -112,7 +112,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { updateSncpTransport(entry); }); } catch (Exception e) { - logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e); + logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e); } }, Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS); } diff --git a/src/main/java/org/redkale/convert/AnyDecoder.java b/src/main/java/org/redkale/convert/AnyDecoder.java index b270d86a7..5b9fc7d6b 100644 --- a/src/main/java/org/redkale/convert/AnyDecoder.java +++ b/src/main/java/org/redkale/convert/AnyDecoder.java @@ -14,7 +14,7 @@ import static org.redkale.convert.Reader.ValueType.MAP; /** * 对不明类型的对象进行反序列化。
* 注意: 目前只支持文本格式
- *

+ * * 详情见: https://redkale.org * * @author zhangjx @@ -31,12 +31,17 @@ public class AnyDecoder implements Decodeable { private static final Creator mapCreator = Creator.create(HashMap.class); - protected final Decodeable stringDecoder; + final Decodeable stringDecoder; - protected final CollectionDecoder collectionDecoder; + final CollectionDecoder collectionDecoder; - protected final MapDecoder mapDecoder; + final MapDecoder mapDecoder; + /** + * 构造函数 + * + * @param factory ConvertFactory + */ public AnyDecoder(final ConvertFactory factory) { this.stringDecoder = factory.loadDecoder(String.class); this.collectionDecoder = new CollectionDecoder(factory, collectionObjectType, Object.class, collectionCreator, this); diff --git a/src/main/java/org/redkale/convert/AnyEncoder.java b/src/main/java/org/redkale/convert/AnyEncoder.java index 310ce4633..b241f4502 100644 --- a/src/main/java/org/redkale/convert/AnyEncoder.java +++ b/src/main/java/org/redkale/convert/AnyEncoder.java @@ -6,11 +6,10 @@ package org.redkale.convert; import java.lang.reflect.Type; -import java.util.concurrent.CompletableFuture; /** * 对不明类型的对象进行序列化; BSON序列化时将对象的类名写入Writer,JSON则不写入。 - *

+ * * 详情见: https://redkale.org * * @author zhangjx @@ -42,29 +41,6 @@ public final class AnyEncoder implements Encodeable { } } - @SuppressWarnings("unchecked") - public void convertMapTo(final Writer out, final Object... values) { - if (values == null) { - out.writeNull(); - } else { - int count = values.length - values.length % 2; - if (out.writeMapB(count / 2, (Encodeable) this, (Encodeable) this, values) < 0) { - for (int i = 0; i < count; i += 2) { - if (i > 0) out.writeArrayMark(); - this.convertTo(out, (T) values[i]); - out.writeMapMark(); - Object val = values[i + 1]; - if (val instanceof CompletableFuture) { - this.convertTo(out, (T) ((CompletableFuture) val).join()); - } else { - this.convertTo(out, (T) val); - } - } - } - out.writeMapE(); - } - } - @Override public Type getType() { return Object.class; diff --git a/src/main/java/org/redkale/convert/DeMember.java b/src/main/java/org/redkale/convert/DeMember.java index 57beaf003..96fede608 100644 --- a/src/main/java/org/redkale/convert/DeMember.java +++ b/src/main/java/org/redkale/convert/DeMember.java @@ -6,7 +6,9 @@ package org.redkale.convert; import java.lang.reflect.*; -import org.redkale.util.Attribute; +import javax.persistence.Column; +import org.redkale.source.FilterColumn; +import org.redkale.util.*; /** * 字段的反序列化操作类 @@ -26,6 +28,8 @@ public final class DeMember { final Method method; //对应类成员的Method也可能为null + final String comment; + protected int index; protected int position; //从1开始 @@ -40,6 +44,43 @@ public final class DeMember { this.attribute = attribute; this.field = field; this.method = method; + if (field != null) { + Comment ct = field.getAnnotation(Comment.class); + if (ct == null) { + Column col = field.getAnnotation(Column.class); + if (col == null) { + FilterColumn fc = field.getAnnotation(FilterColumn.class); + if (fc == null) { + this.comment = ""; + } else { + this.comment = fc.comment(); + } + } else { + this.comment = col.comment(); + } + } else { + this.comment = ct.value(); + } + } else if (method != null) { + Comment ct = method.getAnnotation(Comment.class); + if (ct == null) { + Column col = method.getAnnotation(Column.class); + if (col == null) { + FilterColumn fc = method.getAnnotation(FilterColumn.class); + if (fc == null) { + this.comment = ""; + } else { + this.comment = fc.comment(); + } + } else { + this.comment = col.comment(); + } + } else { + this.comment = ct.value(); + } + } else { + this.comment = ""; + } } public DeMember(Attribute attribute, Decodeable decoder, Field field, Method method) { @@ -85,6 +126,10 @@ public final class DeMember { return this.attribute; } + public String getComment() { + return comment; + } + public Decodeable getDecoder() { return decoder; } diff --git a/src/main/java/org/redkale/convert/Decodeable.java b/src/main/java/org/redkale/convert/Decodeable.java index 98e9b933f..be197771c 100644 --- a/src/main/java/org/redkale/convert/Decodeable.java +++ b/src/main/java/org/redkale/convert/Decodeable.java @@ -19,6 +19,13 @@ import java.lang.reflect.Type; */ public interface Decodeable { + /** + * 反序列化操作 + * + * @param in R + * + * @return T + */ public T convertFrom(final R in); /** diff --git a/src/main/java/org/redkale/convert/EnMember.java b/src/main/java/org/redkale/convert/EnMember.java index ee038ad6f..cca2243e3 100644 --- a/src/main/java/org/redkale/convert/EnMember.java +++ b/src/main/java/org/redkale/convert/EnMember.java @@ -6,7 +6,9 @@ package org.redkale.convert; import java.lang.reflect.*; -import org.redkale.util.Attribute; +import javax.persistence.Column; +import org.redkale.source.FilterColumn; +import org.redkale.util.*; /** * 字段的序列化操作类 @@ -26,6 +28,8 @@ public final class EnMember { final Encodeable encoder; + final String comment; + final boolean string; //final boolean isnumber; @@ -39,11 +43,11 @@ public final class EnMember { final Method method; //对应类成员的Method也可能为null - protected int index; + int index; - protected int position; //从1开始 + int position; //从1开始 - protected int tag; //主要给protobuf使用 + int tag; //主要给protobuf使用 public EnMember(Attribute attribute, Encodeable encoder, Field field, Method method) { this.attribute = attribute; @@ -55,6 +59,43 @@ public final class EnMember { this.bool = t == Boolean.class || t == boolean.class; this.jsonFieldNameChars = ('"' + attribute.field() + "\":").toCharArray(); this.jsonFieldNameBytes = ('"' + attribute.field() + "\":").getBytes(); + if (field != null) { + Comment ct = field.getAnnotation(Comment.class); + if (ct == null) { + Column col = field.getAnnotation(Column.class); + if (col == null) { + FilterColumn fc = field.getAnnotation(FilterColumn.class); + if (fc == null) { + this.comment = ""; + } else { + this.comment = fc.comment(); + } + } else { + this.comment = col.comment(); + } + } else { + this.comment = ct.value(); + } + } else if (method != null) { + Comment ct = method.getAnnotation(Comment.class); + if (ct == null) { + Column col = method.getAnnotation(Column.class); + if (col == null) { + FilterColumn fc = method.getAnnotation(FilterColumn.class); + if (fc == null) { + this.comment = ""; + } else { + this.comment = fc.comment(); + } + } else { + this.comment = col.comment(); + } + } else { + this.comment = ct.value(); + } + } else { + this.comment = ""; + } //this.isnumber = Number.class.isAssignableFrom(t) || (!this.isbool && t.isPrimitive()); } @@ -88,6 +129,10 @@ public final class EnMember { return attribute; } + public String getComment() { + return comment; + } + public char[] getJsonFieldNameChars() { return jsonFieldNameChars; } diff --git a/src/main/java/org/redkale/convert/Encodeable.java b/src/main/java/org/redkale/convert/Encodeable.java index 136e1458c..06d60a4c3 100644 --- a/src/main/java/org/redkale/convert/Encodeable.java +++ b/src/main/java/org/redkale/convert/Encodeable.java @@ -19,6 +19,12 @@ import java.lang.reflect.Type; */ public interface Encodeable { + /** + * 序列化操作 + * + * @param out Writer + * @param value 对象 + */ public void convertTo(final W out, T value); /** diff --git a/src/main/java/org/redkale/convert/json/JsonDynEncoder.java b/src/main/java/org/redkale/convert/json/JsonDynEncoder.java index 157c1c256..9b1ce17d6 100644 --- a/src/main/java/org/redkale/convert/json/JsonDynEncoder.java +++ b/src/main/java/org/redkale/convert/json/JsonDynEncoder.java @@ -44,7 +44,7 @@ public abstract class JsonDynEncoder implements Encodeable { return false; } - private static boolean checkMemberType(final JsonFactory factory, Type type, Class clazz) { + private static boolean checkMemberType(final JsonFactory factory, final Class declaringClass, Type type, Class clazz) { if (type == String.class) return true; if (clazz.isPrimitive()) return true; if (clazz.isEnum()) return true; @@ -65,6 +65,8 @@ public abstract class JsonDynEncoder implements Encodeable { if (type == Long[].class) return true; if (type == Double[].class) return true; if (type == String[].class) return true; + + if (declaringClass == clazz) return false; if (Collection.class.isAssignableFrom(clazz) && type instanceof ParameterizedType) { Type[] ts = ((ParameterizedType) type).getActualTypeArguments(); if (ts.length == 1) { @@ -72,15 +74,9 @@ public abstract class JsonDynEncoder implements Encodeable { if (t == Boolean.class || t == Byte.class || t == Short.class || t == Character.class || t == Integer.class || t == Float.class || t == Long.class || t == Double.class || t == String.class || ((t instanceof Class) && ((Class) t).isEnum())) return true; - if (factory.loadEncoder(t) instanceof JsonDynEncoder) return true; + return false; } } - if (type instanceof TypeVariable) return false; - try { - if (factory.loadEncoder(type) instanceof JsonDynEncoder) return true; - } catch (Exception e) { - return false; - } return false; } @@ -119,7 +115,7 @@ public abstract class JsonDynEncoder implements Encodeable { if (factory.isConvertDisabled(field)) continue; ref = factory.findRef(clazz, field); if (ref != null && ref.ignore()) continue; - if (!(checkMemberType(factory, field.getGenericType(), field.getType()))) return null; + if (!(checkMemberType(factory, clazz, field.getGenericType(), field.getType()))) return null; String name = convertFieldName(factory, clazz, field); if (names.contains(name)) continue; names.add(name); @@ -139,7 +135,7 @@ public abstract class JsonDynEncoder implements Encodeable { if (method.getReturnType() == void.class) continue; ref = factory.findRef(clazz, method); if (ref != null && ref.ignore()) continue; - if (!(checkMemberType(factory, method.getGenericReturnType(), method.getReturnType()))) return null; + if (!(checkMemberType(factory, clazz, method.getGenericReturnType(), method.getReturnType()))) return null; String name = convertFieldName(factory, clazz, method); if (names.contains(name)) continue; names.add(name); @@ -404,8 +400,8 @@ public abstract class JsonDynEncoder implements Encodeable { } else { mv.visitMethodInsn(INVOKEVIRTUAL, valtypeName, ((Method) element1).getName(), "()" + org.redkale.asm.Type.getDescriptor(fieldtype1), false); } - maxLocals ++; - + maxLocals++; + mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, fieldname2 + "CommaFieldBytes", "[B"); @@ -415,10 +411,10 @@ public abstract class JsonDynEncoder implements Encodeable { } else { mv.visitMethodInsn(INVOKEVIRTUAL, valtypeName, ((Method) element2).getName(), "()" + org.redkale.asm.Type.getDescriptor(fieldtype2), false); } - maxLocals ++; - + maxLocals++; + mv.visitMethodInsn(INVOKEVIRTUAL, writerName, "writeObjectByOnlyTwoIntFieldValue", "([BI[BI)V", false); - + } else if (onlyShotIntLongLatin1MoreFieldObjectFlag && mustHadComma) { for (AccessibleObject element : members) { elementIndex++; diff --git a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java index d87bbc5ff..bd03e57bc 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java @@ -8,11 +8,13 @@ package org.redkale.mq; import java.io.Serializable; import java.net.*; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.logging.Level; import javax.annotation.Resource; +import org.redkale.boot.Application; import org.redkale.cluster.ClusterAgent; import org.redkale.net.http.*; import org.redkale.util.Utility; @@ -33,27 +35,44 @@ public class HttpMessageClusterClient extends HttpMessageClient { private static final Set DISALLOWED_HEADERS_SET = Utility.ofSet("connection", "content-length", "date", "expect", "from", "host", "origin", "referer", "upgrade", "via", "warning"); + protected final HttpMessageLocalClient localClient; + + protected final ConcurrentHashMap topicServletMap = new ConcurrentHashMap<>(); + protected ClusterAgent clusterAgent; @Resource(name = "cluster.httpClient") - protected HttpClient httpClient; + protected java.net.http.HttpClient httpClient; - //protected java.net.http.HttpClient httpClient; - public HttpMessageClusterClient(ClusterAgent clusterAgent) { + @Resource(name = "cluster.httpClient") + protected HttpSimpleClient httpSimpleClient; + + public HttpMessageClusterClient(Application application, String resourceName, ClusterAgent clusterAgent) { super(null); Objects.requireNonNull(clusterAgent); + this.localClient = new HttpMessageLocalClient(application, resourceName); this.clusterAgent = clusterAgent; - //this.httpClient = java.net.http.HttpClient.newHttpClient(); + this.finest = logger.isLoggable(Level.FINEST); + this.finer = logger.isLoggable(Level.FINER); + this.fine = logger.isLoggable(Level.FINE); } @Override public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return httpAsync(userid, request); + if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { + return localClient.sendMessage(topic, userid, groupid, request, counter); + } else { + return httpAsync(false, userid, request); + } } @Override public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - httpAsync(userid, request); + if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { + localClient.produceMessage(topic, userid, groupid, request, counter); + } else { + httpAsync(true, userid, request); + } } @Override @@ -62,14 +81,17 @@ public class HttpMessageClusterClient extends HttpMessageClient { } private CompletableFuture> mqtpAsync(Serializable userid, HttpSimpleRequest req) { - final boolean finest = logger.isLoggable(Level.FINEST); String module = req.getRequestURI(); module = module.substring(1); //去掉/ module = module.substring(0, module.indexOf('/')); Map headers = req.getHeaders(); String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); + final String localModule = module; return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> { - if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture(); + if (addrmap == null || addrmap.isEmpty()) { + if (fine) logger.log(Level.FINE, "mqtpAsync.broadcastMessage: module=" + localModule + ", resname=" + resname + ", addrmap is empty"); + return new HttpResult().status(404).toFuture(); + } final Map clientHeaders = new LinkedHashMap<>(); byte[] clientBody = null; if (req.isRpc()) clientHeaders.put(Rest.REST_HEADER_RPC_NAME, "true"); @@ -96,6 +118,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { if (paramstr != null) clientBody = paramstr.getBytes(StandardCharsets.UTF_8); } List futures = new ArrayList<>(); + if (finest) logger.log(Level.FINEST, "mqtpAsync: module=" + localModule + ", resname=" + resname + ", addrmap=" + addrmap); for (Map.Entry> en : addrmap.entrySet()) { String realmodule = en.getKey(); Collection addrs = en.getValue(); @@ -110,15 +133,19 @@ public class HttpMessageClusterClient extends HttpMessageClient { }); } - private CompletableFuture> httpAsync(Serializable userid, HttpSimpleRequest req) { - final boolean finest = logger.isLoggable(Level.FINEST); + private CompletableFuture> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) { String module = req.getRequestURI(); module = module.substring(1); //去掉/ module = module.substring(0, module.indexOf('/')); Map headers = req.getHeaders(); String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); + final String localModule = module; + if (finest) logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname); return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { - if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); + if (addrs == null || addrs.isEmpty()) { + if (fine) logger.log(Level.FINE, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + ": module=" + localModule + ", resname=" + resname + ", addrmap is empty"); + return new HttpResult().status(404).toFuture(); + } final Map clientHeaders = new LinkedHashMap<>(); byte[] clientBody = null; if (req.isRpc()) clientHeaders.put(Rest.REST_HEADER_RPC_NAME, "true"); @@ -126,9 +153,14 @@ public class HttpMessageClusterClient extends HttpMessageClient { if (req.getReqConvertType() != null) clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); if (req.getRespConvertType() != null) clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); if (userid != null) clientHeaders.put(Rest.REST_HEADER_CURRUSERID_NAME, "" + userid); - if (headers != null) headers.forEach((n, v) -> { - if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) clientHeaders.put(n, v); + if (headers != null) { + boolean ws = headers.containsKey("Sec-WebSocket-Key"); + headers.forEach((n, v) -> { + if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase()) + && (!ws || (!"Connection".equals(n) && !"Sec-WebSocket-Key".equals(n) + && !"Sec-WebSocket-Version".equals(n)))) clientHeaders.put(n, v); }); + } clientHeaders.put("Content-Type", "x-www-form-urlencoded"); if (req.getBody() != null && req.getBody().length > 0) { String paramstr = req.getParametersToString(); @@ -144,6 +176,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { String paramstr = req.getParametersToString(); if (paramstr != null) clientBody = paramstr.getBytes(StandardCharsets.UTF_8); } + if (finest) logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture"); return forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), clientHeaders, clientBody, addrs.iterator()); }); } @@ -152,7 +185,19 @@ public class HttpMessageClusterClient extends HttpMessageClient { if (!it.hasNext()) return CompletableFuture.completedFuture(null); InetSocketAddress addr = it.next(); String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; - return httpClient.postAsync(url, clientHeaders, clientBody); + if (finest) logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders); + if (httpSimpleClient != null) return httpSimpleClient.postAsync(url, clientHeaders, clientBody); + java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().uri(URI.create(url)) + .timeout(Duration.ofMillis(10_000)) + //存在sendHeader后不发送body数据的问题, java.net.http.HttpRequest的bug? + .method("POST", clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody)); + if (clientHeaders != null) clientHeaders.forEach((n, v) -> builder.header(n, v)); + return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()) + .thenApply((java.net.http.HttpResponse resp) -> { + final int rs = resp.statusCode(); + if (rs != 200) return new HttpResult().status(rs); + return new HttpResult(resp.body()); + }); } // private CompletableFuture> mqtpAsync(Serializable userid, HttpSimpleRequest req) { diff --git a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java index 10afd1fea..bb2fab6c4 100644 --- a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java @@ -72,53 +72,31 @@ public class HttpMessageLocalClient extends HttpMessageClient { return (HttpPrepareServlet) httpServer().getPrepareServlet(); } + protected HttpServlet findHttpServlet(String topic) { + return prepareServlet().findServletByTopic(topic); + } + + protected HttpServlet findHttpServlet(HttpSimpleRequest request) { + return prepareServlet().findServletByTopic(generateHttpReqTopic(request, request.getPath())); + } + @Override public CompletableFuture sendMessage(HttpSimpleRequest request, Type type) { - HttpPrepareServlet ps = prepareServlet(); - String topic = generateHttpReqTopic(request, request.getPath()); - HttpServlet servlet = ps.findServletByTopic(topic); - CompletableFuture future = new CompletableFuture(); - if (servlet == null) { - future.completeExceptionally(new RuntimeException("404 Not Found " + topic)); - return future; - } - HttpRequest req = new HttpMessageLocalRequest(context(), request); - HttpResponse resp = new HttpMessageLocalResponse(req, future); - try { - servlet.execute(req, resp); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; + return sendMessage((Serializable) null, (String) null, request, type); } @Override public CompletableFuture sendMessage(Serializable userid, HttpSimpleRequest request, Type type) { - HttpPrepareServlet ps = prepareServlet(); - String topic = generateHttpReqTopic(request, request.getPath()); - HttpServlet servlet = ps.findServletByTopic(topic); - CompletableFuture future = new CompletableFuture(); - if (servlet == null) { - future.completeExceptionally(new RuntimeException("404 Not Found " + topic)); - return future; - } - HttpRequest req = new HttpMessageLocalRequest(context(), request); - HttpResponse resp = new HttpMessageLocalResponse(req, future); - try { - servlet.execute(req, resp); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; + return sendMessage(userid, (String) null, request, type); } @Override public CompletableFuture sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) { - HttpPrepareServlet ps = prepareServlet(); String topic = generateHttpReqTopic(request, request.getPath()); - HttpServlet servlet = ps.findServletByTopic(topic); + HttpServlet servlet = findHttpServlet(topic); CompletableFuture future = new CompletableFuture(); if (servlet == null) { + if (fine) logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet"); future.completeExceptionally(new RuntimeException("404 Not Found " + topic)); return future; } @@ -134,9 +112,11 @@ public class HttpMessageLocalClient extends HttpMessageClient { @Override public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - HttpPrepareServlet ps = prepareServlet(); - HttpServlet servlet = ps.findServletByTopic(topic); - if (servlet == null) return CompletableFuture.completedFuture(new HttpResult().status(404)); + HttpServlet servlet = findHttpServlet(topic); + if (servlet == null) { + if (fine) logger.log(Level.FINE, "sendMessage: request=" + request + ", not found servlet"); + return CompletableFuture.completedFuture(new HttpResult().status(404)); + } HttpRequest req = new HttpMessageLocalRequest(context(), request); CompletableFuture future = new CompletableFuture(); HttpResponse resp = new HttpMessageLocalResponse(req, future); @@ -160,7 +140,10 @@ public class HttpMessageLocalClient extends HttpMessageClient { public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { HttpPrepareServlet ps = prepareServlet(); HttpServlet servlet = ps.findServletByTopic(topic); - if (servlet == null) return; + if (servlet == null) { + if (fine) logger.log(Level.FINE, "produceMessage: request=" + request + ", not found servlet"); + return; + } HttpRequest req = new HttpMessageLocalRequest(context(), request); HttpResponse resp = new HttpMessageLocalResponse(req, null); try { @@ -200,17 +183,70 @@ public class HttpMessageLocalClient extends HttpMessageClient { this.future = future; } + @Override + public void finishJson(final Object obj) { + finish((Convert) null, (Type) null, obj); + } + + @Override + public void finishJson(final JsonConvert convert, final Object obj) { + finish(convert, (Type) null, obj); + } + + @Override + public void finishJson(final Type type, final Object obj) { + finish((Convert) null, type, obj); + } + + @Override + public void finishJson(final JsonConvert convert, final Type type, final Object obj) { + if (future == null) return; + future.complete(obj); + } + + @Override + public void finish(Type type, org.redkale.service.RetResult ret) { + finish((Convert) null, type, ret); + } + + @Override + public void finish(final Convert convert, Type type, org.redkale.service.RetResult ret) { + if (future == null) return; + future.complete(ret); + } + + @Override + public void finish(final Convert convert, final Type type, Object obj) { + if (future == null) return; + future.complete(obj); + } + @Override public void finish(String obj) { if (future == null) return; future.complete(obj == null ? "" : obj); } + @Override + public void finish304() { + finish(304, null); + } + @Override public void finish404() { finish(404, null); } + @Override + public void finish500() { + finish(500, null); + } + + @Override + public void finish504() { + finish(504, null); + } + @Override public void finish(int status, String msg) { if (future == null) return; diff --git a/src/main/java/org/redkale/mq/HttpMessageProcessor.java b/src/main/java/org/redkale/mq/HttpMessageProcessor.java index 54154a0cd..27abd4c7f 100644 --- a/src/main/java/org/redkale/mq/HttpMessageProcessor.java +++ b/src/main/java/org/redkale/mq/HttpMessageProcessor.java @@ -86,7 +86,7 @@ public class HttpMessageProcessor implements MessageProcessor { @Override public void begin(final int size, long starttime) { this.starttime = starttime; - this.cdl = new CountDownLatch(size); + this.cdl = size > 1 ? new CountDownLatch(size) : null; } @Override diff --git a/src/main/java/org/redkale/mq/HttpMessageResponse.java b/src/main/java/org/redkale/mq/HttpMessageResponse.java index 260797175..422302248 100644 --- a/src/main/java/org/redkale/mq/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/HttpMessageResponse.java @@ -12,6 +12,7 @@ import java.util.Arrays; import java.util.function.*; import java.util.logging.Level; import org.redkale.convert.*; +import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; import org.redkale.service.RetResult; import static org.redkale.mq.MessageRecord.CTYPE_HTTP_RESULT; @@ -71,6 +72,10 @@ public class HttpMessageResponse extends HttpResponse { finishHttpResult(this.finest, ((HttpMessageRequest) this.request).getRespConvert(), type, this.message, this.callback, this.messageClient, this.producer, message.getResptopic(), result); } + public void finishHttpResult(Type type, Convert respConvert, HttpResult result) { + finishHttpResult(this.finest, respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, type, this.message, this.callback, this.messageClient, this.producer, message.getResptopic(), result); + } + public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageProducer producer, String resptopic, HttpResult result) { if (callback != null) callback.run(); if (resptopic == null || resptopic.isEmpty()) return; @@ -108,6 +113,67 @@ public class HttpMessageResponse extends HttpResponse { return rs; } + @Override + public void finishJson(final Object obj) { + finishJson((JsonConvert) null, (Type) null, obj); + } + + @Override + public void finishJson(final JsonConvert convert, final Object obj) { + if (message.isEmptyResptopic()) { + if (callback != null) callback.run(); + return; + } + finishHttpResult(obj.getClass(), convert, new HttpResult(obj)); + + } + + @Override + public void finishJson(final Type type, final Object obj) { + if (message.isEmptyResptopic()) { + if (callback != null) callback.run(); + return; + } + finishHttpResult(type, new HttpResult(obj)); + } + + @Override + public void finishJson(final JsonConvert convert, final Type type, final Object obj) { + if (message.isEmptyResptopic()) { + if (callback != null) callback.run(); + return; + } + finishHttpResult(type, convert, new HttpResult(obj)); + } + + @Override + public void finish(Type type, org.redkale.service.RetResult ret) { + if (message.isEmptyResptopic()) { + if (callback != null) callback.run(); + return; + } + finishHttpResult(type, new HttpResult(ret)); + + } + + @Override + public void finish(final Convert convert, Type type, org.redkale.service.RetResult ret) { + if (message.isEmptyResptopic()) { + if (callback != null) callback.run(); + return; + } + finishHttpResult(type, convert, new HttpResult(ret)); + } + + @Override + public void finish(final Convert convert, final Type type, Object obj) { + if (message.isEmptyResptopic()) { + if (callback != null) callback.run(); + return; + } + finishHttpResult(type, convert, new HttpResult(obj)); + } + @Override public void finish(String obj) { if (message.isEmptyResptopic()) { @@ -117,11 +183,26 @@ public class HttpMessageResponse extends HttpResponse { finishHttpResult(String.class, new HttpResult(obj == null ? "" : obj)); } + @Override + public void finish304() { + finish(304, null); + } + @Override public void finish404() { finish(404, null); } + @Override + public void finish500() { + finish(500, null); + } + + @Override + public void finish504() { + finish(504, null); + } + @Override public void finish(int status, String msg) { if (status > 400) { @@ -169,6 +250,16 @@ public class HttpMessageResponse extends HttpResponse { finishHttpResult(null, new HttpResult(rs).contentType(contentType)); } + @Override + protected void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length, Consumer consumer, A attachment) { + if (message.isEmptyResptopic()) { + if (callback != null) callback.run(); + return; + } + byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); + finishHttpResult(null, new HttpResult(rs).contentType(contentType)); + } + @Override public void finish(boolean kill, ByteBuffer buffer) { if (message.isEmptyResptopic()) { diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index b18b8a166..e1f7c953a 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -8,7 +8,7 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; +import java.util.logging.*; import java.util.stream.Collectors; import javax.annotation.Resource; import org.redkale.boot.*; @@ -165,12 +165,15 @@ public abstract class MessageAgent { if (this.sncpProducer == null) { synchronized (sncpProducerLock) { if (this.sncpProducer == null) { + long s = System.currentTimeMillis(); MessageProducer[] producers = new MessageProducer[producerCount]; for (int i = 0; i < producers.length; i++) { MessageProducer producer = createProducer("SncpProducer"); producer.startup().join(); producers[i] = producer; } + long e = System.currentTimeMillis() - s; + if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms"); this.sncpProducer = new MessageProducers(producers); } } @@ -182,12 +185,15 @@ public abstract class MessageAgent { if (this.httpProducer == null) { synchronized (httpProducerLock) { if (this.httpProducer == null) { + long s = System.currentTimeMillis(); MessageProducer[] producers = new MessageProducer[producerCount]; for (int i = 0; i < producers.length; i++) { MessageProducer producer = createProducer("HttpProducer"); producer.startup().join(); producers[i] = producer; } + long e = System.currentTimeMillis() - s; + if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms"); this.httpProducer = new MessageProducers(producers); } } diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 3f0d1bfd4..05a31985f 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -76,18 +76,23 @@ public abstract class MessageClient { if (node.scheduledFuture != null) node.scheduledFuture.cancel(true); AtomicLong ncer = node.getCounter(); if (ncer != null) ncer.decrementAndGet(); + final long cha = now - msg.createtime; + if (finest) messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.receive (mq.delay = " + cha + "ms, mq.seqid = " + msg.getSeqid() + ")"); node.future.complete(msg); - long cha = now - msg.createtime; - if (cha > 1000 && fine) { - messageAgent.logger.log(Level.FINE, clazzName + ".MessageRespFutureNode.process (mqs.delays = " + cha + "ms, mqs.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); - } else if (cha > 50 && finer) { - messageAgent.logger.log(Level.FINER, clazzName + ".MessageRespFutureNode.process (mq.delays = " + cha + "ms, mq.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); + long cha2 = System.currentTimeMillis() - now; + if ((cha > 1000 || cha2 > 1000) && fine) { + messageAgent.logger.log(Level.FINE, clazzName + ".MessageRespFutureNode.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms, mqs.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); + } else if ((cha > 50 || cha2 > 50) && finer) { + messageAgent.logger.log(Level.FINER, clazzName + ".MessageRespFutureNode.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms, mq.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); } else if (finest) { - messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.process (mq.delay = " + cha + "ms, mq.counter = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); + messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms, mq.counter = " + ncer + ") mqresp.msg: " + formatRespMessage(msg)); } }; + long ones = System.currentTimeMillis(); MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor); one.startup().join(); + long onee = System.currentTimeMillis() - ones; + if (finest) messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.startup " + onee + "ms "); this.respConsumer = one; } } diff --git a/src/main/java/org/redkale/mq/SncpMessageProcessor.java b/src/main/java/org/redkale/mq/SncpMessageProcessor.java index 8174f0544..f0b18a5fe 100644 --- a/src/main/java/org/redkale/mq/SncpMessageProcessor.java +++ b/src/main/java/org/redkale/mq/SncpMessageProcessor.java @@ -64,7 +64,7 @@ public class SncpMessageProcessor implements MessageProcessor { @Override public void begin(final int size, long starttime) { this.starttime = starttime; - this.cdl = new CountDownLatch(size); + this.cdl = size > 1 ? new CountDownLatch(size) : null; } @Override diff --git a/src/main/java/org/redkale/net/AsyncThread.java b/src/main/java/org/redkale/net/AsyncThread.java index 58ebc7354..d3876519a 100644 --- a/src/main/java/org/redkale/net/AsyncThread.java +++ b/src/main/java/org/redkale/net/AsyncThread.java @@ -28,4 +28,13 @@ public abstract class AsyncThread extends WorkThread { return t instanceof AsyncThread ? (AsyncThread) t : null; } + /** + * 是否IO线程 + * + * @return boolean + */ + @Override + public final boolean inIO() { + return true; + } } diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index 4c349d6de..073d9df3e 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -133,7 +133,7 @@ public class Context { } } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); - response.finish(true); + response.error(); } }); } else if (workExecutor != null) { @@ -142,7 +142,7 @@ public class Context { servlet.execute(request, response); } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); - response.finish(true); + response.error(); } }); } else { @@ -150,7 +150,7 @@ public class Context { servlet.execute(request, response); } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); - response.finish(true); + response.error(); } } diff --git a/src/main/java/org/redkale/net/PrepareServlet.java b/src/main/java/org/redkale/net/PrepareServlet.java index 4b36475c8..84201c0e8 100644 --- a/src/main/java/org/redkale/net/PrepareServlet.java +++ b/src/main/java/org/redkale/net/PrepareServlet.java @@ -221,7 +221,7 @@ public abstract class PrepareServlet> { this.finish(false); } + protected void error() { + finish(true); + } + public void finish(boolean kill) { if (!this.inited) return; //避免重复关闭 //System.println("耗时: " + (System.currentTimeMillis() - request.createtime)); diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index 7c8fa68fc..bad51ac95 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -86,6 +86,16 @@ public class WorkThread extends Thread implements Executor { return workExecutor; } + /** + * 是否IO线程 + * + * @since 2.6.0 + * @return boolean + */ + public boolean inIO() { + return false; + } + public boolean inCurrThread() { return this == Thread.currentThread(); } diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 97422fc51..edf35b71b 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -23,7 +23,7 @@ import org.redkale.util.ByteArray; */ public abstract class ClientCodec { - private final List> results = new ArrayList<>(); + protected final List> results = new ArrayList<>(); public ClientCodec() { } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 8010c5ac7..a397de09e 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -153,17 +153,15 @@ public class ClientConnection implements Consumer respFuture.completeExceptionally(rs.exc)); } } else { - if (workThread == null || workThread == Thread.currentThread() - || workThread.getState() == Thread.State.BLOCKED - || workThread.getState() == Thread.State.WAITING) { + if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() + || workThread.getState() != Thread.State.RUNNABLE) { respFuture.complete(rs.result); } else { workThread.execute(() -> respFuture.complete(rs.result)); diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 2857b83f7..a3eb3f8fc 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -61,9 +61,8 @@ public class ClientFuture extends CompletableFuture implements Runnable { workThread = request.workThread; request.workThread = null; } - if (workThread == null || workThread == Thread.currentThread() - || workThread.getState() == Thread.State.BLOCKED - || workThread.getState() == Thread.State.WAITING) { + if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() + || workThread.getState() != Thread.State.RUNNABLE) { this.completeExceptionally(ex); } else { workThread.execute(() -> completeExceptionally(ex)); diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 853b848ba..82619f5bb 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -430,27 +430,27 @@ public class HttpResponse extends Response { * @param type 指定的RetResult泛型类型 * @param ret RetResult输出对象 */ - @Deprecated //@since 2.5.0 - public void finishJson(Type type, org.redkale.service.RetResult ret) { - this.contentType = this.jsonContentType; - if (this.retResultHandler != null) { - ret = this.retResultHandler.apply(this.request, ret); - } - if (this.recycleListener != null) this.output = ret; - if (ret != null && !ret.isSuccess()) { - this.header.addValue("retcode", String.valueOf(ret.getRetcode())); - this.header.addValue("retinfo", ret.getRetinfo()); - } - Convert convert = ret == null ? null : ret.convert(); - if (convert == null) convert = request.getRespConvert(); - if (convert == jsonRootConvert) { - JsonBytesWriter writer = jsonWriter; - convert.convertTo(writer.clear(), type, ret); - finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null); - } else { - convert.convertToBytes(type, ret, convertHandler); - } - } +// @Deprecated //@since 2.5.0 +// public void finishJson(Type type, org.redkale.service.RetResult ret) { +// this.contentType = this.jsonContentType; +// if (this.retResultHandler != null) { +// ret = this.retResultHandler.apply(this.request, ret); +// } +// if (this.recycleListener != null) this.output = ret; +// if (ret != null && !ret.isSuccess()) { +// this.header.addValue("retcode", String.valueOf(ret.getRetcode())); +// this.header.addValue("retinfo", ret.getRetinfo()); +// } +// Convert convert = ret == null ? null : ret.convert(); +// if (convert == null) convert = request.getRespConvert(); +// if (convert == jsonRootConvert) { +// JsonBytesWriter writer = jsonWriter; +// convert.convertTo(writer.clear(), type, ret); +// finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null); +// } else { +// convert.convertToBytes(type, ret, convertHandler); +// } +// } /** * 将RetResult对象以JSON格式输出 @@ -459,25 +459,25 @@ public class HttpResponse extends Response { * @param type 指定的RetResult泛型类型 * @param ret RetResult输出对象 */ - @Deprecated //@since 2.5.0 - public void finishJson(final JsonConvert convert, Type type, org.redkale.service.RetResult ret) { - this.contentType = this.jsonContentType; - if (this.retResultHandler != null) { - ret = this.retResultHandler.apply(this.request, ret); - } - if (this.recycleListener != null) this.output = ret; - if (ret != null && !ret.isSuccess()) { - this.header.addValue("retcode", String.valueOf(ret.getRetcode())); - this.header.addValue("retinfo", ret.getRetinfo()); - } - if (convert == jsonRootConvert) { - JsonBytesWriter writer = jsonWriter; - convert.convertTo(writer.clear(), type, ret); - finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null); - } else { - convert.convertToBytes(type, ret, convertHandler); - } - } +// @Deprecated //@since 2.5.0 +// public void finishJson(final JsonConvert convert, Type type, org.redkale.service.RetResult ret) { +// this.contentType = this.jsonContentType; +// if (this.retResultHandler != null) { +// ret = this.retResultHandler.apply(this.request, ret); +// } +// if (this.recycleListener != null) this.output = ret; +// if (ret != null && !ret.isSuccess()) { +// this.header.addValue("retcode", String.valueOf(ret.getRetcode())); +// this.header.addValue("retinfo", ret.getRetinfo()); +// } +// if (convert == jsonRootConvert) { +// JsonBytesWriter writer = jsonWriter; +// convert.convertTo(writer.clear(), type, ret); +// finish(false, (String) null, writer.content(), writer.offset(), writer.length(), null, null); +// } else { +// convert.convertToBytes(type, ret, convertHandler); +// } +// } /** * 将CompletableFuture的结果对象以JSON格式输出 @@ -486,11 +486,11 @@ public class HttpResponse extends Response { * @param valueType 指定CompletableFuture.value的泛型类型 * @param future 输出对象的句柄 */ - @Deprecated //@since 2.5.0 - @SuppressWarnings("unchecked") - public void finishJson(final JsonConvert convert, final Type valueType, final CompletableFuture future) { - finish(convert, valueType, future); - } +// @Deprecated //@since 2.5.0 +// @SuppressWarnings("unchecked") +// public void finishJson(final JsonConvert convert, final Type valueType, final CompletableFuture future) { +// finish(convert, valueType, future); +// } /** * 将RetResult对象输出 @@ -932,7 +932,12 @@ public class HttpResponse extends Response { // } // } } - + + @Override + protected void error() { + finish500(); + } + /** * 以304状态码输出 */ diff --git a/src/main/java/org/redkale/net/http/HttpClient.java b/src/main/java/org/redkale/net/http/HttpSimpleClient.java similarity index 93% rename from src/main/java/org/redkale/net/http/HttpClient.java rename to src/main/java/org/redkale/net/http/HttpSimpleClient.java index a0a2e057c..3a91531ba 100644 --- a/src/main/java/org/redkale/net/http/HttpClient.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleClient.java @@ -20,7 +20,7 @@ import org.redkale.util.*; * 1、使用HTTPS;
* 2、上传下载文件;
* 3、返回超大响应包;
- * 类似JDK11的 java.net.http.HttpClient
+ * 类似JDK11的 java.net.http.HttpSimpleClient
* *

* 详情见: https://redkale.org @@ -29,7 +29,7 @@ import org.redkale.util.*; * @since 2.3.0 * */ -public class HttpClient { +public class HttpSimpleClient { protected final AsyncGroup asyncGroup; @@ -37,12 +37,12 @@ public class HttpClient { protected int writeTimeoutSeconds = 6; - protected HttpClient(AsyncGroup asyncGroup) { + protected HttpSimpleClient(AsyncGroup asyncGroup) { this.asyncGroup = asyncGroup; } - public static HttpClient create(AsyncGroup asyncGroup) { - return new HttpClient(asyncGroup); + public static HttpSimpleClient create(AsyncGroup asyncGroup) { + return new HttpSimpleClient(asyncGroup); } public CompletableFuture> getAsync(String url) { @@ -103,7 +103,7 @@ public class HttpClient { + "Host: " + uri.getHost() + "\r\n" + "Content-Length: " + (body == null ? 0 : body.length) + "\r\n").getBytes(StandardCharsets.UTF_8)); if (headers == null || !headers.containsKey("User-Agent")) { - array.put(("User-Agent: redkale-httpclient/" + Redkale.getDotedVersion() + "\r\n").getBytes(StandardCharsets.UTF_8)); + array.put(("User-Agent: Redkale-http-client/" + Redkale.getDotedVersion() + "\r\n").getBytes(StandardCharsets.UTF_8)); } if (headers == null || !headers.containsKey("Connection")) { array.put(("Connection: close\r\n").getBytes(StandardCharsets.UTF_8)); @@ -115,7 +115,6 @@ public class HttpClient { } array.put((byte) '\r', (byte) '\n'); if (body != null) array.put(body); - System.out.println(array); final CompletableFuture> future = new CompletableFuture(); conn.write(array, new CompletionHandler() { @Override @@ -137,7 +136,7 @@ public class HttpClient { // final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); // asyncGroup.start(); // String url = "http://redkale.org"; -// HttpClient client = HttpClient.create(asyncGroup); +// HttpSimpleClient client = HttpSimpleClient.create(asyncGroup); // System.out.println(client.getAsync(url).join()); // } @@ -211,7 +210,9 @@ public class HttpClient { if (buffer.hasRemaining()) array.put(buffer, buffer.remaining()); this.readState = READ_STATE_END; } - this.responseResult.setResult(array.getBytes()); + if (responseResult.getStatus() <= 200) { + this.responseResult.setResult(array.getBytes()); + } this.future.complete(this.responseResult); conn.offerBuffer(buffer); conn.dispose(); diff --git a/src/main/java/org/redkale/net/http/Rest.java b/src/main/java/org/redkale/net/http/Rest.java index 146f8de72..9c5dc9ec9 100644 --- a/src/main/java/org/redkale/net/http/Rest.java +++ b/src/main/java/org/redkale/net/http/Rest.java @@ -321,11 +321,13 @@ public final class Rest { } final Map> asmParamMap = namePresent ? null : MethodParamClassVisitor.getMethodParamNames(new HashMap<>(), webSocketType); final Set messageNames = new HashSet<>(); - final List messageMethods = new ArrayList<>(); + Method wildcardMethod = null; + List mmethods = new ArrayList<>(); for (Method method : webSocketType.getMethods()) { RestOnMessage rom = method.getAnnotation(RestOnMessage.class); if (rom == null) continue; String name = rom.name(); + if (!"*".equals(name) && !checkName(name)) throw new RuntimeException("@RestOnMessage.name contains illegal characters on (" + method + ")"); if (Modifier.isFinal(method.getModifiers())) throw new RuntimeException("@RestOnMessage method can not final but (" + method + ")"); if (Modifier.isStatic(method.getModifiers())) throw new RuntimeException("@RestOnMessage method can not static but (" + method + ")"); if (method.getReturnType() != void.class) throw new RuntimeException("@RestOnMessage method must return void but (" + method + ")"); @@ -333,8 +335,16 @@ public final class Rest { if (name.isEmpty()) throw new RuntimeException(method + " RestOnMessage.name is empty createRestWebSocketServlet"); if (messageNames.contains(name)) throw new RuntimeException(method + " repeat RestOnMessage.name(" + name + ") createRestWebSocketServlet"); messageNames.add(name); - messageMethods.add(method); + if ("*".equals(name)) { + wildcardMethod = method; + } else { + mmethods.add(method); + } } + final List messageMethods = new ArrayList<>(); + messageMethods.addAll(mmethods); + //wildcardMethod 必须放最后, _DynRestOnMessageConsumer 是按messageMethods顺序来判断的 + if (wildcardMethod != null) messageMethods.add(wildcardMethod); //---------------------------------------------------------------------------------------- final String resDesc = Type.getDescriptor(Resource.class); final String wsDesc = Type.getDescriptor(WebSocket.class); @@ -364,7 +374,8 @@ public final class Rest { for (int i = 0; i < messageMethods.size(); i++) { // _DyncXXXWebSocketMessage 子消息List Method method = messageMethods.get(i); String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i)); - msgclassToAnnotations.put(newDynMessageFullName + endfix, method.getAnnotations()); + String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix); + msgclassToAnnotations.put(newDynSuperMessageFullName, method.getAnnotations()); } clz.getField("_redkale_annotations").set(null, msgclassToAnnotations); if (rws.cryptor() != Cryptor.class) { @@ -436,7 +447,8 @@ public final class Rest { for (int i = 0; i < messageMethods.size(); i++) { Method method = messageMethods.get(i); String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i)); - cw.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); + String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix); + cw.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); } } { //@Resource @@ -463,7 +475,7 @@ public final class Rest { mv.visitMethodInsn(INVOKESPECIAL, supDynName, "", "()V", false); mv.visitVarInsn(ALOAD, 0); mv.visitLdcInsn(Type.getObjectType(newDynName + "$" + newDynWebSokcetSimpleName + "Message")); - mv.visitFieldInsn(PUTFIELD, newDynName, "messageTextType", "Ljava/lang/reflect/Type;"); + mv.visitFieldInsn(PUTFIELD, newDynName, "messageRestType", "Ljava/lang/reflect/Type;"); mv.visitVarInsn(ALOAD, 0); MethodDebugVisitor.pushInt(mv, rws.liveinterval()); @@ -526,13 +538,14 @@ public final class Rest { RestClassLoader newLoader = new RestClassLoader(loader); Map msgclassToAnnotations = new HashMap<>(); for (int i = 0; i < messageMethods.size(); i++) { // _DyncXXXWebSocketMessage 子消息List - Method method = messageMethods.get(i); + final Method method = messageMethods.get(i); String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i)); - msgclassToAnnotations.put(newDynMessageFullName + endfix, method.getAnnotations()); + String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix); + msgclassToAnnotations.put(newDynSuperMessageFullName, method.getAnnotations()); ClassWriter cw2 = new ClassWriter(COMPUTE_FRAMES); - cw2.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynMessageFullName + endfix, null, "java/lang/Object", new String[]{webSocketParamName, "java/lang/Runnable"}); - cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); + cw2.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynSuperMessageFullName, null, "java/lang/Object", new String[]{webSocketParamName, "java/lang/Runnable"}); + cw2.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); Set paramnames = new HashSet<>(); String methodesc = method.getName() + ":" + Type.getMethodDescriptor(method); List names = asmParamMap == null ? null : asmParamMap.get(methodesc); @@ -555,6 +568,17 @@ public final class Rest { param.getType() == param.getParameterizedType() ? null : Utility.getTypeDescriptor(param.getParameterizedType()), null); fv.visitEnd(); } + if (method == wildcardMethod) { + for (int j = 0; j < messageMethods.size(); j++) { + Method method2 = messageMethods.get(j); + if (method2 == wildcardMethod) continue; + String endfix2 = "_" + method2.getName() + "_" + (j > 9 ? j : ("0" + j)); + String newDynSuperMessageFullName2 = newDynMessageFullName + (method2 == wildcardMethod ? "" : endfix2); + cw2.visitInnerClass(newDynSuperMessageFullName2, newDynName, newDynMessageSimpleName + endfix2, ACC_PUBLIC + ACC_STATIC); + fv = cw2.visitField(ACC_PUBLIC, method2.getAnnotation(RestOnMessage.class).name(), "L" + newDynSuperMessageFullName2 + ";", null, null); + fv.visitEnd(); + } + } { //_redkale_websocket fv = cw2.visitField(ACC_PUBLIC, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";", null, null); av0 = fv.visitAnnotation(convertDisabledDesc, true); @@ -571,6 +595,8 @@ public final class Rest { } { //getNames mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "getNames", "()[Ljava/lang/String;", null, null)); + av0 = mv.visitAnnotation(convertDisabledDesc, true); + av0.visitEnd(); MethodDebugVisitor.pushInt(mv, paramap.size()); mv.visitTypeInsn(ANEWARRAY, "java/lang/String"); int index = -1; @@ -594,7 +620,7 @@ public final class Rest { Label l1 = new Label(); mv.visitJumpInsn(IFEQ, l1); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynMessageFullName + endfix, en.getKey(), Type.getDescriptor(paramType)); + mv.visitFieldInsn(GETFIELD, newDynSuperMessageFullName, en.getKey(), Type.getDescriptor(paramType)); if (paramType.isPrimitive()) { Class bigclaz = java.lang.reflect.Array.get(java.lang.reflect.Array.newInstance(paramType, 1), 0).getClass(); mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf", "(" + Type.getDescriptor(paramType) + ")" + Type.getDescriptor(bigclaz), false); @@ -609,8 +635,10 @@ public final class Rest { } { //getAnnotations mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "getAnnotations", "()[Ljava/lang/annotation/Annotation;", null, null)); + av0 = mv.visitAnnotation(convertDisabledDesc, true); + av0.visitEnd(); mv.visitFieldInsn(GETSTATIC, newDynName, "_redkale_annotations", "Ljava/util/Map;"); - mv.visitLdcInsn(newDynMessageFullName + endfix); + mv.visitLdcInsn(newDynSuperMessageFullName); mv.visitMethodInsn(INVOKEINTERFACE, "java/util/Map", "get", "(Ljava/lang/Object;)Ljava/lang/Object;", true); mv.visitTypeInsn(CHECKCAST, "[Ljava/lang/annotation/Annotation;"); mv.visitVarInsn(ASTORE, 1); @@ -635,7 +663,7 @@ public final class Rest { mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(L" + newDynWebSokcetFullName + ";)V", null, null)); mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 1); - mv.visitFieldInsn(PUTFIELD, newDynMessageFullName + endfix, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";"); + mv.visitFieldInsn(PUTFIELD, newDynSuperMessageFullName, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";"); mv.visitVarInsn(ALOAD, 1); mv.visitLdcInsn(method.getAnnotation(RestOnMessage.class).name()); mv.visitVarInsn(ALOAD, 0); @@ -648,11 +676,11 @@ public final class Rest { { //run mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "run", "()V", null, null)); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynMessageFullName + endfix, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";"); + mv.visitFieldInsn(GETFIELD, newDynSuperMessageFullName, "_redkale_websocket", "L" + newDynWebSokcetFullName + ";"); for (Map.Entry en : paramap.entrySet()) { mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, (newDynMessageFullName + endfix), en.getKey(), Type.getDescriptor(en.getValue().getType())); + mv.visitFieldInsn(GETFIELD, (newDynSuperMessageFullName), en.getKey(), Type.getDescriptor(en.getValue().getType())); } mv.visitMethodInsn(INVOKEVIRTUAL, newDynWebSokcetFullName, method.getName(), Type.getMethodDescriptor(method), false); @@ -671,11 +699,11 @@ public final class Rest { } cw2.visitEnd(); byte[] bytes = cw2.toByteArray(); - Class cz = newLoader.loadClass((newDynMessageFullName + endfix).replace('/', '.'), bytes); - RedkaleClassLoader.putDynClass((newDynMessageFullName + endfix).replace('/', '.'), bytes, cz); + Class cz = newLoader.loadClass((newDynSuperMessageFullName).replace('/', '.'), bytes); + RedkaleClassLoader.putDynClass((newDynSuperMessageFullName).replace('/', '.'), bytes, cz); } - { //_DynXXXWebSocketMessage class + if (wildcardMethod == null) { //_DynXXXWebSocketMessage class ClassWriter cw2 = new ClassWriter(COMPUTE_FRAMES); cw2.visit(V11, ACC_PUBLIC + ACC_FINAL + ACC_SUPER, newDynMessageFullName, null, "java/lang/Object", null); @@ -684,9 +712,10 @@ public final class Rest { for (int i = 0; i < messageMethods.size(); i++) { Method method = messageMethods.get(i); String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i)); - cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); + String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix); + cw2.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); - fv = cw2.visitField(ACC_PUBLIC, method.getAnnotation(RestOnMessage.class).name(), "L" + newDynMessageFullName + endfix + ";", null, null); + fv = cw2.visitField(ACC_PUBLIC, method.getAnnotation(RestOnMessage.class).name(), "L" + newDynSuperMessageFullName + ";", null, null); fv.visitEnd(); } { //构造函数 @@ -751,7 +780,8 @@ public final class Rest { for (int i = 0; i < messageMethods.size(); i++) { Method method = messageMethods.get(i); String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i)); - cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); + String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix); + cw2.visitInnerClass(newDynSuperMessageFullName, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); } { //构造函数 @@ -776,19 +806,25 @@ public final class Rest { for (int i = 0; i < messageMethods.size(); i++) { final Method method = messageMethods.get(i); String endfix = "_" + method.getName() + "_" + (i > 9 ? i : ("0" + i)); + String newDynSuperMessageFullName = newDynMessageFullName + (method == wildcardMethod ? "" : endfix); final String messagename = method.getAnnotation(RestOnMessage.class).name(); + if (method == wildcardMethod) { + mv.visitVarInsn(ALOAD, 4); + mv.visitVarInsn(ALOAD, 3); + mv.visitMethodInsn(INVOKEVIRTUAL, newDynSuperMessageFullName, "execute", "(L" + newDynWebSokcetFullName + ";)V", false); + } else { + mv.visitVarInsn(ALOAD, 4); + mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + newDynSuperMessageFullName + ";"); + Label ifLabel = new Label(); + mv.visitJumpInsn(IFNULL, ifLabel); - mv.visitVarInsn(ALOAD, 4); - mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + (newDynMessageFullName + endfix) + ";"); - Label ifLabel = new Label(); - mv.visitJumpInsn(IFNULL, ifLabel); - - mv.visitVarInsn(ALOAD, 4); - mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + (newDynMessageFullName + endfix) + ";"); - mv.visitVarInsn(ALOAD, 3); - mv.visitMethodInsn(INVOKEVIRTUAL, (newDynMessageFullName + endfix), "execute", "(L" + newDynWebSokcetFullName + ";)V", false); - mv.visitInsn(RETURN); - mv.visitLabel(ifLabel); + mv.visitVarInsn(ALOAD, 4); + mv.visitFieldInsn(GETFIELD, newDynMessageFullName, messagename, "L" + newDynSuperMessageFullName + ";"); + mv.visitVarInsn(ALOAD, 3); + mv.visitMethodInsn(INVOKEVIRTUAL, newDynSuperMessageFullName, "execute", "(L" + newDynWebSokcetFullName + ";)V", false); + mv.visitInsn(RETURN); + mv.visitLabel(ifLabel); + } } mv.visitInsn(RETURN); mv.visitMaxs(3, 3 + messageMethods.size()); @@ -2380,7 +2416,7 @@ public final class Rest { MethodDebugVisitor.pushInt(mv, entry.methodidx);//方法下标 mv.visitInsn(AALOAD); mv.visitMethodInsn(INVOKESTATIC, retInternalName, "success", "()" + retDesc, false); - mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + typeDesc + retDesc + ")V", false); + mv.visitMethodInsn(INVOKEVIRTUAL, respInternalName, "finishJson", "(" + typeDesc + "Ljava/lang/Object;)V", false); mv.visitInsn(RETURN); } else if (returnType == boolean.class) { mv.visitVarInsn(ISTORE, maxLocals); @@ -2905,7 +2941,7 @@ public final class Rest { return t; } - private static boolean checkName(String name) { //不能含特殊字符 + private static boolean checkName(String name) { //只能是字母、数字和下划线,且不能以数字开头 if (name.isEmpty()) return true; if (name.charAt(0) >= '0' && name.charAt(0) <= '9') return false; for (char ch : name.toCharArray()) { @@ -2916,7 +2952,7 @@ public final class Rest { return true; } - private static boolean checkName2(String name) { //不能含特殊字符 + private static boolean checkName2(String name) { //只能是字母、数字、短横、点和下划线,且不能以数字开头 if (name.isEmpty()) return true; if (name.charAt(0) >= '0' && name.charAt(0) <= '9') return false; for (char ch : name.toCharArray()) { diff --git a/src/main/java/org/redkale/net/http/RestOnMessage.java b/src/main/java/org/redkale/net/http/RestOnMessage.java index b93cd1f01..4f2217af2 100644 --- a/src/main/java/org/redkale/net/http/RestOnMessage.java +++ b/src/main/java/org/redkale/net/http/RestOnMessage.java @@ -29,7 +29,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; public @interface RestOnMessage { /** - * 请求的方法名, 不能含特殊字符,不能以数字开头(能作为变量名) + * 请求的方法名, 不能含特殊字符,不能以数字开头(能作为变量名)
+ * 值"*" 表示参数中不包含方法名 * * @return String */ diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index f459aacbc..d927bd595 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -106,7 +106,7 @@ public abstract class WebSocket { Convert _sendConvert; //不可能为空 - java.lang.reflect.Type _messageTextType; //不可能为空 + java.lang.reflect.Type _messageRestType; //不可能为空 Deflater deflater; //压缩 diff --git a/src/main/java/org/redkale/net/http/WebSocketNodeService.java b/src/main/java/org/redkale/net/http/WebSocketNodeService.java new file mode 100644 index 000000000..978395485 --- /dev/null +++ b/src/main/java/org/redkale/net/http/WebSocketNodeService.java @@ -0,0 +1,163 @@ +package org.redkale.net.http; + + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.Level; +import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; +import static org.redkale.net.http.WebSocketNode.WS_SOURCE_KEY_USERID_PREFIX; +import org.redkale.service.*; +import org.redkale.util.*; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +@AutoLoad(false) +@ResourceType(WebSocketNode.class) +public class WebSocketNodeService extends WebSocketNode implements Service { + + @Override + public void init(AnyValue conf) { + super.init(conf); + } + + @Override + public void destroy(AnyValue conf) { + super.destroy(conf); + } + + public final void setName(String name) { + this.name = name; + } + + @Override + public CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { + if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid); + if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); + final List rs = new ArrayList<>(); + this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); + return CompletableFuture.completedFuture(rs); + } + + @Override + public CompletableFuture sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) { + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + return this.localEngine.sendLocalMessage(message, last, userids); + } + + @Override + public CompletableFuture broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) { + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + return this.localEngine.broadcastLocalMessage(wsrange, message, last); + } + + @Override + public CompletableFuture sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) { + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + return this.localEngine.sendLocalAction(action, userids); + } + + @Override + public CompletableFuture broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + return this.localEngine.broadcastLocalAction(action); + } + + @Override + public CompletableFuture getUserSize(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { + if (this.localEngine == null) return CompletableFuture.completedFuture(0); + return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); + } + + /** + * 当用户连接到节点,需要更新到CacheSource + * + * @param userid Serializable + * @param wsaddr WebSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture connect(Serializable userid, WebSocketAddress wsaddr) { + tryAcquireSemaphore(); + CompletableFuture future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr); + if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr); + return future; + } + + /** + * 当用户从一个节点断掉了所有的连接,需要从CacheSource中删除 + * + * @param userid Serializable + * @param wsaddr WebSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture disconnect(Serializable userid, WebSocketAddress wsaddr) { + tryAcquireSemaphore(); + CompletableFuture future = source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr); + if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr); + return future.thenApply(v -> null); + } + + /** + * 更改用户ID,需要更新到CacheSource + * + * @param olduserid Serializable + * @param newuserid Serializable + * @param wsaddr WebSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) { + tryAcquireSemaphore(); + CompletableFuture future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr); + future = future.thenAccept((a) -> source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr)); + if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr); + return future; + } + + /** + * 判断用户是否有WebSocket + * + * @param userid Serializable + * @param topic RpcTargetTopic + * @param targetAddress InetSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress); + if (localEngine == null) return CompletableFuture.completedFuture(false); + return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); + } + + /** + * 强制关闭用户的WebSocket + * + * @param userid Serializable + * @param topic RpcTargetTopic + * @param targetAddress InetSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { + //不能从sncpNodeAddresses中移除,因为engine.forceCloseWebSocket 会调用到disconnect + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress); + if (localEngine == null) return CompletableFuture.completedFuture(0); + return CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid)); + } +} diff --git a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java index f400d3b2d..48a1e5695 100644 --- a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java @@ -297,7 +297,7 @@ public class WebSocketReadHandler implements CompletionHandler restMessageConsumer = createRestOnMessageConsumer(); - protected Type messageTextType; //RestWebSocket时会被修改 + protected Type messageRestType; //RestWebSocket时会被修改 //同RestWebSocket.single protected boolean single = true; //是否单用户单连接 @@ -138,7 +137,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } catch (Exception e) { logger.warning(this.getClass().getName() + " not designate text message type on createWebSocket Method"); } - this.messageTextType = msgtype; + this.messageRestType = msgtype; } @Override @@ -214,7 +213,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl final WebSocket webSocket = this.createWebSocket(); webSocket._engine = this.node.localEngine; webSocket._channel = response.getChannel(); - webSocket._messageTextType = this.messageTextType; + webSocket._messageRestType = this.messageRestType; webSocket._textConvert = textConvert; webSocket._binaryConvert = binaryConvert; webSocket._sendConvert = sendConvert; @@ -232,7 +231,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.finish(true); return; } - sessionFuture.whenComplete((sessionid, ex) -> { + BiConsumer sessionConsumer = (sessionid, ex) -> { if ((sessionid == null && webSocket.delayPackets == null) || ex != null) { if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); response.finish(true); @@ -360,6 +359,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.finish(true); } }); + }; + WorkThread workThread = WorkThread.currWorkThread(); + sessionFuture.whenComplete((sessionid, ex) -> { + if (workThread == null || workThread == Thread.currentThread()) { + sessionConsumer.accept(sessionid, ex); + } else { + workThread.execute(() -> sessionConsumer.accept(sessionid, ex)); + } }); } diff --git a/src/main/java/org/redkale/service/AbstractService.java b/src/main/java/org/redkale/service/AbstractService.java index bd0f18771..376107d0f 100644 --- a/src/main/java/org/redkale/service/AbstractService.java +++ b/src/main/java/org/redkale/service/AbstractService.java @@ -21,10 +21,21 @@ public abstract class AbstractService implements Service { @Resource(name = Application.RESNAME_APP_EXECUTOR) private ExecutorService workExecutor; + /** + * 当前Service类的原始Service类型, 由于Service会动态重载,所以getClass()得到的不是原始Service类型 + * + * @return Class + */ protected Class serviceType() { return Sncp.getServiceType(this); } + /** + * 异步执行任务 + * + * + * @param command 任务 + */ protected void runAsync(Runnable command) { if (workExecutor != null) { workExecutor.execute(command); @@ -38,6 +49,12 @@ public abstract class AbstractService implements Service { } } + /** + * 异步执行任务 + * + * @param hash hash值 + * @param command 任务 + */ protected void runAsync(int hash, Runnable command) { if (workExecutor != null) { if (workExecutor instanceof ThreadHashExecutor) { @@ -60,6 +77,11 @@ public abstract class AbstractService implements Service { } } + /** + * 获取线程池 + * + * @return ExecutorService + */ protected ExecutorService getExecutor() { if (workExecutor != null) return workExecutor; Thread thread = Thread.currentThread(); diff --git a/src/main/java/org/redkale/service/WebSocketNodeService.java b/src/main/java/org/redkale/service/WebSocketNodeService.java index 21bbad7ae..cbde439e0 100644 --- a/src/main/java/org/redkale/service/WebSocketNodeService.java +++ b/src/main/java/org/redkale/service/WebSocketNodeService.java @@ -5,162 +5,21 @@ */ package org.redkale.service; -import static org.redkale.net.http.WebSocket.*; -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.logging.Level; import org.redkale.net.http.*; import org.redkale.util.*; /** + * 由 org.redkale.net.http.WebSocketNodeService 代替 * *

* 详情见: https://redkale.org * + * @deprecated 2.6.0 * @author zhangjx */ +@Deprecated @AutoLoad(false) @ResourceType(WebSocketNode.class) -public class WebSocketNodeService extends WebSocketNode implements Service { +public class WebSocketNodeService extends org.redkale.net.http.WebSocketNodeService { - @Override - public void init(AnyValue conf) { - super.init(conf); - } - - @Override - public void destroy(AnyValue conf) { - super.destroy(conf); - } - - public final void setName(String name) { - this.name = name; - } - - @Override - public CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { - if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid); - if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); - final List rs = new ArrayList<>(); - this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); - return CompletableFuture.completedFuture(rs); - } - - @Override - public CompletableFuture sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) { - if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.sendLocalMessage(message, last, userids); - } - - @Override - public CompletableFuture broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) { - if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.broadcastLocalMessage(wsrange, message, last); - } - - @Override - public CompletableFuture sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) { - if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.sendLocalAction(action, userids); - } - - @Override - public CompletableFuture broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { - if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.broadcastLocalAction(action); - } - - @Override - public CompletableFuture getUserSize(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { - if (this.localEngine == null) return CompletableFuture.completedFuture(0); - return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); - } - - /** - * 当用户连接到节点,需要更新到CacheSource - * - * @param userid Serializable - * @param wsaddr WebSocketAddress - * - * @return 无返回值 - */ - @Override - public CompletableFuture connect(Serializable userid, WebSocketAddress wsaddr) { - tryAcquireSemaphore(); - CompletableFuture future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr); - if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr); - return future; - } - - /** - * 当用户从一个节点断掉了所有的连接,需要从CacheSource中删除 - * - * @param userid Serializable - * @param wsaddr WebSocketAddress - * - * @return 无返回值 - */ - @Override - public CompletableFuture disconnect(Serializable userid, WebSocketAddress wsaddr) { - tryAcquireSemaphore(); - CompletableFuture future = source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr); - if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr); - return future.thenApply(v -> null); - } - - /** - * 更改用户ID,需要更新到CacheSource - * - * @param olduserid Serializable - * @param newuserid Serializable - * @param wsaddr WebSocketAddress - * - * @return 无返回值 - */ - @Override - public CompletableFuture changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) { - tryAcquireSemaphore(); - CompletableFuture future = source.appendSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr); - future = future.thenAccept((a) -> source.removeSetItemAsync(WS_SOURCE_KEY_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr)); - if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr); - return future; - } - - /** - * 判断用户是否有WebSocket - * - * @param userid Serializable - * @param topic RpcTargetTopic - * @param targetAddress InetSocketAddress - * - * @return 无返回值 - */ - @Override - public CompletableFuture existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress); - if (localEngine == null) return CompletableFuture.completedFuture(false); - return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); - } - - /** - * 强制关闭用户的WebSocket - * - * @param userid Serializable - * @param topic RpcTargetTopic - * @param targetAddress InetSocketAddress - * - * @return 无返回值 - */ - @Override - public CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { - //不能从sncpNodeAddresses中移除,因为engine.forceCloseWebSocket 会调用到disconnect - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress); - if (localEngine == null) return CompletableFuture.completedFuture(0); - return CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid)); - } } diff --git a/src/main/java/org/redkale/source/AbstractDataSource.java b/src/main/java/org/redkale/source/AbstractDataSource.java index f5c60b6a4..763c2798b 100644 --- a/src/main/java/org/redkale/source/AbstractDataSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSource.java @@ -18,7 +18,7 @@ import org.redkale.util.*; * DataSource的S抽象实现类
* 注意: 所有的操作只能作用在一张表上,不能同时变更多张表 * - *

+ * * 详情见: https://redkale.org * * @author zhangjx @@ -30,39 +30,121 @@ import org.redkale.util.*; @ResourceType(DataSource.class) public abstract class AbstractDataSource extends AbstractService implements DataSource, AutoCloseable, Resourcable { + /** + * 是否虚拟化的持久对象 + * + * @param info EntityInfo + * + * @return boolean + */ protected boolean isOnlyCache(EntityInfo info) { return info.isVirtualEntity(); } + /** + * 是否可以使用缓存,一般包含关联查询就不使用缓存 + * + * @param node 过滤条件 + * @param entityApplyer 函数 + * + * @return boolean + */ protected boolean isCacheUseable(FilterNode node, Function entityApplyer) { return node.isCacheUseable(entityApplyer); } + /** + * 生成过滤函数 + * + * @param 泛型 + * @param 泛型 + * @param node 过滤条件 + * @param cache 缓存 + * + * @return Predicate + */ protected Predicate createPredicate(FilterNode node, EntityCache cache) { return node.createPredicate(cache); } + /** + * 根据ResultSet获取对象 + * + * @param 泛型 + * @param info EntityInfo + * @param sels 过滤字段 + * @param row ResultSet + * + * @return 对象 + */ protected T getEntityValue(EntityInfo info, final SelectColumn sels, final EntityInfo.DataResultSetRow row) { return sels == null ? info.getFullEntityValue(row) : info.getEntityValue(sels, row); } + /** + * 根据ResultSet获取对象 + * + * @param 泛型 + * @param info EntityInfo + * @param constructorAttrs 构造函数字段 + * @param unconstructorAttrs 非构造函数字段 + * @param row ResultSet + * + * @return 对象 + */ protected T getEntityValue(EntityInfo info, final Attribute[] constructorAttrs, final Attribute[] unconstructorAttrs, final EntityInfo.DataResultSetRow row) { return info.getEntityValue(constructorAttrs, unconstructorAttrs, row); } + /** + * 根据翻页参数构建排序SQL + * + * @param 泛型 + * @param info EntityInfo + * @param flipper 翻页参数 + * + * @return SQL + */ protected String createSQLOrderby(EntityInfo info, Flipper flipper) { return info.createSQLOrderby(flipper); } + /** + * 根据过滤条件生成关联表与别名的映射关系 + * + * @param node 过滤条件 + * + * @return Map + */ protected Map getJoinTabalis(FilterNode node) { return node == null ? null : node.getJoinTabalis(); } + /** + * 加载指定类的EntityInfo + * + * @param 泛型 + * @param clazz 类 + * @param cacheForbidden 是否屏蔽缓存 + * @param props 配置信息 + * @param fullloader 加载器 + * + * @return EntityInfo + */ protected EntityInfo loadEntityInfo(Class clazz, final boolean cacheForbidden, final Properties props, BiFunction> fullloader) { return EntityInfo.load(clazz, cacheForbidden, props, this, fullloader); } - //检查对象是否都是同一个Entity类 + /** + * 检查对象是否都是同一个Entity类 + * + * @param 泛型 + * @param action 操作 + * @param async 是否异步 + * @param entitys 对象集合 + * + * @return CompletableFuture + */ protected CompletableFuture checkEntity(String action, boolean async, T... entitys) { if (entitys.length < 1) return null; Class clazz = null; @@ -487,7 +569,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data /** * 根据指定参数查询对象某个字段的集合 - *

+ * * @param Entity类的泛型 * @param 字段值的类型 * @param selectedColumn 字段名 diff --git a/src/main/java/org/redkale/source/DataResultSet.java b/src/main/java/org/redkale/source/DataResultSet.java index b976445c2..b0f883386 100644 --- a/src/main/java/org/redkale/source/DataResultSet.java +++ b/src/main/java/org/redkale/source/DataResultSet.java @@ -7,6 +7,7 @@ package org.redkale.source; import java.io.Serializable; import java.math.*; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.*; import org.redkale.convert.json.JsonConvert; import org.redkale.util.Attribute; @@ -141,6 +142,14 @@ public interface DataResultSet extends EntityInfo.DataResultSetRow { o = new BigInteger(o.toString()); } } + } else if (t == String.class) { + if (o == null) { + o = ""; + } else if (o instanceof byte[]) { + o = new String((byte[]) o, StandardCharsets.UTF_8); + } else { + o = o.toString(); + } } else if (o != null && !t.isAssignableFrom(o.getClass()) && o instanceof CharSequence) { o = ((CharSequence) o).length() == 0 ? null : JsonConvert.root().convertFrom(attr.genericType(), o.toString()); } diff --git a/src/main/java/org/redkale/source/FilterNode.java b/src/main/java/org/redkale/source/FilterNode.java index ac7f52b37..1ef9059ab 100644 --- a/src/main/java/org/redkale/source/FilterNode.java +++ b/src/main/java/org/redkale/source/FilterNode.java @@ -300,17 +300,32 @@ public class FilterNode { //FilterNode 不能实现Serializable接口, 否则 } public static FilterNode create(String column, Serializable value) { - return create(column, null, value); + return filter(column, null, value); } public static FilterNode create(String column, FilterExpress express, Serializable value) { - return create(column, express, true, value); + return filter(column, express, true, value); } public static FilterNode create(String column, FilterExpress express, boolean itemand, Serializable value) { return new FilterNode(column, express, itemand, value); } + //@since 2.6.0 create不利于import static + public static FilterNode filter(String column, Serializable value) { + return filter(column, null, value); + } + + //@since 2.6.0 create不利于import static + public static FilterNode filter(String column, FilterExpress express, Serializable value) { + return filter(column, express, true, value); + } + + //@since 2.6.0 create不利于import static + public static FilterNode filter(String column, FilterExpress express, boolean itemand, Serializable value) { + return new FilterNode(column, express, itemand, value); + } + private boolean needSplit(final Object val0) { return needSplit(express, val0); } diff --git a/src/main/java/org/redkale/util/AnyValue.java b/src/main/java/org/redkale/util/AnyValue.java index 343c811c9..3394f7d06 100644 --- a/src/main/java/org/redkale/util/AnyValue.java +++ b/src/main/java/org/redkale/util/AnyValue.java @@ -135,7 +135,13 @@ public abstract class AnyValue { return rs; } - //去重 + /** + * 合并两个AnyValue对象, 会去重, 没有的才增加 + * + * @param av AnyValue + * + * @return DefaultAnyValue + */ public DefaultAnyValue addAllStringSet(final AnyValue av) { if (av == null) return this; final Entry[] strings = av.getStringEntrys(); @@ -146,6 +152,13 @@ public abstract class AnyValue { return this; } + /** + * 合并两个AnyValue对象 不去重 + * + * @param av AnyValue + * + * @return DefaultAnyValue + */ public DefaultAnyValue addAll(final AnyValue av) { if (av == null) return this; if (av instanceof DefaultAnyValue) { @@ -177,6 +190,13 @@ public abstract class AnyValue { return this; } + /** + * 合并两个AnyValue对象 会去重 + * + * @param av AnyValue + * + * @return DefaultAnyValue + */ public DefaultAnyValue setAll(final AnyValue av) { if (av == null) return this; if (av instanceof DefaultAnyValue) { @@ -401,8 +421,16 @@ public abstract class AnyValue { } + /** + * 字段名和值的组合对象 + * + * @param 泛型 + */ public static final class Entry { + /** + * 字段名 + */ public final String name; T value; @@ -413,10 +441,20 @@ public abstract class AnyValue { this.value = value0; } + /** + * 获取字段名 + * + * @return 字段名 + */ public String getName() { return name; } + /** + * 获取字段值 + * + * @return 字段值 + */ public T getValue() { return value; } @@ -506,34 +544,99 @@ public abstract class AnyValue { } } + /** + * 创建DefaultAnyValue + * + * @return DefaultAnyValue + */ public static DefaultAnyValue create() { return new DefaultAnyValue(); } + /** + * 文本内容转换成AnyValue对象 + * + * @param text 文本内容 + * + * @return AnyValue + * @throws IOException 异常 + */ public static AnyValue loadFromXml(String text) throws IOException { return new XmlReader(text).read(); } + /** + * 内容流转换成AnyValue对象 + * + * @param in 内容流 + * + * @return AnyValue + * @throws IOException 异常 + */ public static AnyValue loadFromXml(InputStream in) throws IOException { return loadFromXml(in, StandardCharsets.UTF_8); } + /** + * 内容流转换成AnyValue对象 + * + * @param in 内容流 + * @param charset 字符编码 + * + * @return AnyValue + * @throws IOException 异常 + */ public static AnyValue loadFromXml(InputStream in, Charset charset) throws IOException { return new XmlReader(Utility.read(in, charset)).read(); } + /** + * 内容流转换成AnyValue对象 + * + * @param text 文本内容 + * @param attrFunc 字段回调函数 + * + * @return AnyValue + * @throws IOException 异常 + */ public static AnyValue loadFromXml(String text, BiFunction attrFunc) throws IOException { return new XmlReader(text).attrFunc(attrFunc).read(); } + /** + * 内容流转换成AnyValue对象 + * + * @param in 内容流 + * @param attrFunc 字段回调函数 + * + * @return AnyValue + * @throws IOException 异常 + */ public static AnyValue loadFromXml(InputStream in, BiFunction attrFunc) throws IOException { return loadFromXml(in, StandardCharsets.UTF_8, attrFunc); } + /** + * 内容流转换成AnyValue对象 + * + * @param in 内容流 + * @param charset 字符编码 + * @param attrFunc 字段回调函数 + * + * @return AnyValue + * @throws IOException 异常 + */ public static AnyValue loadFromXml(InputStream in, Charset charset, BiFunction attrFunc) throws IOException { return new XmlReader(Utility.read(in, charset)).attrFunc(attrFunc).read(); } + /** + * 当前AnyValue对象字符串化 + * + * @param indent 缩进长度 + * + * @return String + */ public String toString(int indent) { //indent: 缩进长度 if (indent < 0) indent = 0; char[] chars = new char[indent]; @@ -551,43 +654,148 @@ public abstract class AnyValue { return sb.toString(); } + /** + * 回调子节点 + * + * @param stringConsumer 字符串字段的回调函数 + */ public abstract void forEach(BiConsumer stringConsumer); + /** + * 回调子节点 + * + * @param stringConsumer 字符串字段的回调函数 + * @param anyConsumer 字符串对象的回调函数 + */ public abstract void forEach(BiConsumer stringConsumer, BiConsumer anyConsumer); + /** + * 获取所有字符串子节点 + * + * @return Entry[] + */ public abstract Entry[] getStringEntrys(); + /** + * 获取所有复合子节点 + * + * @return Entry[] + */ public abstract Entry[] getAnyEntrys(); + /** + * 获取字段名集合 + * + * @return String[] + */ public abstract String[] getNames(); + /** + * 获取同级下同一字段名下所有的String对象 + * + * @param name 字段名 + * + * @return String[] + */ public abstract String[] getValues(String name); + /** + * 根据字段名集合获取String类型的字段值集合 + * + * @param names 字段名集合 + * + * @return String[] + */ public abstract String[] getValues(String... names); + /** + * 获取同级下同一字段名下所有的AnyValue对象 + * + * @param name 字段名 + * + * @return AnyValue[] + */ public abstract AnyValue[] getAnyValues(String name); + /** + * 根据字段名集合获取AnyValue类型的字段值集合 + * + * @param names 字段名集合 + * + * @return AnyValue[] + */ public abstract AnyValue[] getAnyValues(String... names); + /** + * 根据字段名获取AnyValue类型的字段值 + * + * @param name 字段名 + * + * @return AnyValue + */ public abstract AnyValue getAnyValue(String name); + /** + * 根据字段名获取String类型的字段值 + * + * @param name 字段名 + * + * @return String + */ public abstract String getValue(String name); + /** + * 根据字段名获取String类型的字段值 + * + * @param name 字段名 + * + * @return String + */ public abstract String get(String name); + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return 字段值 + */ public boolean getBoolValue(String name) { return Boolean.parseBoolean(getValue(name)); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public boolean getBoolValue(String name, boolean defaultValue) { String value = getValue(name); return value == null || value.length() == 0 ? defaultValue : Boolean.parseBoolean(value); } + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return 字段值 + */ public byte getByteValue(String name) { return Byte.parseByte(getValue(name)); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public byte getByteValue(String name, byte defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -598,6 +806,15 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param radix 进制,默认十进制 + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public byte getByteValue(int radix, String name, byte defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -608,19 +825,49 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return 字段值 + */ public char getCharValue(String name) { return getValue(name).charAt(0); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public char getCharValue(String name, char defaultValue) { String value = getValue(name); return value == null || value.length() == 0 ? defaultValue : value.charAt(0); } + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return String + */ public short getShortValue(String name) { return Short.decode(getValue(name)); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public short getShortValue(String name, short defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -631,6 +878,15 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param radix 进制,默认十进制 + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public short getShortValue(int radix, String name, short defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -641,10 +897,25 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return 字段值 + */ public int getIntValue(String name) { return Integer.decode(getValue(name)); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return String + */ public int getIntValue(String name, int defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -655,6 +926,15 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param radix 进制,默认十进制 + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public int getIntValue(int radix, String name, int defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -665,10 +945,25 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return 字段值 + */ public long getLongValue(String name) { return Long.decode(getValue(name)); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public long getLongValue(String name, long defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -679,6 +974,15 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param radix 进制,默认十进制 + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public long getLongValue(int radix, String name, long defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -689,10 +993,25 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return String + */ public float getFloatValue(String name) { return Float.parseFloat(getValue(name)); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public float getFloatValue(String name, float defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -703,10 +1022,25 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param name 字段名 + * + * @return 字段值 + */ public double getDoubleValue(String name) { return Double.parseDouble(getValue(name)); } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public double getDoubleValue(String name, double defaultValue) { String value = getValue(name); if (value == null || value.length() == 0) return defaultValue; @@ -717,11 +1051,27 @@ public abstract class AnyValue { } } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public String getValue(String name, String defaultValue) { String value = getValue(name); return value == null ? defaultValue : value; } + /** + * 获取字段值 + * + * @param name 字段名 + * @param defaultValue 默认值 + * + * @return 字段值 + */ public String getOrDefault(String name, String defaultValue) { String value = getValue(name); return value == null ? defaultValue : value; @@ -753,10 +1103,27 @@ public abstract class AnyValue { return hash; } + /** + * xml化当前AnyValue对象 + * + * @param rootName root名称 + * + * @return String + */ public String toXML(String rootName) { return toXMLString(new StringBuilder("\r\n\r\n"), rootName, this, 0).toString(); } + /** + * xml化AnyValue对象 + * + * @param sb StringBuilder + * @param nodeName 字段名 + * @param conf AnyValue + * @param indent 缩进长度 + * + * @return StringBuilder + */ protected static StringBuilder toXMLString(StringBuilder sb, String nodeName, AnyValue conf, int indent) { //indent: 缩进长度 if (indent < 0) indent = 0; char[] chars = new char[indent]; diff --git a/src/main/java/org/redkale/util/Attribute.java b/src/main/java/org/redkale/util/Attribute.java index 4070e7965..8c3f9ab82 100644 --- a/src/main/java/org/redkale/util/Attribute.java +++ b/src/main/java/org/redkale/util/Attribute.java @@ -763,8 +763,13 @@ public interface Attribute { final String columnName = column.getName().replace('.', '/'); final String interDesc = Type.getDescriptor(TypeToken.typeToClass(subclass)); final String columnDesc = Type.getDescriptor(column); - final ClassLoader loader = Thread.currentThread().getContextClassLoader(); Class realclz = TypeToken.typeToClass(subclass); + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + try { + loader.loadClass(realclz.getName()); + } catch (ClassNotFoundException e) { + loader = realclz.getClassLoader(); + } String pkgname = ""; String clzname = newsubname.toString(); if (realclz != null) { diff --git a/src/main/java/org/redkale/util/MpscChunkedArrayQueue.java b/src/main/java/org/redkale/util/MpscChunkedArrayQueue.java index 0fa08dcfe..574f0e372 100644 --- a/src/main/java/org/redkale/util/MpscChunkedArrayQueue.java +++ b/src/main/java/org/redkale/util/MpscChunkedArrayQueue.java @@ -177,6 +177,7 @@ public class MpscChunkedArrayQueue extends AbstractQueue { /** * An ordered store of an element to a given offset * + * @param E * @param buffer this.buffer * @param offset computed via * @param e an orderly kitty diff --git a/src/main/java/org/redkale/util/NonBlockingHashMap.java b/src/main/java/org/redkale/util/NonBlockingHashMap.java index a01fd147e..fc6dfc1d4 100644 --- a/src/main/java/org/redkale/util/NonBlockingHashMap.java +++ b/src/main/java/org/redkale/util/NonBlockingHashMap.java @@ -248,11 +248,13 @@ public class NonBlockingHashMap extends AbstractMap this(MIN_SIZE); } - /** Create a new NonBlockingHashMap with initial room for the given number of + /** * Create a new NonBlockingHashMap with initial room for the given number of * elements, thus avoiding internal resizing operations to reach an - * appropriate size. Large numbers here when used with a small count of + * appropriate size.Large numbers here when used with a small count of * elements will sacrifice space for a small amount of time gained. The - * initial size will be rounded up internally to the next larger power of 2. */ + * initial size will be rounded up internally to the next larger power of 2. + * @param initial_sz int + */ public NonBlockingHashMap(final int initial_sz) { initialize(initial_sz); } @@ -399,10 +401,13 @@ public class NonBlockingHashMap extends AbstractMap return res == TOMBSTONE ? null : res; } - /** Atomically replace newVal for oldVal, returning the value that existed - * there before. If the oldVal matches the returned value, then newVal was + /** * Atomically replace newVal for oldVal, returning the value that existed + * there before.If the oldVal matches the returned value, then newVal was * inserted, otherwise not. * + * @param key key + * @param newVal newVal + * @param oldVal oldVal * @return the previous value associated with the specified key, * or <tt>null</tt> if there was no mapping for the key * @throws NullPointerException if the key or either value is null @@ -544,7 +549,7 @@ public class NonBlockingHashMap extends AbstractMap } // --- get ----------------------------------------------------------------- - /** Returns the value to which the specified key is mapped, or {@code null} + /** * Returns the value to which the specified key is mapped, or {@code null} * if this map contains no mapping for the key. *

* More formally, if this map contains a mapping from a key {@code k} to @@ -552,6 +557,8 @@ public class NonBlockingHashMap extends AbstractMap * returns {@code v}; otherwise it returns {@code null}. (There can be at * most one such mapping.) * + * @param key key + * @return Type * @throws NullPointerException if the specified key is null */ // Never returns a Prime nor a Tombstone. @Override @@ -615,6 +622,8 @@ public class NonBlockingHashMap extends AbstractMap /** Returns the Key to which the specified key is mapped, or {@code null} * if this map contains no mapping for the key. * + * @param key TypeK + * @return TypeK * @throws NullPointerException if the specified key is null */ // Never returns a Prime nor a Tombstone. public TypeK getk(TypeK key) { @@ -1690,11 +1699,11 @@ public class NonBlockingHashMap extends AbstractMap // --- public interface --- /** - * Add the given value to current counter value. Concurrent updates will + * Add the given value to current counter value.Concurrent updates will * not be lost, but addAndGet or getAndAdd are not implemented because the - * total counter value (i.e., {@link #get}) is not atomically updated. - * Updates are striped across an array of counters to avoid cache contention + * total counter value (i.e., {@link #get}) is not atomically updated. Updates are striped across an array of counters to avoid cache contention * and has been tested with performance scaling linearly up to 768 CPUs. + * @param x long */ public void add(long x) { add_if(x); @@ -1710,8 +1719,8 @@ public class NonBlockingHashMap extends AbstractMap add_if(1L); } - /** Atomically set the sum of the striped counters to specified value. - * Rather more expensive than a simple store, in order to remain atomic. + /** * Atomically set the sum of the striped counters to specified value.Rather more expensive than a simple store, in order to remain atomic. + * @param x long */ public void set(long x) { CAT newcat = new CAT(null, 4, x); @@ -1721,27 +1730,30 @@ public class NonBlockingHashMap extends AbstractMap } /** - * Current value of the counter. Since other threads are updating furiously + * Current value of the counter.Since other threads are updating furiously * the value is only approximate, but it includes all counts made by the * current thread. Requires a pass over the internally striped counters. + * @return long */ public long get() { return _cat.sum(); } - /** Same as {@link #get}, included for completeness. */ + /** Same as {@link #get}, included for completeness. + * @return int */ public int intValue() { return (int) _cat.sum(); } - /** Same as {@link #get}, included for completeness. */ + /** Same as {@link #get}, included for completeness. + * @return long */ public long longValue() { return _cat.sum(); } /** - * A cheaper {@link #get}. Updated only once/millisecond, but as fast as a - * simple load instruction when not updating. + * A cheaper {@link #get}.Updated only once/millisecond, but as fast as a simple load instruction when not updating. + * @return long */ public long estimate_get() { return _cat.estimate_sum(); @@ -1749,7 +1761,9 @@ public class NonBlockingHashMap extends AbstractMap /** * Return the counter's {@code long} value converted to a string. + * @return String */ + @Override public String toString() { return _cat.toString(); } @@ -1763,8 +1777,8 @@ public class NonBlockingHashMap extends AbstractMap } /** - * Return the internal counter striping factor. Useful for diagnosing - * performance problems. + * Return the internal counter striping factor.Useful for diagnosing performance problems. + * @return int */ public int internal_size() { return _cat._t.length; diff --git a/src/main/java/org/redkale/util/Redkale.java b/src/main/java/org/redkale/util/Redkale.java index c2491ed06..d90ae4998 100644 --- a/src/main/java/org/redkale/util/Redkale.java +++ b/src/main/java/org/redkale/util/Redkale.java @@ -16,7 +16,7 @@ public final class Redkale { private static final String rootPackage = "org.redkale"; - private Redkale() { + private Redkale() { } public static String getRootPackage() { @@ -24,7 +24,7 @@ public final class Redkale { } public static String getDotedVersion() { - return "2.5.0"; + return "2.6.0"; } public static int getMajorVersion() { @@ -32,6 +32,6 @@ public final class Redkale { } public static int getMinorVersion() { - return 5; + return 6; } } diff --git a/src/test/java/org/redkale/test/convert/DyncJsonTest.java b/src/test/java/org/redkale/test/convert/DyncJsonTest.java new file mode 100644 index 000000000..cef6009d4 --- /dev/null +++ b/src/test/java/org/redkale/test/convert/DyncJsonTest.java @@ -0,0 +1,57 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.test.convert; + +import java.util.*; +import org.junit.jupiter.api.Test; +import org.redkale.convert.json.JsonConvert; + +/** + * + * @author zhangjx + */ +public class DyncJsonTest { + + public static void main(String[] args) throws Throwable { + new DyncJsonTest().run(); + } + + @Test + public void run() throws Exception { + SimpleDyncBean bean = new SimpleDyncBean(); + bean.name = "haha"; + System.out.println(JsonConvert.root().convertTo(bean)); + + SimpleDyncBean2 bean2 = new SimpleDyncBean2(); + bean2.name = "haha"; + + System.out.println(JsonConvert.root().convertTo(bean2)); + SimpleDyncBean3 bean3 = new SimpleDyncBean3(); + bean3.name = "haha"; + System.out.println(JsonConvert.root().convertTo(bean3)); + } + + public static class SimpleDyncBean { + + public String name; + + public List beans; + } + + public static class SimpleDyncBean2 { + + public String name; + + public SimpleDyncBean2 bean2; + } + + public static class SimpleDyncBean3 { + + public String name; + + public Map beanmap; + } +} diff --git a/src/test/java/org/redkale/test/ws/ChatService.java b/src/test/java/org/redkale/test/ws/ChatService.java index 45762f3ef..cfb1a1cb6 100644 --- a/src/test/java/org/redkale/test/ws/ChatService.java +++ b/src/test/java/org/redkale/test/ws/ChatService.java @@ -46,4 +46,10 @@ public class ChatService implements Service { public void chatMessage(ChatMessage message) { wsnode.broadcastMessage(message); } + + @Comment("其他操作") + public void other(int roomid, String name) { + System.out.println("其他操作: roomid: " + roomid + ", name: " + name); + } + } diff --git a/src/test/java/org/redkale/test/ws/ChatWebSocket.java b/src/test/java/org/redkale/test/ws/ChatWebSocket.java index 5d2037bc6..25ba228f4 100644 --- a/src/test/java/org/redkale/test/ws/ChatWebSocket.java +++ b/src/test/java/org/redkale/test/ws/ChatWebSocket.java @@ -91,4 +91,21 @@ public class ChatWebSocket extends WebSocket { service.joinRoom(getUserid(), roomid); } + /** + * 浏览器WebSocket请求: + *

+     * websocket.send(JSON.stringify({
+     *      roomid: 10212
+     *      name: "haha"
+     * }));
+     * 
+ * + * @param roomid 参数1 + * @param name 参数2 + */ + @RestOnMessage(name = "*") //*为特殊值表示参数中不包含方法名 + public void other(int roomid, String name) { + service.other(roomid, name); + } + } diff --git a/src/test/java/org/redkale/test/wsdync/_DyncChatWebSocketServlet.java b/src/test/java/org/redkale/test/wsdync/_DyncChatWebSocketServlet.java index b5017ae72..9e1ce6e2e 100644 --- a/src/test/java/org/redkale/test/wsdync/_DyncChatWebSocketServlet.java +++ b/src/test/java/org/redkale/test/wsdync/_DyncChatWebSocketServlet.java @@ -31,7 +31,7 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet { public _DyncChatWebSocketServlet() { super(); - this.messageTextType = _DyncChatWebSocketMessage.class; + this.messageRestType = _DyncChatWebSocketMessage.class; } @Override @@ -52,12 +52,48 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet { } } - public static class _DyncChatWebSocketMessage { + public static class _DyncChatWebSocketMessage implements WebSocketParam, Runnable { public _DyncChatWebSocketMessage_sendmessagee_00 sendmessage; public _DyncChatWebSocketMessage_joinroom_01 joinroom; + @ConvertDisabled + public _DyncChatWebSocket _redkale_websocket; + + public int roomid; + + public String name; + + @Override + public String[] getNames() { + return new String[]{"roomid", "name"}; + } + + @Override + public T getValue(String name) { + if ("roomid".equals(name)) return (T) (Integer) roomid; + if ("name".equals(name)) return (T) (String) name; + return null; + } + + @Override + public Annotation[] getAnnotations() { + Annotation[] annotations = _redkale_annotations.get("org/redkale/test/wsdync/_DyncChatWebSocketServlet$_DyncChatWebSocketMessage"); + if (annotations == null) return new Annotation[0]; + return Arrays.copyOf(annotations, annotations.length); + } + + public void execute(_DyncChatWebSocket websocket) { + this._redkale_websocket = websocket; + websocket.preOnMessage("*", this, this); + } + + @Override + public void run() { + _redkale_websocket.other(this.roomid, this.name); + } + @Override public String toString() { return JsonConvert.root().convertTo(this); @@ -163,6 +199,7 @@ public final class _DyncChatWebSocketServlet extends WebSocketServlet { message.joinroom.execute(websocket); return; } + message.execute(websocket); } }