From 3158f9265f410998dfd4e4f6fb95bf676a7d3d6a Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 13 Aug 2024 19:09:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=87=E6=A1=A3=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/agent-message.md | 98 ++++++++++++++++++- .../java/org/redkale/mq/MessageManager.java | 2 +- .../redkale/test/mq/TestMessageManager.java | 35 +++++++ 3 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/redkale/test/mq/TestMessageManager.java diff --git a/docs/agent-message.md b/docs/agent-message.md index 661f148f0..4f74f7b97 100644 --- a/docs/agent-message.md +++ b/docs/agent-message.md @@ -1,2 +1,98 @@ # 消息中心 -文档完善中…… \ No newline at end of file +   MessageAgent是消息中心抽象接口。 +## 配置 +```xml + + + + +``` + +## 消费消息 +   通过```@ResourceConsumer```和```MessageConsumer```接口实现消费 +```java +@ResourceConsumer(mq = "mymq", topics = "test_bean_topic") +public class TestMessageConsumer implements MessageConsumer { + + @Override + public void init(AnyValue config) { + System.out.println("执行 TestMessageConsumer.init"); + } + + @Override + public void onMessage(MessageConext context, TestBean message) { + System.out.println("消费消息, message: " + message); + } + + @Override + public void destroy(AnyValue config) { + System.out.println("执行 TestMessageConsumer.destroy"); + } +} +``` + +  通过Service里标记```@Messaged```的方法实现消费, 方法只能是```protected```或```public```, 不能是```final```、```static```。 +```java +public class TestMessageService extends AbstractService { + + @Messaged(mq = "mymq", topics = "test_bean_topic") + protected void runMessage(TestBean message) { + System.out.println("消费消息, message: " + message); + } +} +``` + +  通过```@Component```的Service里标记```@Messaged```的方法实现消费, 方法只能是```public```。 +```java +@Component +public final class TestMessageService extends AbstractService { + + @Messaged(mq = "mymq", topics = "test_bean_topic") + public int runMessage(TestBean message) { + System.out.println("消费消息, message: " + message); + return 0; + } +} +``` + +## 生产消息 +  通过```@Component```的Service里标记```@Messaged```的方法实现消费, 方法只能是```public```。 +```java +@Component +public class TestMessageService extends AbstractService { + + @ResourceProducer(mq = "mymq") + private MessageProducer producer; + + public void sendMessage() { + TestBean bean = new TestBean(12345, "this is a message"); + System.out.println("生产消息: " + bean); + producer.sendMessage("test_bean_topic", bean); + } +} +``` + +## Topic管理 +  通过```MessageManager```操作topic。 +```java +public class TestMessageManager extends AbstractService { + + @Resource(name = "mymq") + private MessageManager manager; + + // 创建topic + public void initTopic() { + manager.createTopic("topic_1", "topic_2").join(); + } + + // 创建topic + public void deleteTopic() { + manager.deleteTopic("topic_1", "topic_2").join(); + } + + // 查询topic + public void printTopic() { + List topics = manager.queryTopic().join(); + } +} +``` diff --git a/src/main/java/org/redkale/mq/MessageManager.java b/src/main/java/org/redkale/mq/MessageManager.java index 6485e063a..9b46e7b50 100644 --- a/src/main/java/org/redkale/mq/MessageManager.java +++ b/src/main/java/org/redkale/mq/MessageManager.java @@ -38,5 +38,5 @@ public interface MessageManager extends Resourcable { * * @return topic集合 */ - public abstract CompletableFuture> queryTopic(); + public CompletableFuture> queryTopic(); } diff --git a/src/test/java/org/redkale/test/mq/TestMessageManager.java b/src/test/java/org/redkale/test/mq/TestMessageManager.java new file mode 100644 index 000000000..cfaec07d3 --- /dev/null +++ b/src/test/java/org/redkale/test/mq/TestMessageManager.java @@ -0,0 +1,35 @@ +/* + +*/ + +package org.redkale.test.mq; + +import java.util.List; +import org.redkale.annotation.Resource; +import org.redkale.mq.MessageManager; +import org.redkale.service.AbstractService; + +/** + * + * @author zhangjx + */ +public class TestMessageManager extends AbstractService { + + @Resource(name = "mymq") + private MessageManager manager; + + // 创建topic + public void initTopic() { + manager.createTopic("topic_1", "topic_2").join(); + } + + // 创建topic + public void deleteTopic() { + manager.deleteTopic("topic_1", "topic_2").join(); + } + + // 查询topic + public void printTopic() { + List topics = manager.queryTopic().join(); + } +}