215 lines
5.4 KiB
Markdown
215 lines
5.4 KiB
Markdown
---
|
||
sidebar_position: 2
|
||
title: 发布订阅
|
||
description: ZHub 消息发布订阅功能详解,包含基础使用、多主题订阅、类型化消息和最佳实践
|
||
---
|
||
|
||
# 发布订阅
|
||
|
||
## 基础概念
|
||
|
||
- **发布者**:发送消息到指定主题
|
||
- **订阅者**:订阅主题并处理消息
|
||
- **主题**:消息分类标识,支持正则表达式匹配
|
||
- **消费组(GroupID)**:协同消费机制,同组内只有一个消费者处理消息
|
||
|
||
## 重要说明
|
||
|
||
**发布订阅**:使用具体主题名称
|
||
```java
|
||
zhub.subscribe("user-login", message -> { ... });
|
||
zhub.publish("user-login", "用户登录");
|
||
```
|
||
|
||
**消费组机制**:同组内消息只被一个消费者处理
|
||
```java
|
||
// 消费者A和B属于同一组,消息只会被其中一个处理
|
||
ZHubClient consumerA = new ZHubClient("127.0.0.1:1216", "order-group", "app-a", "token");
|
||
ZHubClient consumerB = new ZHubClient("127.0.0.1:1216", "order-group", "app-b", "token");
|
||
|
||
// 消费者C属于不同组,会收到所有消息
|
||
ZHubClient consumerC = new ZHubClient("127.0.0.1:1216", "notification-group", "app-c", "token");
|
||
```
|
||
|
||
**权限配置**:支持正则表达式
|
||
```yaml
|
||
users:
|
||
- id: 1
|
||
username: "user-service"
|
||
reads: ["user.*"] # 匹配所有 user.* 主题
|
||
writes: ["user.*"]
|
||
```
|
||
|
||
---
|
||
|
||
## 协同消费
|
||
|
||
### 消费组机制
|
||
|
||
**工作原理**:同组内只有一个消费者处理消息,不同组独立消费
|
||
|
||
```java
|
||
// 负载均衡:同组内竞争消费
|
||
ZHubClient service1 = new ZHubClient("127.0.0.1:1216", "order-group", "app-1", "token");
|
||
ZHubClient service2 = new ZHubClient("127.0.0.1:1216", "order-group", "app-2", "token");
|
||
// 只有其中一个处理消息
|
||
|
||
// 消息广播:不同组都收到消息
|
||
ZHubClient notify1 = new ZHubClient("127.0.0.1:1216", "notify-group-1", "notify-1", "token");
|
||
ZHubClient notify2 = new ZHubClient("127.0.0.1:1216", "notify-group-2", "notify-2", "token");
|
||
// 两个都会收到消息
|
||
```
|
||
|
||
**应用场景**:
|
||
- **负载均衡**:同组内多个服务竞争处理
|
||
- **消息广播**:不同组都接收消息
|
||
- **数据去重**:同组内确保只处理一次
|
||
|
||
---
|
||
|
||
## 基础使用
|
||
|
||
### 1. 基础发布订阅
|
||
|
||
**字符串消息**:
|
||
```java
|
||
// 订阅
|
||
zhub.subscribe("user-login", message -> {
|
||
System.out.println("用户登录: " + message);
|
||
});
|
||
|
||
// 发布
|
||
zhub.publish("user-login", "用户ID: 12345");
|
||
```
|
||
|
||
**类型化消息**:
|
||
```java
|
||
// 定义消息类型
|
||
public class UserLoginEvent {
|
||
private String userId;
|
||
private String username;
|
||
private long loginTime;
|
||
// getter/setter...
|
||
}
|
||
|
||
// 订阅类型化消息(使用TypeToken)
|
||
zhub.subscribe("user-login", new TypeToken<UserLoginEvent>(){}, event -> {
|
||
System.out.println("用户: " + event.getUsername());
|
||
});
|
||
|
||
// 发布类型化消息
|
||
UserLoginEvent event = new UserLoginEvent("12345", "张三", System.currentTimeMillis());
|
||
zhub.publish("user-login", event);
|
||
```
|
||
|
||
**基础类型**:
|
||
```java
|
||
// 整数消息(使用IType)
|
||
zhub.subscribe("user-count", IType.INT, count -> {
|
||
System.out.println("用户数: " + count);
|
||
});
|
||
zhub.publish("user-count", 100);
|
||
|
||
// Map 消息(使用IType)
|
||
zhub.subscribe("user-info", IType.MAP, info -> {
|
||
System.out.println("用户信息: " + info);
|
||
});
|
||
Map<String, String> userInfo = Map.of("userId", "12345", "username", "张三");
|
||
zhub.publish("user-info", userInfo);
|
||
```
|
||
|
||
### 2. 多主题订阅
|
||
|
||
**分别订阅**:
|
||
```java
|
||
zhub.subscribe("user-profile", message -> {
|
||
System.out.println("用户资料: " + message);
|
||
});
|
||
zhub.subscribe("user-login", message -> {
|
||
System.out.println("用户登录: " + message);
|
||
});
|
||
```
|
||
|
||
**逗号分隔订阅**:
|
||
```java
|
||
// 同时订阅多个主题
|
||
zhub.subscribe("user-profile,user-login,user-logout", message -> {
|
||
System.out.println("用户消息: " + message);
|
||
});
|
||
|
||
// 类型化消息(使用TypeToken)
|
||
zhub.subscribe("order-create,order-payment", new TypeToken<OrderEvent>(){}, event -> {
|
||
System.out.println("订单事件: " + event.getOrderId());
|
||
});
|
||
```
|
||
|
||
### 3. 广播消息
|
||
|
||
```java
|
||
// 广播给所有客户端
|
||
zhub.broadcast("topic-abc", "hello!");
|
||
```
|
||
|
||
---
|
||
|
||
## 高级功能
|
||
|
||
### 1. 延时消息
|
||
```java
|
||
// 延时5分钟后发送消息
|
||
zhub.delay("reminder-email", "发送提醒邮件", 5 * 60 * 1000);
|
||
```
|
||
|
||
### 2. 消息过滤
|
||
```java
|
||
zhub.subscribe("user-notification", message -> {
|
||
if (message.contains("VIP")) {
|
||
System.out.println("VIP用户消息: " + message);
|
||
}
|
||
});
|
||
```
|
||
|
||
### 3. 批量处理
|
||
```java
|
||
private final List<String> messageBatch = new ArrayList<>();
|
||
|
||
zhub.subscribe("data-sync", message -> {
|
||
synchronized (messageBatch) {
|
||
messageBatch.add(message);
|
||
if (messageBatch.size() >= 100) {
|
||
processBatch(new ArrayList<>(messageBatch));
|
||
messageBatch.clear();
|
||
}
|
||
}
|
||
});
|
||
```
|
||
|
||
---
|
||
|
||
## 最佳实践
|
||
|
||
**Topic 命名**:`user-profile-update`、`order-payment-success`
|
||
|
||
**异常处理**:
|
||
```java
|
||
zhub.subscribe("critical-task", message -> {
|
||
try {
|
||
processTask(message);
|
||
} catch (Exception e) {
|
||
logger.error("处理失败", e);
|
||
}
|
||
});
|
||
```
|
||
|
||
## 消息特性
|
||
|
||
- **顺序保证**:单个主题内消息严格按发送顺序处理
|
||
- **非持久化**:默认不持久化,重启后消息丢失
|
||
- **高性能**:内存处理,支持高并发消息传递
|
||
- **容量限制**:服务端通道容量500条,满时消息会被丢弃
|
||
|
||
## 注意事项
|
||
|
||
- 避免创建大量对象
|
||
- 注意消息积压:服务端通道容量500条,满时消息会被丢弃
|