From eb5142a12515448b230658b091b236a8b500e282 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 1 May 2023 09:30:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96NodeServer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 4 +- .../java/org/redkale/boot/ClassFilter.java | 15 ++++-- .../java/org/redkale/boot/NodeHttpServer.java | 35 ++++++++++---- .../java/org/redkale/boot/NodeServer.java | 48 ++++++++++++------- .../java/org/redkale/boot/NodeSncpServer.java | 4 +- .../org/redkale/boot/NodeWatchServer.java | 7 +-- .../redkale/cluster/CacheClusterAgent.java | 4 +- .../org/redkale/cluster/ClusterAgent.java | 2 +- .../java/org/redkale/mq/MessageAgent.java | 9 ++-- .../java/org/redkale/mq/MessageConsumer.java | 3 ++ .../redkale/mq/MessageConsumerListener.java | 11 ++++- .../org/redkale/mq/MessageMultiConsumer.java | 8 ++-- .../java/org/redkale/mq/MessageProducer.java | 4 ++ .../org/redkale/mq/MessageProducerSender.java | 10 ++-- 14 files changed, 112 insertions(+), 52 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 38114a88b..6d3fabb76 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -1119,7 +1119,7 @@ public final class Application { loadCacheSource(sourceName, false); } this.resourceFactory.inject(clusterAgent); - clusterAgent.init(this.resourceFactory, clusterAgent.getConfig()); + clusterAgent.init(clusterAgent.getConfig()); this.resourceFactory.register(ClusterAgent.class, clusterAgent); logger.info("ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") init in " + (System.currentTimeMillis() - s) + " ms"); } @@ -1130,7 +1130,7 @@ public final class Application { long s = System.currentTimeMillis(); for (MessageAgent agent : this.messageAgents) { this.resourceFactory.inject(agent); - agent.init(this.resourceFactory, agent.getConfig()); + agent.init(agent.getConfig()); this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient()); //this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); //不需要给开发者使用 diff --git a/src/main/java/org/redkale/boot/ClassFilter.java b/src/main/java/org/redkale/boot/ClassFilter.java index 0ac445162..8f313dfa8 100644 --- a/src/main/java/org/redkale/boot/ClassFilter.java +++ b/src/main/java/org/redkale/boot/ClassFilter.java @@ -607,6 +607,12 @@ public final class ClassFilter { if (classname.startsWith("com.mysql.")) { break; } + if (classname.startsWith("org.junit.")) { + break; + } + if (classname.startsWith("org.openjfx.")) { + break; + } if (classname.startsWith("org.mariadb.")) { break; } @@ -701,11 +707,10 @@ public final class ClassFilter { return; } File[] lfs = root.listFiles(); - if (lfs == null) { - throw new RedkaleException("File(" + root + ") cannot listFiles()"); - } - for (File f : lfs) { - loadClassFiles(exclude, f, files); + if (lfs != null) { + for (File f : lfs) { + loadClassFiles(exclude, f, files); + } } } } diff --git a/src/main/java/org/redkale/boot/NodeHttpServer.java b/src/main/java/org/redkale/boot/NodeHttpServer.java index ec541df36..16a2a731e 100644 --- a/src/main/java/org/redkale/boot/NodeHttpServer.java +++ b/src/main/java/org/redkale/boot/NodeHttpServer.java @@ -41,10 +41,13 @@ public class NodeHttpServer extends NodeServer { protected final HttpServer httpServer; + protected ClassFilter webSocketFilter; + public NodeHttpServer(Application application, AnyValue serconf) { super(application, createServer(application, serconf)); this.httpServer = (HttpServer) server; this.rest = serconf == null ? false : serconf.getAnyValue("rest") != null; + } private static Server createServer(Application application, AnyValue serconf) { @@ -79,18 +82,33 @@ public class NodeHttpServer extends NodeServer { } @Override - protected ClassFilter createOtherClassFilter() { - return createClassFilter(null, RestWebSocket.class, WebSocket.class, null, null, "rest", "websocket"); + protected List createOtherClassFilters() { + this.webSocketFilter = createClassFilter(null, RestWebSocket.class, WebSocket.class, null, null, "rest", "websocket"); + List filters = super.createOtherClassFilters(); + if (filters == null) { + filters = new ArrayList<>(); + } + filters.add(webSocketFilter); + return filters; } @Override - protected void loadService(ClassFilter serviceFilter, ClassFilter otherFilter) throws Exception { - super.loadService(serviceFilter, otherFilter); + protected void loadOthers(List otherFilters) throws Exception { + List filters = otherFilters; + if (filters != null) { + filters.remove(this.webSocketFilter); //webSocketFilter会在loadHttpFilter中处理,先剔除 + } + super.loadOthers(filters); + } + + @Override + protected void loadService(ClassFilter serviceFilter) throws Exception { + super.loadService(serviceFilter); initWebSocketService(); } @Override - protected void loadFilter(ClassFilter filterFilter, ClassFilter otherFilter) throws Exception { + protected void loadFilter(ClassFilter filterFilter) throws Exception { if (httpServer != null) { loadHttpFilter(filterFilter); } @@ -98,9 +116,9 @@ public class NodeHttpServer extends NodeServer { @Override @SuppressWarnings("unchecked") - protected void loadServlet(ClassFilter servletFilter, ClassFilter otherFilter) throws Exception { + protected void loadServlet(ClassFilter servletFilter) throws Exception { if (httpServer != null) { - loadHttpServlet(servletFilter, otherFilter); + loadHttpServlet(servletFilter); } } @@ -192,7 +210,7 @@ public class NodeHttpServer extends NodeServer { } @SuppressWarnings("unchecked") - protected void loadHttpServlet(final ClassFilter servletFilter, ClassFilter webSocketFilter) throws Exception { + protected void loadHttpServlet(final ClassFilter servletFilter) throws Exception { RedkaleClassLoader.putReflectionPublicClasses(HttpServlet.class.getName()); RedkaleClassLoader.putReflectionPublicClasses(HttpDispatcherServlet.class.getName()); RedkaleClassLoader.putReflectionDeclaredConstructors(HttpResourceServlet.class, HttpResourceServlet.class.getName()); @@ -258,6 +276,7 @@ public class NodeHttpServer extends NodeServer { for (AnyValue restConf : serverConf.getAnyValues("rest")) { loadRestServlet(webSocketFilter, restConf, restedObjects, restedLock, sb, rests, webss); } + this.webSocketFilter = null; } int max = 0; if (ss != null && sb != null) { diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 0d69f4699..697d71e62 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -197,15 +197,29 @@ public abstract class NodeServer { } ClassFilter filterFilter = createFilterClassFilter(); ClassFilter servletFilter = createServletClassFilter(); - ClassFilter otherFilter = createOtherClassFilter(); + List otherFilters = createOtherClassFilters(); + List filters = new ArrayList<>(); + if (serviceFilter != null) { + filters.add(serviceFilter); + } + if (filterFilter != null) { + filters.add(filterFilter); + } + if (servletFilter != null) { + filters.add(servletFilter); + } + if (otherFilters != null) { + filters.addAll(otherFilters); + } long s = System.currentTimeMillis(); - ClassFilter.Loader.load(application.getHome(), serverClassLoader, ((application.excludelibs != null ? (application.excludelibs + ";") : "") + serverConf.getValue("excludelibs", "")).split(";"), serviceFilter, filterFilter, servletFilter, otherFilter); + ClassFilter.Loader.load(application.getHome(), serverClassLoader, ((application.excludelibs != null ? (application.excludelibs + ";") : "") + serverConf.getValue("excludelibs", "")).split(";"), filters.toArray(new ClassFilter[filters.size()])); long e = System.currentTimeMillis() - s; logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms"); - loadService(serviceFilter, otherFilter); //必须在servlet之前 + loadService(serviceFilter); //必须在servlet之前 + loadOthers(otherFilters); if (!application.isSingletonMode()) { //非singleton模式下才加载Filter、Servlet - loadFilter(filterFilter, otherFilter); - loadServlet(servletFilter, otherFilter); + loadFilter(filterFilter); + loadServlet(servletFilter); postLoadServlets(); } if (this.interceptor != null) { @@ -213,15 +227,17 @@ public abstract class NodeServer { } } - protected abstract void loadFilter(ClassFilter filterFilter, ClassFilter otherFilter) throws Exception; + protected void loadOthers(List otherFilters) throws Exception { + } - protected abstract void loadServlet(ClassFilter servletFilter, ClassFilter otherFilter) throws Exception; + protected abstract void loadFilter(ClassFilter filterFilter) throws Exception; + + protected abstract void loadServlet(ClassFilter servletFilter) throws Exception; private void initResource() { final NodeServer self = this; //--------------------------------------------------------------------------------------------- final ResourceFactory appResFactory = application.getResourceFactory(); - final String confURI = appResFactory.find(RESNAME_APP_CONF_DIR, String.class); //------------------------------------- 注册 Resource -------------------------------------------------------- resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { try { @@ -408,7 +424,7 @@ public abstract class NodeServer { } @SuppressWarnings("unchecked") - protected void loadService(ClassFilter serviceFilter, ClassFilter otherFilter) throws Exception { + protected void loadService(ClassFilter serviceFilter) throws Exception { if (serviceFilter == null) { return; } @@ -446,11 +462,11 @@ public abstract class NodeServer { if (!entry.isEmptyGroup() && !entry.isRemote() && rpcGroups.containsGroup(entry.getGroup())) { throw new RedkaleException("Not found group(" + entry.getGroup() + ")"); } - Service oldother = resourceFactory.find(entry.getName(), serviceImplClass); - if (oldother != null) { //Server加载Service时需要判断是否已经加载过了。 - if (!Sncp.isRemote(oldother)) { - if (!Sncp.isComponent(oldother)) { - servletServices.add(oldother); + Service oldOther = resourceFactory.find(entry.getName(), serviceImplClass); + if (oldOther != null) { //Server加载Service时需要判断是否已在其他协议服务中加载 + if (!Sncp.isRemote(oldOther)) { + if (!Sncp.isComponent(oldOther)) { + servletServices.add(oldOther); } } continue; @@ -619,7 +635,7 @@ public abstract class NodeServer { private void calcMaxLength(Service y) { //计算toString中的长度 String n = Sncp.getResourceName(y); - maxNameLength = Math.max(maxNameLength, n == null ? 0 : n.length()); + maxNameLength = Math.max(maxNameLength, n == null ? 0 : n.length()); maxTypeLength = Math.max(maxTypeLength, Sncp.getResourceType(y).getName().length() + 1); } @@ -705,7 +721,7 @@ public abstract class NodeServer { protected abstract ClassFilter createServletClassFilter(); - protected ClassFilter createOtherClassFilter() { + protected List createOtherClassFilters() { return null; } diff --git a/src/main/java/org/redkale/boot/NodeSncpServer.java b/src/main/java/org/redkale/boot/NodeSncpServer.java index 9859ae806..c0d4a20d9 100644 --- a/src/main/java/org/redkale/boot/NodeSncpServer.java +++ b/src/main/java/org/redkale/boot/NodeSncpServer.java @@ -102,7 +102,7 @@ public class NodeSncpServer extends NodeServer { } @Override - protected void loadFilter(ClassFilter filterFilter, ClassFilter otherFilter) throws Exception { + protected void loadFilter(ClassFilter filterFilter) throws Exception { if (sncpServer != null) { loadSncpFilter(this.serverConf.getAnyValue("fliters"), filterFilter); } @@ -132,7 +132,7 @@ public class NodeSncpServer extends NodeServer { } @Override - protected void loadServlet(ClassFilter servletFilter, ClassFilter otherFilter) throws Exception { + protected void loadServlet(ClassFilter servletFilter) throws Exception { RedkaleClassLoader.putReflectionPublicClasses(SncpServlet.class.getName()); if (!application.isSingletonMode()) { this.servletServices.stream() diff --git a/src/main/java/org/redkale/boot/NodeWatchServer.java b/src/main/java/org/redkale/boot/NodeWatchServer.java index b7ea1f695..7bddaabfb 100644 --- a/src/main/java/org/redkale/boot/NodeWatchServer.java +++ b/src/main/java/org/redkale/boot/NodeWatchServer.java @@ -6,8 +6,9 @@ package org.redkale.boot; import java.lang.annotation.Annotation; +import java.util.List; import org.redkale.net.*; -import org.redkale.net.http.*; +import org.redkale.net.http.WebServlet; import org.redkale.service.Service; import org.redkale.util.AnyValue; import org.redkale.watch.*; @@ -42,8 +43,8 @@ public class NodeWatchServer extends NodeHttpServer { } @Override - protected ClassFilter createOtherClassFilter() { - return null; + protected List createOtherClassFilters() { + return null; //不调用 super.createOtherClassFilters() } @Override diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index 7bc84230c..2d319c3be 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -51,8 +51,8 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { protected final ConcurrentHashMap> mqtpAddressMap = new ConcurrentHashMap<>(); @Override - public void init(ResourceFactory factory, AnyValue config) { - super.init(factory, config); + public void init(AnyValue config) { + super.init(config); this.sourceName = getSourceName(); this.ttls = config.getIntValue("ttls", 10); if (this.ttls < 5) { diff --git a/src/main/java/org/redkale/cluster/ClusterAgent.java b/src/main/java/org/redkale/cluster/ClusterAgent.java index abde9ed5b..9dd12424f 100644 --- a/src/main/java/org/redkale/cluster/ClusterAgent.java +++ b/src/main/java/org/redkale/cluster/ClusterAgent.java @@ -67,7 +67,7 @@ public abstract class ClusterAgent { protected final ConcurrentHashMap remoteEntrys = new ConcurrentHashMap<>(); - public void init(ResourceFactory factory, AnyValue config) { + public void init(AnyValue config) { this.config = config; this.name = config.getValue("name", ""); this.waits = config.getBoolValue("waits", false); diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index c0afca85f..a388de23a 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -36,6 +36,9 @@ public abstract class MessageAgent implements Resourcable { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + @Resource(required = false) + protected Application application; + @Resource(name = RESNAME_APP_NODEID) protected int nodeid; @@ -72,7 +75,7 @@ public abstract class MessageAgent implements Resourcable { //本地Service消息接收处理器, key:consumerid protected HashMap clientConsumerNodes = new LinkedHashMap<>(); - public void init(ResourceFactory factory, AnyValue config) { + public void init(AnyValue config) { this.name = checkName(config.getValue("name", "")); this.httpMessageClient = new HttpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this); @@ -83,8 +86,8 @@ public abstract class MessageAgent implements Resourcable { Class> coderClass = (Class) Thread.currentThread().getContextClassLoader().loadClass(coderType); RedkaleClassLoader.putReflectionPublicConstructors(coderClass, coderClass.getName()); MessageCoder coder = coderClass.getConstructor().newInstance(); - if (factory != null) { - factory.inject(coder); + if (application != null) { + application.getResourceFactory().inject(coder); } if (coder instanceof Service) { ((Service) coder).init(config); diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java index 6973335df..93c7b00ed 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -6,6 +6,7 @@ package org.redkale.mq; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; import java.lang.annotation.*; +import org.redkale.convert.ConvertType; /** * MQ资源注解 @@ -27,4 +28,6 @@ public @interface MessageConsumer { String group() default ""; String[] topics(); + + ConvertType convertType() default ConvertType.JSON; } diff --git a/src/main/java/org/redkale/mq/MessageConsumerListener.java b/src/main/java/org/redkale/mq/MessageConsumerListener.java index a3379dd0e..3fb09d8a1 100644 --- a/src/main/java/org/redkale/mq/MessageConsumerListener.java +++ b/src/main/java/org/redkale/mq/MessageConsumerListener.java @@ -4,7 +4,8 @@ package org.redkale.mq; import org.redkale.annotation.Component; -import org.redkale.service.*; +import org.redkale.service.Local; +import org.redkale.util.AnyValue; /** * MQ资源注解 @@ -18,7 +19,13 @@ import org.redkale.service.*; */ @Local @Component -public interface MessageConsumerListener extends Service { +public interface MessageConsumerListener { + + default void init(AnyValue config) { + } public void onMessage(String topic, T message); + + default void destroy(AnyValue config) { + } } diff --git a/src/main/java/org/redkale/mq/MessageMultiConsumer.java b/src/main/java/org/redkale/mq/MessageMultiConsumer.java index 51a825a56..05c8888ee 100644 --- a/src/main/java/org/redkale/mq/MessageMultiConsumer.java +++ b/src/main/java/org/redkale/mq/MessageMultiConsumer.java @@ -5,9 +5,9 @@ */ package org.redkale.mq; +import static java.lang.annotation.RetentionPolicy.RUNTIME; import java.lang.annotation.*; import static java.lang.annotation.ElementType.*; -import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * 多消费组,需要同 @RestService 一起使用 @@ -41,13 +41,14 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; * * *

