Files
z-docs/docs/tutorial-basics/pub-sub.md

215 lines
5.4 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
sidebar_position: 2
title: 发布订阅
description: ZHub 消息发布订阅功能详解,包含基础使用、多主题订阅、类型化消息和最佳实践
---
# 发布订阅
## 基础概念
- **发布者**:发送消息到指定主题
- **订阅者**:订阅主题并处理消息
- **主题**:消息分类标识,支持正则表达式匹配
- **消费组GroupID**:协同消费机制,同组内只有一个消费者处理消息
## 重要说明
**发布订阅**:使用具体主题名称
```java
zhub.subscribe("user-login", message -> { ... });
zhub.publish("user-login", "用户登录");
```
**消费组机制**:同组内消息只被一个消费者处理
```java
// 消费者A和B属于同一组消息只会被其中一个处理
ZHubClient consumerA = new ZHubClient("127.0.0.1:1216", "order-group", "app-a", "token");
ZHubClient consumerB = new ZHubClient("127.0.0.1:1216", "order-group", "app-b", "token");
// 消费者C属于不同组会收到所有消息
ZHubClient consumerC = new ZHubClient("127.0.0.1:1216", "notification-group", "app-c", "token");
```
**权限配置**:支持正则表达式
```yaml
users:
- id: 1
username: "user-service"
reads: ["user.*"] # 匹配所有 user.* 主题
writes: ["user.*"]
```
---
## 协同消费
### 消费组机制
**工作原理**:同组内只有一个消费者处理消息,不同组独立消费
```java
// 负载均衡:同组内竞争消费
ZHubClient service1 = new ZHubClient("127.0.0.1:1216", "order-group", "app-1", "token");
ZHubClient service2 = new ZHubClient("127.0.0.1:1216", "order-group", "app-2", "token");
// 只有其中一个处理消息
// 消息广播:不同组都收到消息
ZHubClient notify1 = new ZHubClient("127.0.0.1:1216", "notify-group-1", "notify-1", "token");
ZHubClient notify2 = new ZHubClient("127.0.0.1:1216", "notify-group-2", "notify-2", "token");
// 两个都会收到消息
```
**应用场景**
- **负载均衡**:同组内多个服务竞争处理
- **消息广播**:不同组都接收消息
- **数据去重**:同组内确保只处理一次
---
## 基础使用
### 1. 基础发布订阅
**字符串消息**
```java
// 订阅
zhub.subscribe("user-login", message -> {
System.out.println("用户登录: " + message);
});
// 发布
zhub.publish("user-login", "用户ID: 12345");
```
**类型化消息**
```java
// 定义消息类型
public class UserLoginEvent {
private String userId;
private String username;
private long loginTime;
// getter/setter...
}
// 订阅类型化消息使用TypeToken
zhub.subscribe("user-login", new TypeToken<UserLoginEvent>(){}, event -> {
System.out.println("用户: " + event.getUsername());
});
// 发布类型化消息
UserLoginEvent event = new UserLoginEvent("12345", "张三", System.currentTimeMillis());
zhub.publish("user-login", event);
```
**基础类型**
```java
// 整数消息使用IType
zhub.subscribe("user-count", IType.INT, count -> {
System.out.println("用户数: " + count);
});
zhub.publish("user-count", 100);
// Map 消息使用IType
zhub.subscribe("user-info", IType.MAP, info -> {
System.out.println("用户信息: " + info);
});
Map<String, String> userInfo = Map.of("userId", "12345", "username", "张三");
zhub.publish("user-info", userInfo);
```
### 2. 多主题订阅
**分别订阅**
```java
zhub.subscribe("user-profile", message -> {
System.out.println("用户资料: " + message);
});
zhub.subscribe("user-login", message -> {
System.out.println("用户登录: " + message);
});
```
**逗号分隔订阅**
```java
// 同时订阅多个主题
zhub.subscribe("user-profile,user-login,user-logout", message -> {
System.out.println("用户消息: " + message);
});
// 类型化消息使用TypeToken
zhub.subscribe("order-create,order-payment", new TypeToken<OrderEvent>(){}, event -> {
System.out.println("订单事件: " + event.getOrderId());
});
```
### 3. 广播消息
```java
// 广播给所有客户端
zhub.broadcast("topic-abc", "hello!");
```
---
## 高级功能
### 1. 延时消息
```java
// 延时5分钟后发送消息
zhub.delay("reminder-email", "发送提醒邮件", 5 * 60 * 1000);
```
### 2. 消息过滤
```java
zhub.subscribe("user-notification", message -> {
if (message.contains("VIP")) {
System.out.println("VIP用户消息: " + message);
}
});
```
### 3. 批量处理
```java
private final List<String> messageBatch = new ArrayList<>();
zhub.subscribe("data-sync", message -> {
synchronized (messageBatch) {
messageBatch.add(message);
if (messageBatch.size() >= 100) {
processBatch(new ArrayList<>(messageBatch));
messageBatch.clear();
}
}
});
```
---
## 最佳实践
**Topic 命名**`user-profile-update``order-payment-success`
**异常处理**
```java
zhub.subscribe("critical-task", message -> {
try {
processTask(message);
} catch (Exception e) {
logger.error("处理失败", e);
}
});
```
## 消息特性
- **顺序保证**:单个主题内消息严格按发送顺序处理
- **非持久化**:默认不持久化,重启后消息丢失
- **高性能**:内存处理,支持高并发消息传递
- **容量限制**服务端通道容量500条满时消息会被丢弃
## 注意事项
- 避免创建大量对象
- 注意消息积压服务端通道容量500条满时消息会被丢弃