文档完善

This commit is contained in:
redkale
2024-08-13 19:09:27 +08:00
parent df8fd31e75
commit 3158f9265f
3 changed files with 133 additions and 2 deletions

View File

@@ -1,2 +1,98 @@
# 消息中心
文档完善中……
   MessageAgent是消息中心抽象接口。
## 配置
```xml
<mq name="mymq" type="kafka">
<servers value="127.0.0.1:9092"/>
<consumer autoload="true"/>
</mq>
```
## 消费消息
&emsp;&emsp; 通过```@ResourceConsumer```和```MessageConsumer```接口实现消费
```java
@ResourceConsumer(mq = "mymq", topics = "test_bean_topic")
public class TestMessageConsumer implements MessageConsumer<TestBean> {
@Override
public void init(AnyValue config) {
System.out.println("执行 TestMessageConsumer.init");
}
@Override
public void onMessage(MessageConext context, TestBean message) {
System.out.println("消费消息, message: " + message);
}
@Override
public void destroy(AnyValue config) {
System.out.println("执行 TestMessageConsumer.destroy");
}
}
```
&emsp;&emsp;通过Service里标记```@Messaged```的方法实现消费, 方法只能是```protected```或```public``` 不能是```final```、```static```。
```java
public class TestMessageService extends AbstractService {
@Messaged(mq = "mymq", topics = "test_bean_topic")
protected void runMessage(TestBean message) {
System.out.println("消费消息, message: " + message);
}
}
```
&emsp;&emsp;通过```@Component```的Service里标记```@Messaged```的方法实现消费, 方法只能是```public```。
```java
@Component
public final class TestMessageService extends AbstractService {
@Messaged(mq = "mymq", topics = "test_bean_topic")
public int runMessage(TestBean message) {
System.out.println("消费消息, message: " + message);
return 0;
}
}
```
## 生产消息
&emsp;&emsp;通过```@Component```的Service里标记```@Messaged```的方法实现消费, 方法只能是```public```。
```java
@Component
public class TestMessageService extends AbstractService {
@ResourceProducer(mq = "mymq")
private MessageProducer producer;
public void sendMessage() {
TestBean bean = new TestBean(12345, "this is a message");
System.out.println("生产消息: " + bean);
producer.sendMessage("test_bean_topic", bean);
}
}
```
## Topic管理
&emsp;&emsp;通过```MessageManager```操作topic。
```java
public class TestMessageManager extends AbstractService {
@Resource(name = "mymq")
private MessageManager manager;
// 创建topic
public void initTopic() {
manager.createTopic("topic_1", "topic_2").join();
}
// 创建topic
public void deleteTopic() {
manager.deleteTopic("topic_1", "topic_2").join();
}
// 查询topic
public void printTopic() {
List<String> topics = manager.queryTopic().join();
}
}
```

View File

@@ -38,5 +38,5 @@ public interface MessageManager extends Resourcable {
*
* @return topic集合
*/
public abstract CompletableFuture<List<String>> queryTopic();
public CompletableFuture<List<String>> queryTopic();
}

View File

@@ -0,0 +1,35 @@
/*
*/
package org.redkale.test.mq;
import java.util.List;
import org.redkale.annotation.Resource;
import org.redkale.mq.MessageManager;
import org.redkale.service.AbstractService;
/**
*
* @author zhangjx
*/
public class TestMessageManager extends AbstractService {
@Resource(name = "mymq")
private MessageManager manager;
// 创建topic
public void initTopic() {
manager.createTopic("topic_1", "topic_2").join();
}
// 创建topic
public void deleteTopic() {
manager.deleteTopic("topic_1", "topic_2").join();
}
// 查询topic
public void printTopic() {
List<String> topics = manager.queryTopic().join();
}
}