- * 注: 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型 - * + * 注: 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型
+ * 由 MessageConsumerListener 代替 *

* 详情见: https://redkale.org * * * @author zhangjx + * @deprecated * * @since 2.1.0 */ @@ -55,6 +56,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; @Documented @Target({TYPE}) @Retention(RUNTIME) +@Deprecated(since = "2.8.0") public @interface MessageMultiConsumer { String module(); diff --git a/src/main/java/org/redkale/mq/MessageProducer.java b/src/main/java/org/redkale/mq/MessageProducer.java index 3c55ec88f..7b8ee5627 100644 --- a/src/main/java/org/redkale/mq/MessageProducer.java +++ b/src/main/java/org/redkale/mq/MessageProducer.java @@ -6,6 +6,7 @@ package org.redkale.mq; import static java.lang.annotation.ElementType.FIELD; import static java.lang.annotation.RetentionPolicy.RUNTIME; import java.lang.annotation.*; +import org.redkale.convert.ConvertType; /** * MQ资源注解, 只能标记在MessageProducerSender类型字段上 @@ -23,4 +24,7 @@ import java.lang.annotation.*; public @interface MessageProducer { String mq(); + + ConvertType convertType() default ConvertType.JSON; + } diff --git a/src/main/java/org/redkale/mq/MessageProducerSender.java b/src/main/java/org/redkale/mq/MessageProducerSender.java index 4eaa23849..f5679b281 100644 --- a/src/main/java/org/redkale/mq/MessageProducerSender.java +++ b/src/main/java/org/redkale/mq/MessageProducerSender.java @@ -19,13 +19,13 @@ import org.redkale.convert.Convert; */ public interface MessageProducerSender { - public CompletableFuture send(String topic, Object value); + public CompletableFuture sendMessage(String topic, Object value); - default CompletableFuture send(String topic, Convert convert, Object value) { - return send(topic, convert.convertToBytes(value)); + default CompletableFuture sendMessage(String topic, Convert convert, Object value) { + return sendMessage(topic, convert.convertToBytes(value)); } - default CompletableFuture send(String topic, Convert convert, Type type, Object value) { - return send(topic, convert.convertToBytes(type, value)); + default CompletableFuture sendMessage(String topic, Convert convert, Type type, Object value) { + return sendMessage(topic, convert.convertToBytes(type, value)); } }