From 8dbf4746623c4bd8026cfdc16843151abe952c04 Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 4 May 2023 21:35:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/NodeServer.java | 10 +- .../org/redkale/mq/HttpMessageClient.java | 2 +- .../java/org/redkale/mq/MessageAgent.java | 118 ++++++++++-------- .../java/org/redkale/mq/MessageConsumer.java | 25 ++-- .../redkale/mq/MessageConsumerListener.java | 31 ----- .../org/redkale/mq/MessageMultiConsumer.java | 2 +- .../java/org/redkale/mq/MessageProducer.java | 25 ++-- .../org/redkale/mq/MessageProducerSender.java | 31 ----- .../java/org/redkale/mq/ResourceConsumer.java | 33 +++++ .../java/org/redkale/mq/ResourceProducer.java | 30 +++++ .../org/redkale/mq/SncpMessageClient.java | 2 +- src/main/java/org/redkale/util/Utility.java | 24 ++++ 12 files changed, 189 insertions(+), 144 deletions(-) delete mode 100644 src/main/java/org/redkale/mq/MessageConsumerListener.java delete mode 100644 src/main/java/org/redkale/mq/MessageProducerSender.java create mode 100644 src/main/java/org/redkale/mq/ResourceConsumer.java create mode 100644 src/main/java/org/redkale/mq/ResourceProducer.java diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 697d71e62..e569bc581 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -640,8 +640,8 @@ public abstract class NodeServer { } protected boolean acceptsComponent(Class serviceImplClass) { - if (MessageConsumerListener.class.isAssignableFrom(serviceImplClass)) { - MessageConsumer mqConsumer = serviceImplClass.getAnnotation(MessageConsumer.class); + if (MessageConsumer.class.isAssignableFrom(serviceImplClass)) { + ResourceConsumer mqConsumer = serviceImplClass.getAnnotation(ResourceConsumer.class); if (mqConsumer == null) { return false; } @@ -654,10 +654,10 @@ public abstract class NodeServer { } protected boolean interceptComponent(Service service) { - if (service instanceof MessageConsumerListener) { - MessageConsumer mqConsumer = service.getClass().getAnnotation(MessageConsumer.class); + if (service instanceof MessageConsumer) { + ResourceConsumer mqConsumer = service.getClass().getAnnotation(ResourceConsumer.class); MessageAgent mqAgent = application.getMessageAgent(mqConsumer.mq()); - mqAgent.addConsumerListener((MessageConsumerListener) service); + mqAgent.addMessageConsumer(mqConsumer, (MessageConsumer) service); return true; } return false; diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/mq/HttpMessageClient.java index 5cab40740..f04a0c6dd 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClient.java @@ -32,7 +32,7 @@ public class HttpMessageClient extends MessageClient { protected HttpMessageClient(MessageAgent messageAgent) { super(messageAgent); if (messageAgent != null) { // //RPC方式下无messageAgent - this.respTopic = messageAgent.generateHttpRespTopic(); + this.respTopic = messageAgent.generateApplicationHttpRespTopic(); } } diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 514c0566d..7b5d54ecc 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -14,6 +14,7 @@ import java.util.stream.Collectors; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.*; import org.redkale.annotation.ResourceListener; +import static org.redkale.boot.Application.RESNAME_APP_NAME; import static org.redkale.boot.Application.RESNAME_APP_NODEID; import org.redkale.boot.*; import org.redkale.net.Servlet; @@ -38,34 +39,42 @@ public abstract class MessageAgent implements Resourcable { @Resource(required = false) protected Application application; - + @Resource(name = RESNAME_APP_NODEID) protected int nodeid; + @Resource(name = RESNAME_APP_NAME) + protected String nodeName; + protected String name; protected AnyValue config; + protected MessageProducer messageProducer; + + //key: group, sub-key: topic + protected final ConcurrentHashMap> consumerMap = new ConcurrentHashMap<>(); + + protected final CopyOnWriteArrayList consumerList = new CopyOnWriteArrayList<>(); + protected MessageClientProducer httpProducer; protected MessageClientProducer sncpProducer; - protected final ReentrantLock httpProducerLock = new ReentrantLock(); - - protected final ReentrantLock sncpProducerLock = new ReentrantLock(); - - protected final ReentrantLock httpNodesLock = new ReentrantLock(); - - protected final ReentrantLock sncpNodesLock = new ReentrantLock(); - - protected final List consumerListeners = new CopyOnWriteArrayList<>(); - - protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); - protected HttpMessageClient httpMessageClient; protected SncpMessageClient sncpMessageClient; + protected final ReentrantLock consumerLock = new ReentrantLock(); + + protected final ReentrantLock producerLock = new ReentrantLock(); + + protected final ReentrantLock nodesLock = new ReentrantLock(); + + protected final List consumerListeners = new CopyOnWriteArrayList<>(); + + protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); + protected ScheduledThreadPoolExecutor timeoutExecutor; protected MessageCoder messageCoder = MessageRecordCoder.getInstance(); @@ -226,10 +235,10 @@ public abstract class MessageAgent implements Resourcable { //获取指定topic的生产处理器 public MessageClientProducer getSncpMessageClientProducer() { if (this.sncpProducer == null) { - sncpProducerLock.lock(); + producerLock.lock(); try { if (this.sncpProducer == null) { - long s = System.currentTimeMillis(); + long s = System.currentTimeMillis(); this.sncpProducer = createMessageClientProducer("SncpProducer"); long e = System.currentTimeMillis() - s; if (logger.isLoggable(Level.FINEST)) { @@ -237,7 +246,7 @@ public abstract class MessageAgent implements Resourcable { } } } finally { - sncpProducerLock.unlock(); + producerLock.unlock(); } } return this.sncpProducer; @@ -245,7 +254,7 @@ public abstract class MessageAgent implements Resourcable { public MessageClientProducer getHttpMessageClientProducer() { if (this.httpProducer == null) { - httpProducerLock.lock(); + producerLock.lock(); try { if (this.httpProducer == null) { long s = System.currentTimeMillis(); @@ -253,10 +262,10 @@ public abstract class MessageAgent implements Resourcable { long e = System.currentTimeMillis() - s; if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms"); - } + } } } finally { - httpProducerLock.unlock(); + producerLock.unlock(); } } return this.httpProducer; @@ -283,8 +292,24 @@ public abstract class MessageAgent implements Resourcable { @ResourceListener public abstract void onResourceChange(ResourceEvent[] events); - public void addConsumerListener(MessageConsumerListener listener) { - + public void addMessageConsumer(ResourceConsumer res, MessageConsumer consumer) { + consumerLock.lock(); + try { + ConcurrentHashMap map = consumerMap.computeIfAbsent(res.group(), g -> new ConcurrentHashMap<>()); + for (String topic : res.topics()) { + if (!topic.trim().isEmpty()) { + if (map.containsKey(topic.trim())) { + throw new RedkaleException(MessageConsumer.class.getSimpleName() + + " consume topic (" + topic + ") repeat with " + + map.get(topic).getClass().getName() + " and " + consumer.getClass().getName()); + } + map.put(topic.trim(), consumer); + } + } + consumerList.add(consumer); + } finally { + consumerLock.unlock(); + } } public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { @@ -304,7 +329,7 @@ public abstract class MessageAgent implements Resourcable { } String[] topics = generateHttpReqTopics(service); String consumerid = generateHttpConsumerid(topics, service); - httpNodesLock.lock(); + nodesLock.lock(); try { if (clientConsumerNodes.containsKey(consumerid)) { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); @@ -312,7 +337,7 @@ public abstract class MessageAgent implements Resourcable { HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet); this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor))); } finally { - httpNodesLock.unlock(); + nodesLock.unlock(); } } @@ -327,7 +352,7 @@ public abstract class MessageAgent implements Resourcable { } String topic = generateSncpReqTopic(service); String consumerid = generateSncpConsumerid(topic, service); - sncpNodesLock.lock(); + nodesLock.lock(); try { if (clientConsumerNodes.containsKey(consumerid)) { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); @@ -335,49 +360,49 @@ public abstract class MessageAgent implements Resourcable { SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet); this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor))); } finally { - sncpNodesLock.unlock(); + nodesLock.unlock(); } } - //格式: sncp.req.user + //格式: sncp.req.module.user public final String generateSncpReqTopic(Service service) { return generateSncpReqTopic(Sncp.getResourceName(service), Sncp.getResourceType(service)); } - //格式: sncp.req.user + //格式: sncp.req.module.user public final String generateSncpReqTopic(String resourceName, Class resourceType) { if (WebSocketNode.class.isAssignableFrom(resourceType)) { - return "sncp.req.ws" + (resourceName.isEmpty() ? "" : ("-" + resourceName)) + ".node" + nodeid; + return "sncp.req.module.ws" + (resourceName.isEmpty() ? "" : ("-" + resourceName)) + ".node" + nodeid; } - return "sncp.req." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName)); + return "sncp.req.module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName)); } - //格式: consumer-sncp.req.user 不提供外部使用 + //格式: consumer-sncp.req.module.user 不提供外部使用 protected final String generateSncpConsumerid(String topic, Service service) { return "consumer-" + topic; } - //格式: http.req.user + //格式: http.req.module.user public static String generateHttpReqTopic(String module) { - return "http.req." + module.toLowerCase(); + return "http.req.module." + module.toLowerCase(); } - //格式: http.req.user + //格式: http.req.module.user public static String generateHttpReqTopic(String module, String resname) { - return "http.req." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname)); + return "http.req.module." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname)); } - //格式: sncp.resp.node10 - protected String generateSncpRespTopic() { - return "sncp.resp.node" + nodeid; + //格式: sncp.resp.app.node10 + protected String generateApplicationSncpRespTopic() { + return "sncp.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; } - //格式: http.resp.node10 - protected String generateHttpRespTopic() { - return "http.resp.node" + nodeid; + //格式: http.resp.app.node10 + protected String generateApplicationHttpRespTopic() { + return "http.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; } - //格式: http.req.user + //格式: http.req.module.user protected String[] generateHttpReqTopics(Service service) { String resname = Sncp.getResourceName(service); String module = Rest.getRestModule(service).toLowerCase(); @@ -385,22 +410,17 @@ public abstract class MessageAgent implements Resourcable { if (mmc != null) { return new String[]{generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname))}; } - return new String[]{"http.req." + module + (resname.isEmpty() ? "" : ("-" + resname))}; + return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))}; } - //格式: consumer-http.req.user + //格式: consumer-http.req.module.user protected String generateHttpConsumerid(String[] topics, Service service) { String resname = Sncp.getResourceName(service); String key = Rest.getRestModule(service).toLowerCase(); - return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname)); + return "consumer-http.req.module." + key + (resname.isEmpty() ? "" : ("-" + resname)); } - //格式: xxxx.resp.node10 - protected String generateRespTopic(String protocol) { - return protocol + ".resp.node" + nodeid; - } - protected static class MessageClientConsumerNode { public final NodeServer server; diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java index 93c7b00ed..ebe1c5f1d 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -3,10 +3,9 @@ */ package org.redkale.mq; -import static java.lang.annotation.ElementType.TYPE; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; -import org.redkale.convert.ConvertType; +import org.redkale.annotation.Component; +import org.redkale.service.Local; +import org.redkale.util.AnyValue; /** * MQ资源注解 @@ -15,19 +14,19 @@ import org.redkale.convert.ConvertType; * 详情见: https://redkale.org * * @author zhangjx + * @param 泛型 * * @since 2.8.0 */ -@Documented -@Target({TYPE}) -@Retention(RUNTIME) -public @interface MessageConsumer { +@Local +@Component +public interface MessageConsumer { - String mq(); + default void init(AnyValue config) { + } - String group() default ""; + public void onMessage(String topic, T message); - String[] topics(); - - ConvertType convertType() default ConvertType.JSON; + default void destroy(AnyValue config) { + } } diff --git a/src/main/java/org/redkale/mq/MessageConsumerListener.java b/src/main/java/org/redkale/mq/MessageConsumerListener.java deleted file mode 100644 index 3fb09d8a1..000000000 --- a/src/main/java/org/redkale/mq/MessageConsumerListener.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - */ -package org.redkale.mq; - -import org.redkale.annotation.Component; -import org.redkale.service.Local; -import org.redkale.util.AnyValue; - -/** - * MQ资源注解 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.8.0 - */ -@Local -@Component -public interface MessageConsumerListener { - - default void init(AnyValue config) { - } - - public void onMessage(String topic, T message); - - default void destroy(AnyValue config) { - } -} diff --git a/src/main/java/org/redkale/mq/MessageMultiConsumer.java b/src/main/java/org/redkale/mq/MessageMultiConsumer.java index 05c8888ee..d292c4a9e 100644 --- a/src/main/java/org/redkale/mq/MessageMultiConsumer.java +++ b/src/main/java/org/redkale/mq/MessageMultiConsumer.java @@ -42,7 +42,7 @@ import static java.lang.annotation.ElementType.*; * *

* 注: 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型
- * 由 MessageConsumerListener 代替 + * 由 MessageConsumer 代替 *

* 详情见: https://redkale.org * diff --git a/src/main/java/org/redkale/mq/MessageProducer.java b/src/main/java/org/redkale/mq/MessageProducer.java index 7b8ee5627..790880278 100644 --- a/src/main/java/org/redkale/mq/MessageProducer.java +++ b/src/main/java/org/redkale/mq/MessageProducer.java @@ -3,13 +3,12 @@ */ package org.redkale.mq; -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import java.lang.annotation.*; -import org.redkale.convert.ConvertType; +import java.lang.reflect.Type; +import java.util.concurrent.CompletableFuture; +import org.redkale.convert.Convert; /** - * MQ资源注解, 只能标记在MessageProducerSender类型字段上 + * MQ消息发送器 * *

* 详情见: https://redkale.org @@ -18,13 +17,15 @@ import org.redkale.convert.ConvertType; * * @since 2.8.0 */ -@Documented -@Target({FIELD}) -@Retention(RUNTIME) -public @interface MessageProducer { +public interface MessageProducer { - String mq(); + public CompletableFuture sendMessage(String topic, Object value); - ConvertType convertType() default ConvertType.JSON; - + default CompletableFuture sendMessage(String topic, Convert convert, Object value) { + return sendMessage(topic, convert.convertToBytes(value)); + } + + default CompletableFuture sendMessage(String topic, Convert convert, Type type, Object value) { + return sendMessage(topic, convert.convertToBytes(type, value)); + } } diff --git a/src/main/java/org/redkale/mq/MessageProducerSender.java b/src/main/java/org/redkale/mq/MessageProducerSender.java deleted file mode 100644 index f5679b281..000000000 --- a/src/main/java/org/redkale/mq/MessageProducerSender.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - */ -package org.redkale.mq; - -import java.lang.reflect.Type; -import java.util.concurrent.CompletableFuture; -import org.redkale.convert.Convert; - -/** - * MQ消息发送器 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.8.0 - */ -public interface MessageProducerSender { - - public CompletableFuture sendMessage(String topic, Object value); - - default CompletableFuture sendMessage(String topic, Convert convert, Object value) { - return sendMessage(topic, convert.convertToBytes(value)); - } - - default CompletableFuture sendMessage(String topic, Convert convert, Type type, Object value) { - return sendMessage(topic, convert.convertToBytes(type, value)); - } -} diff --git a/src/main/java/org/redkale/mq/ResourceConsumer.java b/src/main/java/org/redkale/mq/ResourceConsumer.java new file mode 100644 index 000000000..e702ca375 --- /dev/null +++ b/src/main/java/org/redkale/mq/ResourceConsumer.java @@ -0,0 +1,33 @@ +/* + * + */ +package org.redkale.mq; + +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.*; +import org.redkale.convert.ConvertType; + +/** + * MQ资源注解, 只能标记在MessageConsumer子类上 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +@Documented +@Target({TYPE}) +@Retention(RUNTIME) +public @interface ResourceConsumer { + + String mq(); + + String group() default ""; + + String[] topics(); + + ConvertType convertType() default ConvertType.JSON; +} diff --git a/src/main/java/org/redkale/mq/ResourceProducer.java b/src/main/java/org/redkale/mq/ResourceProducer.java new file mode 100644 index 000000000..d6c207b86 --- /dev/null +++ b/src/main/java/org/redkale/mq/ResourceProducer.java @@ -0,0 +1,30 @@ +/* + * + */ +package org.redkale.mq; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.*; +import org.redkale.convert.ConvertType; + +/** + * MQ资源注解, 只能标记在MessageProducer类型字段上 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +@Documented +@Target({FIELD}) +@Retention(RUNTIME) +public @interface ResourceProducer { + + String mq(); + + ConvertType convertType() default ConvertType.JSON; + +} diff --git a/src/main/java/org/redkale/mq/SncpMessageClient.java b/src/main/java/org/redkale/mq/SncpMessageClient.java index ce4b07deb..25f11b044 100644 --- a/src/main/java/org/redkale/mq/SncpMessageClient.java +++ b/src/main/java/org/redkale/mq/SncpMessageClient.java @@ -20,7 +20,7 @@ public class SncpMessageClient extends MessageClient { protected SncpMessageClient(MessageAgent messageAgent) { super(messageAgent); - this.respTopic = messageAgent.generateSncpRespTopic(); + this.respTopic = messageAgent.generateApplicationSncpRespTopic(); } @Override diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index 8aeffaa44..09789470c 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -473,6 +473,30 @@ public final class Utility { }); } + /** + * 是否为空 + * + * @param str 字符串 + * + * @return 是否为空 + * + */ + public static boolean isEmpty(CharSequence str) { + return str == null || str.length() == 0; + } + + /** + * 是否不为空 + * + * @param str 字符串 + * + * @return 是否不为空 + * + */ + public static boolean isNotEmpty(CharSequence str) { + return str != null && str.length() > 0; + } + /** * 将字符串首字母大写 *