Files
redkale/docs/agent-message.md
2024-08-13 19:57:06 +08:00

2.7 KiB
Raw Blame History

消息队列

MessageAgent是消息中心抽象接口。

配置

<mq name="mymq" type="kafka">
    <servers value="127.0.0.1:9092"/>
    <consumer autoload="true"/>
</mq>

消费消息

通过@ResourceConsumerMessageConsumer接口实现消费

@ResourceConsumer(mq = "mymq", topics = "test_bean_topic")
public class TestMessageConsumer implements MessageConsumer<TestBean> {

    @Override
    public void init(AnyValue config) {
        System.out.println("执行 TestMessageConsumer.init");
    }

    @Override
    public void onMessage(MessageConext context, TestBean message) {
        System.out.println("消费消息, message: " + message);
    }

    @Override
    public void destroy(AnyValue config) {
        System.out.println("执行 TestMessageConsumer.destroy");
    }
}

通过Service里标记@Messaged的方法实现消费, 方法只能是protectedpublic 不能是finalstatic

public class TestMessageService extends AbstractService {

    @Messaged(mq = "mymq", topics = "test_bean_topic")
    protected void runMessage(TestBean message) {
        System.out.println("消费消息,  message: " + message);
    }
}

  通过@Component的Service里标记@Messaged的方法实现消费, 方法只能是public

@Component
public final class TestMessageService extends AbstractService {

    @Messaged(mq = "mymq", topics = "test_bean_topic")
    public int runMessage(TestBean message) {
        System.out.println("消费消息,  message: " + message);
        return 0;
    }
}

生产消息

  通过@Component的Service里标记@Messaged的方法实现消费, 方法只能是public

@Component
public class TestMessageService extends AbstractService {

    @ResourceProducer(mq = "mymq")
    private MessageProducer producer;

    public void sendMessage() {
        TestBean bean = new TestBean(12345, "this is a message");
        System.out.println("生产消息: " + bean);
        producer.sendMessage("test_bean_topic", bean);
    }
}

Topic管理

  通过MessageManager操作topic。

public class TestMessageManager extends AbstractService {

    @Resource(name = "mymq")
    private MessageManager manager;

    // 创建topic
    public void initTopic() {
        manager.createTopic("topic_1", "topic_2").join();
    }

    // 删除topic
    public void deleteTopic() {
        manager.deleteTopic("topic_1", "topic_2").join();
    }

    // 查询topic
    public void printTopic() {
        List<String> topics = manager.queryTopic().join();
    }
}