Compare commits
13 Commits
feature/au
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
| 44d4fcdbc4 | |||
| 978a268f93 | |||
| 1fae74191e | |||
| d27d9d6bc3 | |||
| 04b3113b8a | |||
| d471a3f508 | |||
| 9d85efce17 | |||
| 100b478d74 | |||
| 9fd8ffa8e9 | |||
| e27f01cb34 | |||
| b80dc78cce | |||
| 4fc7121b28 | |||
| f7360c45d8 |
33
Dockerfile
33
Dockerfile
@@ -1,38 +1,11 @@
|
|||||||
# 阶段一:构建阶段
|
# 构建运行阶段
|
||||||
FROM golang:alpine AS builder
|
|
||||||
|
|
||||||
WORKDIR /opt/zhub
|
|
||||||
|
|
||||||
# 复制 go.mod 文件
|
|
||||||
COPY go.mod .
|
|
||||||
|
|
||||||
# 下载并缓存依赖包
|
|
||||||
RUN go mod download
|
|
||||||
|
|
||||||
# 将源代码复制到容器中
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
# 运行 go get 命令以确保所有需要的包都被获取和更新
|
|
||||||
#RUN go get zhub/internal/monitor
|
|
||||||
#RUN go get zhub/cmd
|
|
||||||
#RUN go get zhub/internal/zsub
|
|
||||||
#RUN go get zhub/internal/config
|
|
||||||
|
|
||||||
# 构建可执行文件
|
|
||||||
RUN go build -o zhub.sh -ldflags "-s -w"
|
|
||||||
|
|
||||||
|
|
||||||
# 阶段二:运行阶段
|
|
||||||
FROM alpine:latest
|
FROM alpine:latest
|
||||||
|
|
||||||
# 设置工作目录
|
# 设置工作目录
|
||||||
WORKDIR /opt/zhub
|
WORKDIR /opt/zhub
|
||||||
|
|
||||||
# 从构建阶段复制可执行文件到当前阶段
|
# 从构建阶段复制可执行文件和配置到当前阶段
|
||||||
COPY --from=builder /opt/zhub/zhub.sh .
|
COPY zhub.sh .
|
||||||
COPY --from=builder /opt/zhub/app.ini .
|
|
||||||
|
|
||||||
# 复制 app.ini 配置文件到容器中
|
|
||||||
COPY app.ini .
|
COPY app.ini .
|
||||||
|
|
||||||
EXPOSE 711 1216
|
EXPOSE 711 1216
|
||||||
|
|||||||
203
README.md
Normal file
203
README.md
Normal file
@@ -0,0 +1,203 @@
|
|||||||
|
# ZHub 工程项目文档
|
||||||
|
|
||||||
|
## 1. 概述
|
||||||
|
ZHub 是一个基于 Go 的高性能分布式消息中间件,支持发布/订阅模式、定时任务、延时消息、分布式锁等功能。它适用于需要高并发、低延迟的消息传递场景。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2. 技术栈
|
||||||
|
- **语言**: Go (Golang)
|
||||||
|
- **网络库**: `net`, `bufio`
|
||||||
|
- **并发模型**: 使用 goroutines 和 channels 实现并发处理。
|
||||||
|
- **持久化**: 使用文件存储(如延时任务和锁信息)。
|
||||||
|
- **配置管理**: 使用 Viper 读取 INI 格式的配置文件。
|
||||||
|
- **权限控制**: 基于 YAML 的用户、组、Token 和频道权限管理。
|
||||||
|
- **监控界面**: 使用 Gin 提供 HTTP 接口用于监控和服务管理。
|
||||||
|
- **数据库支持**: 支持 MySQL 和 PostgreSQL 用于定时任务的配置。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. 核心功能模块
|
||||||
|
|
||||||
|
### 3.1 消息总线 (`zbus`)
|
||||||
|
- **主题管理**: 支持多主题订阅与发布。
|
||||||
|
- **组订阅**: 每个主题可以有多个消费组,组内客户端共享消息。
|
||||||
|
- **延时消息**: 支持延时发送消息,使用 `time.Timer` 实现。
|
||||||
|
- **定时任务**: 支持 cron 表达式或固定时间间隔的任务调度。
|
||||||
|
- **分布式锁**: 提供互斥锁机制,支持重入和自动释放。
|
||||||
|
- **持久化**: 使用文件保存延时任务和锁的状态,确保服务重启后数据不丢失。
|
||||||
|
|
||||||
|
### 3.2 权限管理 ([auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37))
|
||||||
|
- **用户认证**: 支持基于 Token 的认证。
|
||||||
|
- **角色授权**: 用户分属不同组,组内定义读写权限。
|
||||||
|
- **频道访问控制**: 频道可设置为公开或私有,私有频道需授权访问。
|
||||||
|
- **配置热加载**: 支持运行时重新加载权限配置。
|
||||||
|
|
||||||
|
### 3.3 客户端 (`client`)
|
||||||
|
- **连接管理**: 支持自动重连机制。
|
||||||
|
- **订阅与发布**: 提供 API 订阅主题并接收消息。
|
||||||
|
- **RPC 调用**: 支持远程过程调用,并处理超时和返回结果。
|
||||||
|
- **锁机制**: 支持获取和释放分布式锁。
|
||||||
|
|
||||||
|
### 3.4 监控 (`monitor`)
|
||||||
|
- **HTTP 接口**: 提供 `/_/info`, `/_/cleanup`, `/timer/reload` 等接口。
|
||||||
|
- **可视化界面**: 提供简单的 HTML 页面展示系统状态。
|
||||||
|
- **版本信息**: 显示当前运行的 ZHub 版本。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 4. 架构设计
|
||||||
|
|
||||||
|
### 4.1 总体架构
|
||||||
|
ZHub 采用经典的 C/S 架构,包含以下主要组件:
|
||||||
|
- **服务端 ([main.go](file://D:\wk-go\zhub\main.go))**
|
||||||
|
- 启动 TCP 服务器监听客户端连接。
|
||||||
|
- 加载配置文件并初始化日志。
|
||||||
|
- 启动监控服务(Gin)。
|
||||||
|
- **客户端 ([client.go](file://D:\wk-go\zhub\cmd\client.go))**
|
||||||
|
- 提供连接、订阅、发布、RPC 调用等 API。
|
||||||
|
- 自动重连机制保证连接稳定性。
|
||||||
|
- **消息总线 (`zbus`)**
|
||||||
|
- 处理消息的发布、订阅、延时、定时等核心逻辑。
|
||||||
|
- **权限管理 ([auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37))**
|
||||||
|
- 管理用户、组、Token 和频道权限。
|
||||||
|
- **监控 (`monitor`)**
|
||||||
|
- 提供 HTTP 接口和 Web 界面用于管理和监控。
|
||||||
|
|
||||||
|
### 4.2 数据流图
|
||||||
|
```plaintext
|
||||||
|
+-------------------+ +------------------+ +------------------+
|
||||||
|
| Client (Go) |<--->| ZHub Server |<--->| Monitor (Gin) |
|
||||||
|
+-------------------+ +------------------+ +------------------+
|
||||||
|
|
|
||||||
|
v
|
||||||
|
+------------------+
|
||||||
|
| Persistence |
|
||||||
|
| (File, DB) |
|
||||||
|
+------------------+
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 5. 配置说明
|
||||||
|
|
||||||
|
### 5.1 配置文件 ([app.ini](file://D:\wk-go\zhub\app.ini))
|
||||||
|
- **[service]**: 服务相关配置。
|
||||||
|
- `watch`: 监控服务地址。
|
||||||
|
- [addr](file://D:\wk-go\zhub\cmd\client.go#L25-L25): 主服务监听地址。
|
||||||
|
- [auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37): 是否启用认证(0 不启用,1 启用)。
|
||||||
|
- **[data]**: 数据目录配置。
|
||||||
|
- **[log]**: 日志配置。
|
||||||
|
- `handlers`: 输出方式(console/file)。
|
||||||
|
- `level`: 日志级别(info/debug/error)。
|
||||||
|
- [file](file://D:\wk-go\zhub\Dockerfile): 日志文件路径。
|
||||||
|
- **[ztimer]**: 定时任务数据库配置。
|
||||||
|
- `db.addr`, `db.user`, `db.password`, `db.database`, `db.schema`, `db.type`.
|
||||||
|
|
||||||
|
### 5.2 权限配置 ([auth.yml](file://D:\wk-go\zhub\auth.yml))
|
||||||
|
- **users**: 用户列表,包含 ID、用户名、密码、状态、所属组、读写权限。
|
||||||
|
- **groups**: 用户组列表,包含名称、描述、读写权限。
|
||||||
|
- **tokens**: Token 列表,包含 ID、用户 ID、Token 值、过期时间和状态。
|
||||||
|
- **channels**: 频道列表,包含名称、描述、是否为公开频道。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 6. 部署与构建
|
||||||
|
|
||||||
|
### 6.1 构建脚本 ([build.bat](file://D:\wk-go\zhub\build.bat))
|
||||||
|
- 支持跨平台编译(Linux、Windows、Mac)。
|
||||||
|
- 使用 UPX 压缩生成的二进制文件。
|
||||||
|
- 自动生成版本号并嵌入到程序中。
|
||||||
|
|
||||||
|
### 6.2 启动命令
|
||||||
|
- **服务端启动**:
|
||||||
|
```bash
|
||||||
|
go run main.go
|
||||||
|
```
|
||||||
|
|
||||||
|
- **客户端启动**:
|
||||||
|
```bash
|
||||||
|
go run cmd/client.go -cli -r <command>
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### 6.3 监控服务
|
||||||
|
- 默认监听地址:`http://<host>:<port>/`
|
||||||
|
- 支持的接口:
|
||||||
|
- `/_/info`: 获取系统信息。
|
||||||
|
- `/_/cleanup`: 清理无订阅的主题。
|
||||||
|
- `/timer/reload`: 重新加载定时任务。
|
||||||
|
- `/topic/publish`: 发布主题消息。
|
||||||
|
- `/topic/delay`: 发送延时消息。
|
||||||
|
- `/auth/reload`: 重新加载权限配置。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 7. 安全性设计
|
||||||
|
|
||||||
|
### 7.1 认证机制
|
||||||
|
- 所有客户端必须通过 Token 认证才能进行操作。
|
||||||
|
- Token 具有过期时间和状态管理。
|
||||||
|
|
||||||
|
### 7.2 授权机制
|
||||||
|
- 用户分属不同组,组内定义读写权限。
|
||||||
|
- 频道可设置为公开或私有,私有频道需授权访问。
|
||||||
|
- 支持正则表达式匹配主题名,实现细粒度的权限控制。
|
||||||
|
|
||||||
|
### 7.3 分布式锁
|
||||||
|
- 使用互斥锁机制防止多个客户端同时执行关键操作。
|
||||||
|
- 锁具有超时机制,避免死锁。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 8. 性能优化
|
||||||
|
|
||||||
|
### 8.1 并发处理
|
||||||
|
- 使用 goroutines 和 channels 实现高效的并发处理。
|
||||||
|
- 所有 I/O 操作异步化,减少阻塞。
|
||||||
|
|
||||||
|
### 8.2 内存管理
|
||||||
|
- 使用缓冲区池化技术减少内存分配。
|
||||||
|
- 对频繁使用的结构体进行复用。
|
||||||
|
|
||||||
|
### 8.3 日志优化
|
||||||
|
- 支持日志级别控制,减少不必要的日志输出。
|
||||||
|
- 日志输出格式化,便于调试和分析。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 9. 测试与验证
|
||||||
|
|
||||||
|
### 9.1 单元测试
|
||||||
|
- 覆盖所有核心模块的功能测试。
|
||||||
|
- 使用 Go 的 testing 包编写测试用例。
|
||||||
|
|
||||||
|
### 9.2 集成测试
|
||||||
|
- 模拟多个客户端并发操作,验证系统的稳定性和一致性。
|
||||||
|
- 测试定时任务、延时消息、锁机制等复杂场景。
|
||||||
|
|
||||||
|
### 9.3 压力测试
|
||||||
|
- 使用基准测试工具评估系统的吞吐量和响应时间。
|
||||||
|
- 优化瓶颈部分,提升整体性能。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 10. 未来规划
|
||||||
|
|
||||||
|
### 10.1 功能增强
|
||||||
|
- 支持更多类型的数据库(如 MongoDB、Redis)。
|
||||||
|
- 增加消息持久化支持(如 Kafka、RabbitMQ)。
|
||||||
|
- 提供更丰富的监控指标和告警机制。
|
||||||
|
|
||||||
|
### 10.2 性能优化
|
||||||
|
- 引入零拷贝技术减少内存复制。
|
||||||
|
- 进一步优化 goroutine 调度策略。
|
||||||
|
|
||||||
|
### 10.3 安全加固
|
||||||
|
- 增加 TLS 加密通信支持。
|
||||||
|
- 支持更复杂的权限策略(如 RBAC、ABAC)。
|
||||||
|
|
||||||
|
### 10.4 用户体验
|
||||||
|
- 开发图形化管理界面,简化配置和监控操作。
|
||||||
|
- 提供 SDK 支持多种编程语言(如 Python、Java)。
|
||||||
31
app.ini
31
app.ini
@@ -1,18 +1,21 @@
|
|||||||
[log]
|
# app.ini
|
||||||
handlers=console #console|file
|
|
||||||
level=debug # info|debug|error
|
|
||||||
file=zhub.log
|
|
||||||
|
|
||||||
[service]
|
[service]
|
||||||
watch=0.0.0.0:711
|
watch=0.0.0.0:711 # 服务管理端口
|
||||||
addr=0.0.0.0:1216
|
addr=0.0.0.0:1216 # 服务端口
|
||||||
auth=1
|
auth=0 # 是否开启连接授权 0不开启、1开启
|
||||||
|
|
||||||
[data]
|
[data]
|
||||||
dir=./data
|
dir=./data # 数据目录
|
||||||
|
|
||||||
[ztimer]
|
[log]
|
||||||
db.addr=127.0.0.1:3306
|
handlers=console # console|file
|
||||||
db.user=root
|
level=debug # info|debug|error
|
||||||
db.password=123456
|
file=zhub.log
|
||||||
db.database=zhub
|
|
||||||
|
[ztimer] # ztimer 配置 (可选,如果不使用定时调度则可不配置)
|
||||||
|
# db.addr=127.0.0.1:3306 # timer 使用的MySql数据库配置
|
||||||
|
# db.user=root
|
||||||
|
# db.password=123456
|
||||||
|
# db.database=zhub
|
||||||
|
# db.schema=public
|
||||||
|
# db.type=postgres # mysql|postgres
|
||||||
|
|||||||
13
auth.yml
13
auth.yml
@@ -36,8 +36,10 @@ groups:
|
|||||||
description: Group 1
|
description: Group 1
|
||||||
reads:
|
reads:
|
||||||
- ^zcore:* # "zcore:" 开头的订阅
|
- ^zcore:* # "zcore:" 开头的订阅
|
||||||
|
- rpc-t
|
||||||
writes:
|
writes:
|
||||||
- ^zcore:* # "zcore:" 开头的发送
|
- ^zcore:* # "zcore:" 开头的发送
|
||||||
|
- rpc-t
|
||||||
|
|
||||||
- name: zcore
|
- name: zcore
|
||||||
description: Group 2
|
description: Group 2
|
||||||
@@ -59,15 +61,16 @@ tokens:
|
|||||||
|
|
||||||
# 公开频道设置
|
# 公开频道设置
|
||||||
channels:
|
channels:
|
||||||
- name: "-"
|
|
||||||
description: "无效占位符"
|
|
||||||
public: true
|
|
||||||
|
|
||||||
- name: "lock"
|
- name: "lock"
|
||||||
description: "分布式锁通知频道"
|
description: "分布式锁通知频道"
|
||||||
public: true
|
public: true
|
||||||
|
- name: "trylock"
|
||||||
|
description: "分布式锁通知频道"
|
||||||
|
public: true
|
||||||
- name: "app_local"
|
- name: "app_local"
|
||||||
description: "本地appname"
|
description: "本地appname"
|
||||||
public: true
|
public: true
|
||||||
|
- name: "DEV-LOCAL"
|
||||||
|
description: "本地appname"
|
||||||
|
public: true
|
||||||
# ---------------------------------------------
|
# ---------------------------------------------
|
||||||
|
|||||||
118
build.bat
Normal file
118
build.bat
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
@echo off
|
||||||
|
chcp 65001 >nul
|
||||||
|
title zhub 构建脚本
|
||||||
|
|
||||||
|
rem 获取当前日期和时间并格式化为 YYYY.MM.DD-HH.MM.SS
|
||||||
|
for /f "tokens=2 delims==" %%i in ('wmic os get localdatetime /value') do set datetime=%%i
|
||||||
|
set year=%datetime:~0,4%
|
||||||
|
set month=%datetime:~4,2%
|
||||||
|
set day=%datetime:~6,2%
|
||||||
|
set hour=%datetime:~8,2%
|
||||||
|
set minute=%datetime:~10,2%
|
||||||
|
set second=%datetime:~12,2%
|
||||||
|
set version=%year%.%month%.%day%-%hour%.%minute%.%second%
|
||||||
|
|
||||||
|
rem 输出当前版本号
|
||||||
|
echo 当前构建版本号:%version%
|
||||||
|
|
||||||
|
rem 删除旧文件
|
||||||
|
echo 删除历史构建文件...
|
||||||
|
del /q zhub zhub.sh zhub.exe >nul 2>nul
|
||||||
|
|
||||||
|
rem 交互式平台选择
|
||||||
|
echo ==============================
|
||||||
|
echo 请选择要构建的平台:
|
||||||
|
echo ==============================
|
||||||
|
echo 1. Linux (amd64)
|
||||||
|
echo 2. Linux (arm64)
|
||||||
|
echo 3. Windows (amd64)
|
||||||
|
echo 4. MacOS (amd64)
|
||||||
|
echo 5. 全部构建
|
||||||
|
echo 0. 退出
|
||||||
|
echo ==============================
|
||||||
|
set /p choice=请输入选项编号(可多选,例如 135):
|
||||||
|
|
||||||
|
if "%choice%"=="0" goto end
|
||||||
|
|
||||||
|
if not "%choice%"=="" (
|
||||||
|
echo 开始构建版本 %version%...
|
||||||
|
)
|
||||||
|
|
||||||
|
if "%choice%"=="1" (
|
||||||
|
call :build_linux
|
||||||
|
)
|
||||||
|
if "%choice%"=="2" (
|
||||||
|
call :build_linux_arm
|
||||||
|
)
|
||||||
|
if "%choice%"=="3" (
|
||||||
|
call :build_windows
|
||||||
|
)
|
||||||
|
if "%choice%"=="4" (
|
||||||
|
call :build_mac
|
||||||
|
)
|
||||||
|
if "%choice%"=="5" (
|
||||||
|
call :build_linux
|
||||||
|
call :build_linux_arm
|
||||||
|
call :build_windows
|
||||||
|
call :build_mac
|
||||||
|
)
|
||||||
|
|
||||||
|
goto end
|
||||||
|
|
||||||
|
rem ========= 构建平台函数 ==========
|
||||||
|
|
||||||
|
:build_linux
|
||||||
|
echo [Linux amd64] 开始构建...
|
||||||
|
set GOOS=linux
|
||||||
|
set GOARCH=amd64
|
||||||
|
go build -o zhub.sh -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
|
||||||
|
upx -9 zhub.sh
|
||||||
|
call :move_to_dist zhub.sh linux
|
||||||
|
goto :eof
|
||||||
|
|
||||||
|
:build_linux_arm
|
||||||
|
echo [Linux arm64] 开始构建...
|
||||||
|
set GOOS=linux
|
||||||
|
set GOARCH=arm64
|
||||||
|
go build -o zhub.sh -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
|
||||||
|
upx -9 zhub.sh
|
||||||
|
call :move_to_dist zhub.sh linux-arm
|
||||||
|
goto :eof
|
||||||
|
|
||||||
|
:build_windows
|
||||||
|
echo [Windows amd64] 开始构建...
|
||||||
|
set GOOS=windows
|
||||||
|
set GOARCH=amd64
|
||||||
|
go build -o zhub.exe -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
|
||||||
|
upx -9 zhub.exe
|
||||||
|
call :move_to_dist zhub.exe windows
|
||||||
|
goto :eof
|
||||||
|
|
||||||
|
:build_mac
|
||||||
|
echo [MacOS amd64] 开始构建...
|
||||||
|
set GOOS=darwin
|
||||||
|
set GOARCH=amd64
|
||||||
|
go build -o zhub -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
|
||||||
|
upx -9 zhub
|
||||||
|
call :move_to_dist zhub mac
|
||||||
|
goto :eof
|
||||||
|
|
||||||
|
rem ========= 移动构建文件 ==========
|
||||||
|
|
||||||
|
:move_to_dist
|
||||||
|
setlocal
|
||||||
|
set file=%1
|
||||||
|
set platform=%2
|
||||||
|
set target=dist\%version%\%platform%
|
||||||
|
if not exist %target% (
|
||||||
|
mkdir %target%
|
||||||
|
)
|
||||||
|
move /Y %file% %target%\ >nul
|
||||||
|
echo 构建产物已移动至:%target%\%file%
|
||||||
|
endlocal
|
||||||
|
goto :eof
|
||||||
|
|
||||||
|
:end
|
||||||
|
echo 构建流程结束。
|
||||||
|
pause
|
||||||
|
exit
|
||||||
293
cmd/client.go
293
cmd/client.go
@@ -3,22 +3,20 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"github.com/go-basic/uuid"
|
|
||||||
"io"
|
"io"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
//"github.com/go-basic/uuid"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-basic/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type ZHubClient struct {
|
||||||
wlock sync.Mutex // write lock
|
wlock sync.Mutex // write lock
|
||||||
rlock sync.Mutex // read lock
|
rlock sync.Mutex // read lock
|
||||||
|
|
||||||
@@ -46,37 +44,40 @@ type Lock struct {
|
|||||||
// duration int // lock duration
|
// duration int // lock duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func Create(appname, addr, groupid, auth string) (*Client, error) {
|
func (c *ZHubClient) Initx(appname, addr, groupid, auth string) error {
|
||||||
conn, err := net.Dial("tcp", addr)
|
c.appname = appname
|
||||||
if err != nil {
|
c.addr = addr
|
||||||
return &Client{}, err
|
c.groupid = groupid
|
||||||
}
|
c.auth = auth
|
||||||
|
return c.Init()
|
||||||
client := Client{
|
|
||||||
wlock: sync.Mutex{},
|
|
||||||
rlock: sync.Mutex{},
|
|
||||||
appname: appname,
|
|
||||||
addr: addr,
|
|
||||||
conn: conn,
|
|
||||||
groupid: groupid,
|
|
||||||
createTime: time.Now(),
|
|
||||||
|
|
||||||
subFun: make(map[string]func(v string)),
|
|
||||||
timerFun: make(map[string]func()),
|
|
||||||
chSend: make(chan []string, 100),
|
|
||||||
chReceive: make(chan []string, 100),
|
|
||||||
timerReceive: make(chan []string, 100),
|
|
||||||
lockFlag: make(map[string]*Lock),
|
|
||||||
auth: auth,
|
|
||||||
}
|
|
||||||
|
|
||||||
client.send("auth", auth)
|
|
||||||
client.send("groupid " + groupid)
|
|
||||||
client.init()
|
|
||||||
return &client, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) reconn() (err error) {
|
// Init 创建一个客户端
|
||||||
|
func (c *ZHubClient) Init( /*appname, addr, groupid, auth string*/ ) error {
|
||||||
|
conn, err := net.Dial("tcp", c.addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.conn = conn
|
||||||
|
c.wlock = sync.Mutex{}
|
||||||
|
c.rlock = sync.Mutex{}
|
||||||
|
c.createTime = time.Now()
|
||||||
|
|
||||||
|
c.subFun = make(map[string]func(v string))
|
||||||
|
c.timerFun = make(map[string]func())
|
||||||
|
c.chSend = make(chan []string, 100)
|
||||||
|
c.chReceive = make(chan []string, 100)
|
||||||
|
c.timerReceive = make(chan []string, 100)
|
||||||
|
c.lockFlag = make(map[string]*Lock)
|
||||||
|
|
||||||
|
c.send("auth", c.auth)
|
||||||
|
c.send("groupid " + c.groupid)
|
||||||
|
c.init()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ZHubClient) reconn() (err error) {
|
||||||
for n := 1; n < 10; n++ {
|
for n := 1; n < 10; n++ {
|
||||||
conn, err := net.Dial("tcp", c.addr)
|
conn, err := net.Dial("tcp", c.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -102,7 +103,7 @@ func (c *Client) reconn() (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) init() {
|
func (c *ZHubClient) init() {
|
||||||
// 消费 topic 消息
|
// 消费 topic 消息
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@@ -130,7 +131,7 @@ func (c *Client) init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe subscribe topic
|
// Subscribe subscribe topic
|
||||||
func (c *Client) Subscribe(topic string, fun func(v string)) {
|
func (c *ZHubClient) Subscribe(topic string, fun func(v string)) {
|
||||||
c.send("subscribe " + topic)
|
c.send("subscribe " + topic)
|
||||||
if fun != nil {
|
if fun != nil {
|
||||||
c.wlock.Lock()
|
c.wlock.Lock()
|
||||||
@@ -139,7 +140,7 @@ func (c *Client) Subscribe(topic string, fun func(v string)) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Unsubscribe(topic string) {
|
func (c *ZHubClient) Unsubscribe(topic string) {
|
||||||
c.send("unsubscribe " + topic)
|
c.send("unsubscribe " + topic)
|
||||||
delete(c.subFun, topic)
|
delete(c.subFun, topic)
|
||||||
}
|
}
|
||||||
@@ -149,30 +150,32 @@ func (c *Client) Unsubscribe(topic string) {
|
|||||||
ping
|
ping
|
||||||
---
|
---
|
||||||
*/
|
*/
|
||||||
func (c *Client) ping() {
|
func (c *ZHubClient) ping() {
|
||||||
c.send("ping")
|
c.send("ping")
|
||||||
}
|
}
|
||||||
|
|
||||||
//Publish -------------------------------------- pub-sub --------------------------------------
|
// Publish -------------------------------------- pub-sub --------------------------------------
|
||||||
func (c *Client) Publish(topic string, message string) error {
|
func (c *ZHubClient) Publish(topic string, message string) error {
|
||||||
return c.send("publish", topic, message)
|
return c.send("publish", topic, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Broadcast(topic string, message string) error {
|
func (c *ZHubClient) Broadcast(topic string, message string) error {
|
||||||
return c.send("broadcast", topic, message)
|
return c.send("broadcast", topic, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Delay(topic string, message string, delay int) error {
|
func (c *ZHubClient) Delay(topic string, message string, delay int) error {
|
||||||
return c.send("delay", topic, message, strconv.Itoa(delay))
|
return c.send("delay", topic, message, strconv.Itoa(delay))
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Timer
|
Timer
|
||||||
func (c *Client) Timer(topic string, expr string, fun func()) {
|
|
||||||
c.timerFun[topic] = fun
|
func (c *ZHubClient) Timer(topic string, expr string, fun func()) {
|
||||||
c.send("timer", topic, expr, "x")
|
c.timerFun[topic] = fun
|
||||||
}*/
|
c.send("timer", topic, expr, "x")
|
||||||
func (c *Client) Timer(topic string, fun func()) {
|
}
|
||||||
|
*/
|
||||||
|
func (c *ZHubClient) Timer(topic string, fun func()) {
|
||||||
if fun != nil {
|
if fun != nil {
|
||||||
c.timerFun[topic] = fun
|
c.timerFun[topic] = fun
|
||||||
}
|
}
|
||||||
@@ -180,7 +183,7 @@ func (c *Client) Timer(topic string, fun func()) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Cmd send cmd
|
// Cmd send cmd
|
||||||
func (c *Client) Cmd(cmd ...string) {
|
func (c *ZHubClient) Cmd(cmd ...string) {
|
||||||
if len(cmd) == 1 {
|
if len(cmd) == 1 {
|
||||||
c.send("cmd", cmd[0])
|
c.send("cmd", cmd[0])
|
||||||
} else if len(cmd) > 1 {
|
} else if len(cmd) > 1 {
|
||||||
@@ -191,35 +194,44 @@ func (c *Client) Cmd(cmd ...string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Close() {
|
func (c *ZHubClient) Close() {
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TryLock
|
||||||
|
func TryLock(key string, duration int) {
|
||||||
|
/*uuid := uuid.New()
|
||||||
|
|
||||||
|
TODO
|
||||||
|
|
||||||
|
return Lock{Key: key, Value: uuid}*/
|
||||||
|
}
|
||||||
|
|
||||||
// Lock Key
|
// Lock Key
|
||||||
func (c *Client) Lock(key string, duration int) Lock {
|
func (c *ZHubClient) Lock(key string, duration int) Lock {
|
||||||
v := uuid.New()
|
uuid := uuid.New()
|
||||||
c.send("v", key, v, strconv.Itoa(duration))
|
c.send("uuid", key, uuid, strconv.Itoa(duration))
|
||||||
|
|
||||||
lockChan := make(chan int, 2)
|
lockChan := make(chan int, 2)
|
||||||
go func() {
|
go func() {
|
||||||
c.wlock.Lock()
|
c.wlock.Lock()
|
||||||
defer c.wlock.Unlock()
|
defer c.wlock.Unlock()
|
||||||
c.lockFlag[v] = &Lock{
|
c.lockFlag[uuid] = &Lock{
|
||||||
Key: key,
|
Key: key,
|
||||||
Value: v,
|
Value: uuid,
|
||||||
flagChan: lockChan,
|
flagChan: lockChan,
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-lockChan:
|
case <-lockChan:
|
||||||
log.Println("v-ok", time.Now().UnixNano()/1e6, v)
|
log.Println("lock-ok", time.Now().UnixNano()/1e6, uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
return Lock{Key: key, Value: v}
|
return Lock{Key: key, Value: uuid}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Unlock(l Lock) {
|
func (c *ZHubClient) Unlock(l Lock) {
|
||||||
c.send("unlock", l.Key, l.Value)
|
c.send("unlock", l.Key, l.Value)
|
||||||
delete(c.lockFlag, l.Value)
|
delete(c.lockFlag, l.Value)
|
||||||
}
|
}
|
||||||
@@ -228,7 +240,7 @@ func (c *Client) Unlock(l Lock) {
|
|||||||
var rpcMap = make(map[string]*Rpc)
|
var rpcMap = make(map[string]*Rpc)
|
||||||
var rpcLock = sync.RWMutex{}
|
var rpcLock = sync.RWMutex{}
|
||||||
|
|
||||||
func (c *Client) rpcInit() {
|
func (c *ZHubClient) rpcInit() {
|
||||||
|
|
||||||
// 添加 appname 主题订阅处理
|
// 添加 appname 主题订阅处理
|
||||||
c.Subscribe(c.appname, func(v string) {
|
c.Subscribe(c.appname, func(v string) {
|
||||||
@@ -275,7 +287,11 @@ func (r Rpc) backTopic() string {
|
|||||||
return strings.Split(r.Ruk, "::")[0]
|
return strings.Split(r.Ruk, "::")[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
|
func (c *ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) {
|
||||||
|
c.RpcWithTimeout(topic, message, time.Second*15, back)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ZHubClient) RpcWithTimeout(topic string, message string, timeout time.Duration, back func(res RpcResult)) {
|
||||||
rpc := Rpc{
|
rpc := Rpc{
|
||||||
Ruk: c.appname + "::" + uuid.New(),
|
Ruk: c.appname + "::" + uuid.New(),
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
@@ -296,7 +312,7 @@ func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
|
|||||||
select {
|
select {
|
||||||
case <-rpc.Ch:
|
case <-rpc.Ch:
|
||||||
// ch 事件(rpc 返回)
|
// ch 事件(rpc 返回)
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(timeout):
|
||||||
// rpc 超时
|
// rpc 超时
|
||||||
x, _ := json.Marshal(rpc)
|
x, _ := json.Marshal(rpc)
|
||||||
log.Println("rpc timeout:", x)
|
log.Println("rpc timeout:", x)
|
||||||
@@ -309,7 +325,7 @@ func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RpcSubscribe rpc subscribe
|
// RpcSubscribe rpc subscribe
|
||||||
func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
|
func (c *ZHubClient) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
|
||||||
c.Subscribe(topic, func(v string) {
|
c.Subscribe(topic, func(v string) {
|
||||||
rpc := Rpc{}
|
rpc := Rpc{}
|
||||||
err := json.Unmarshal([]byte(v), &rpc)
|
err := json.Unmarshal([]byte(v), &rpc)
|
||||||
@@ -322,7 +338,7 @@ func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
|
|||||||
result.Ruk = rpc.Ruk
|
result.Ruk = rpc.Ruk
|
||||||
|
|
||||||
res, _ := json.Marshal(result)
|
res, _ := json.Marshal(result)
|
||||||
c.Publish(rpc.backTopic(), string(res[:]))
|
c.Publish(rpc.backTopic(), string(res))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -333,7 +349,7 @@ send socket message :
|
|||||||
if len(vs) equal 1 will send message `vs[0] + "\r\n"`
|
if len(vs) equal 1 will send message `vs[0] + "\r\n"`
|
||||||
else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...`
|
else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...`
|
||||||
*/
|
*/
|
||||||
func (c *Client) send(vs ...string) (err error) {
|
func (c *ZHubClient) send(vs ...string) (err error) {
|
||||||
//chSend <- vs
|
//chSend <- vs
|
||||||
c.wlock.Lock()
|
c.wlock.Lock()
|
||||||
defer c.wlock.Unlock()
|
defer c.wlock.Unlock()
|
||||||
@@ -351,13 +367,19 @@ a:
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
time.Sleep(time.Second * 3)
|
time.Sleep(time.Second * 3)
|
||||||
|
// check conn reconnect
|
||||||
|
{
|
||||||
|
c.wlock.Unlock()
|
||||||
|
c.reconn()
|
||||||
|
c.wlock.Lock()
|
||||||
|
}
|
||||||
goto a
|
goto a
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) receive() {
|
func (c *ZHubClient) receive() {
|
||||||
r := bufio.NewReader(c.conn)
|
r := bufio.NewReader(c.conn)
|
||||||
for {
|
for {
|
||||||
v, _, err := r.ReadLine()
|
v, _, err := r.ReadLine()
|
||||||
@@ -373,6 +395,7 @@ func (c *Client) receive() {
|
|||||||
if string(v) == "+ping" {
|
if string(v) == "+ping" {
|
||||||
c.send("+pong")
|
c.send("+pong")
|
||||||
}
|
}
|
||||||
|
log.Println("receive:", string(v))
|
||||||
case '-':
|
case '-':
|
||||||
log.Println("error:", string(v))
|
log.Println("error:", string(v))
|
||||||
case '*':
|
case '*':
|
||||||
@@ -384,7 +407,8 @@ func (c *Client) receive() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var vs []string
|
var vs []string
|
||||||
for i := 0; i < n; i++ {
|
//for i := 0; i < n; i++ {
|
||||||
|
for len(vs) < n {
|
||||||
line, _, err := r.ReadLine()
|
line, _, err := r.ReadLine()
|
||||||
if err != nil || line == nil {
|
if err != nil || line == nil {
|
||||||
continue
|
continue
|
||||||
@@ -404,16 +428,20 @@ func (c *Client) receive() {
|
|||||||
}
|
}
|
||||||
vs = append(vs, string(buf))
|
vs = append(vs, string(buf))
|
||||||
}
|
}
|
||||||
if len(vs) == 3 && vs[0] == "message" && vs[1] == "lock" {
|
if len(vs) == 3 && vs[0] == "message" {
|
||||||
go func() {
|
if vs[1] == "lock" {
|
||||||
log.Println("lock:" + vs[2])
|
go func() {
|
||||||
c.wlock.Lock()
|
log.Println("lock:" + vs[2])
|
||||||
defer c.wlock.Unlock()
|
c.wlock.Lock()
|
||||||
if c.lockFlag[vs[2]] == nil {
|
defer c.wlock.Unlock()
|
||||||
return
|
if c.lockFlag[vs[2]] == nil {
|
||||||
}
|
return
|
||||||
c.lockFlag[vs[2]].flagChan <- 0
|
}
|
||||||
}()
|
c.lockFlag[vs[2]].flagChan <- 0
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
c.chReceive <- vs
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(vs) == 2 && vs[0] == "timer" {
|
if len(vs) == 2 && vs[0] == "timer" {
|
||||||
@@ -423,118 +451,3 @@ func (c *Client) receive() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------- hm --------------------------------------
|
|
||||||
|
|
||||||
// ==============================================================================
|
|
||||||
|
|
||||||
var reconnect = 0
|
|
||||||
|
|
||||||
// ClientRun client 命令行程序
|
|
||||||
func ClientRun(addr string) {
|
|
||||||
conn, err := net.Dial("tcp", fmt.Sprintf("%s", addr))
|
|
||||||
|
|
||||||
for {
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
time.Sleep(time.Second * 3)
|
|
||||||
conn, err = net.Dial("tcp", fmt.Sprintf("%s", addr))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println(fmt.Sprintf("had connected server: %s", addr))
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if reconnect == 1 {
|
|
||||||
conn.Close()
|
|
||||||
ClientRun(addr)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
go clientRead(conn)
|
|
||||||
|
|
||||||
for {
|
|
||||||
inReader := bufio.NewReader(os.Stdin)
|
|
||||||
line, err := inReader.ReadString('\n')
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
return
|
|
||||||
} else if reconnect == 1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
line = strings.Trim(line, "\r\n")
|
|
||||||
line = strings.Trim(line, "\n")
|
|
||||||
line = strings.Trim(line, " ")
|
|
||||||
|
|
||||||
if strings.EqualFold(line, "") {
|
|
||||||
continue
|
|
||||||
} else if strings.EqualFold(line, ":exit") {
|
|
||||||
fmt.Println("exit!")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//fmt.Println("发送数据:" + line)
|
|
||||||
|
|
||||||
line = strings.ReplaceAll(line, " ", "")
|
|
||||||
parr := strings.Split(line, " ")
|
|
||||||
conn.Write([]byte("*" + strconv.Itoa(len(parr)) + "\r\n"))
|
|
||||||
for i := range parr {
|
|
||||||
conn.Write([]byte("$" + strconv.Itoa(len(parr[i])) + "\r\n"))
|
|
||||||
conn.Write([]byte(parr[i] + "\r\n"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func clientRead(conn net.Conn) {
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
fmt.Println("Recovered:", r)
|
|
||||||
}
|
|
||||||
reconnect = 1
|
|
||||||
}()
|
|
||||||
|
|
||||||
reader := bufio.NewReader(conn)
|
|
||||||
for {
|
|
||||||
rcmd := make([]string, 0)
|
|
||||||
line, _, err := reader.ReadLine()
|
|
||||||
if err != nil {
|
|
||||||
log.Println("connection error: ", err)
|
|
||||||
return
|
|
||||||
} else if len(line) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
switch string(line[:1]) {
|
|
||||||
case "*":
|
|
||||||
n, _ := strconv.Atoi(string(line[1:]))
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
reader.ReadLine()
|
|
||||||
v, _, _ := reader.ReadLine()
|
|
||||||
rcmd = append(rcmd, string(v))
|
|
||||||
}
|
|
||||||
case "+":
|
|
||||||
rcmd = append(rcmd, string(line))
|
|
||||||
case "-":
|
|
||||||
rcmd = append(rcmd, string(line))
|
|
||||||
case ":":
|
|
||||||
rcmd = append(rcmd, string(line))
|
|
||||||
case "h":
|
|
||||||
if strings.EqualFold(string(line), "help-start") {
|
|
||||||
for {
|
|
||||||
v, _, _ := reader.ReadLine()
|
|
||||||
if strings.EqualFold(string(v), "help-end") {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
rcmd = append(rcmd, string(v)+"\r\n")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
rcmd = append(rcmd, string(line))
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println(">", strings.Join(rcmd, " "))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,110 +1,45 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var zhub *Client
|
var hub *ZHubClient
|
||||||
|
var once = sync.Once{}
|
||||||
var (
|
|
||||||
addr = "127.0.0.1:1216"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
client, err := Create("zhub-cli", addr, "C-0", "admin@123456")
|
once.Do(func() {
|
||||||
if err != nil {
|
hub := &ZHubClient{
|
||||||
log.Fatal(err)
|
appname: "hub-cli",
|
||||||
}
|
addr: "127.0.0.1:1216",
|
||||||
zhub = client
|
groupid: "C-0",
|
||||||
}
|
auth: "admin@123456",
|
||||||
|
}
|
||||||
|
err := hub.Init()
|
||||||
|
|
||||||
func newClient(appname, groupid string) *Client {
|
if err != nil {
|
||||||
client, err := Create(appname, addr, groupid, "admin@123456")
|
log.Fatal(err)
|
||||||
if err != nil {
|
}
|
||||||
log.Fatal(err)
|
hub = hub
|
||||||
}
|
})
|
||||||
return client
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTimer(t *testing.T) {
|
|
||||||
go func() {
|
|
||||||
client := newClient("zhub-cli", "g-1")
|
|
||||||
|
|
||||||
client.Subscribe("ax1", func(v string) {
|
|
||||||
log.Println("topic-1-ax: " + v)
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
client := newClient("zhub-cli", "g-1")
|
|
||||||
|
|
||||||
client.Subscribe("ax1", func(v string) {
|
|
||||||
log.Println("topic-2-ax: " + v)
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
client := newClient("zhub-cli", "g-1")
|
|
||||||
|
|
||||||
client.Subscribe("ax1", func(v string) {
|
|
||||||
log.Println("topic-3-ax: " + v)
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
time.Sleep(time.Hour * 3)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSendCmd(t *testing.T) {
|
|
||||||
client := newClient("zhub-cli", "group-admin")
|
|
||||||
|
|
||||||
//client.Cmd("reload-timer")
|
|
||||||
client.Cmd("shutdown")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPublish(t *testing.T) {
|
|
||||||
|
|
||||||
/*zhub.Publish("abx", "asd\r\nxxx1")
|
|
||||||
zhub.Publish("abx", "asd\r\nxxx2")
|
|
||||||
zhub.Publish("abx", "asd\r\nxxx3")
|
|
||||||
zhub.Publish("abx", "asd\r\nxxx4")
|
|
||||||
zhub.Publish("abx", "asd\r\nxxx5")*/
|
|
||||||
|
|
||||||
/*for i := 0; i < 10000; i++ {
|
|
||||||
zhub.Publish("ax1", strconv.Itoa(i))
|
|
||||||
}*/
|
|
||||||
|
|
||||||
/*for i := 0; i < 20_0000; i++ {
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
zhub.Publish("b", strconv.Itoa(i))
|
|
||||||
}*/
|
|
||||||
|
|
||||||
/*zhub.Subscribe("wx:user-follow", func(v string) {
|
|
||||||
fmt.Println(v)
|
|
||||||
})*/
|
|
||||||
|
|
||||||
//hub.Publish("ax", "1")
|
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLock(t *testing.T) {
|
func TestLock(t *testing.T) {
|
||||||
client, _ := Create("zhub-cli", addr, "xx", "admin@123456")
|
hub.Subscribe("lock", func(v string) {
|
||||||
|
|
||||||
client.Subscribe("lock", func(v string) {
|
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
var fun = func(x string) {
|
var fun = func(x string) {
|
||||||
log.Println("lock", time.Now().UnixNano()/1e6)
|
log.Println("lock", time.Now().UnixNano()/1e6)
|
||||||
lock := client.Lock("a", 30)
|
lock := hub.Lock("a", 30)
|
||||||
defer client.Unlock(lock)
|
defer hub.Unlock(lock)
|
||||||
//client.Lock("a", 5)
|
//client.Lock("a", 5)
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
@@ -124,6 +59,8 @@ func rotate(nums []int, k int) {
|
|||||||
k = k % len(nums)
|
k = k % len(nums)
|
||||||
nums = append(nums[len(nums)-k:], nums[0:len(nums)-k]...)
|
nums = append(nums[len(nums)-k:], nums[0:len(nums)-k]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 特殊符号测试
|
||||||
func TestName(t *testing.T) {
|
func TestName(t *testing.T) {
|
||||||
//str := ", response = {\"success\":true,\"retcode\":0,\"result\":{\"age\":0,\"explevel\":1,\"face\":\"https://aimg.woaihaoyouxi.com/haogame/202106/pic/20210629095545FmGt-v9NYqyNZ_Q6_y3zM_RMrDgd.jpg\",\"followed\":0,\"gender\":0,\"idenstatus\":0,\"matchcatelist\":[{\"catename\":\"足球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103556FoG5ICf_7BFx6Idyo3TYpJQ7tmfG.png\",\"matchcateid\":1},{\"catename\":\"篮球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103636FklsXTn1f6Jlsam8Jk-yFB7Upo3C.png\",\"matchcateid\":2}],\"matchcates\":\"2,1\",\"mobile\":\"18515190967\",\"regtime\":1624931714781,\"sessionid\":\"d1fc447753bd4700ad29674a753030fa\",\"status\":10,\"userid\":100463,\"username\":\"绝尘\",\"userno\":100463}}"
|
//str := ", response = {\"success\":true,\"retcode\":0,\"result\":{\"age\":0,\"explevel\":1,\"face\":\"https://aimg.woaihaoyouxi.com/haogame/202106/pic/20210629095545FmGt-v9NYqyNZ_Q6_y3zM_RMrDgd.jpg\",\"followed\":0,\"gender\":0,\"idenstatus\":0,\"matchcatelist\":[{\"catename\":\"足球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103556FoG5ICf_7BFx6Idyo3TYpJQ7tmfG.png\",\"matchcateid\":1},{\"catename\":\"篮球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103636FklsXTn1f6Jlsam8Jk-yFB7Upo3C.png\",\"matchcateid\":2}],\"matchcates\":\"2,1\",\"mobile\":\"18515190967\",\"regtime\":1624931714781,\"sessionid\":\"d1fc447753bd4700ad29674a753030fa\",\"status\":10,\"userid\":100463,\"username\":\"绝尘\",\"userno\":100463}}"
|
||||||
/*str := "别人家的女娃子🤞🏻"
|
/*str := "别人家的女娃子🤞🏻"
|
||||||
@@ -150,83 +87,7 @@ func TestName(t *testing.T) {
|
|||||||
|
|
||||||
func toStr(d interface{}) string {
|
func toStr(d interface{}) string {
|
||||||
bs, _ := json.Marshal(d)
|
bs, _ := json.Marshal(d)
|
||||||
return string(bs[:])
|
return string(bs)
|
||||||
}
|
|
||||||
|
|
||||||
// 接收数据 A
|
|
||||||
func TestC_a(t *testing.T) {
|
|
||||||
zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456")
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
zhub.Subscribe("cmt:user-msg", func(v string) {
|
|
||||||
fmt.Println(v)
|
|
||||||
})
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 接收数据
|
|
||||||
func TestC_ab(t *testing.T) {
|
|
||||||
zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456")
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
zhub.Subscribe("a", func(v string) {
|
|
||||||
fmt.Println("a:", v)
|
|
||||||
})
|
|
||||||
zhub.Subscribe("b", func(v string) {
|
|
||||||
fmt.Println("b:", v)
|
|
||||||
})
|
|
||||||
zhub.Subscribe("im:friend:186", func(v string) {
|
|
||||||
fmt.Println("im:friend:186:", v)
|
|
||||||
})
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDelay2(t *testing.T) {
|
|
||||||
zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456")
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var x int64 = 0
|
|
||||||
go func() {
|
|
||||||
zhub.Subscribe("a", func(v string) {
|
|
||||||
fmt.Println(v, "-", atomic.AddInt64(&x, 1))
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
zhub2, err := Create("zhub-cli", addr, "C-1", "admin@123456")
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
zhub2.Subscribe("a", func(v string) {
|
|
||||||
fmt.Println(v, "-", atomic.AddInt64(&x, 1))
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
time.Sleep(time.Second * 20000)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDelay(t *testing.T) {
|
|
||||||
//zhub.Delay("abx", "1", -1)
|
|
||||||
|
|
||||||
//zhub.Publish("yk-topic", "hello yk.")
|
|
||||||
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
zhub.Publish("a", "x-"+strconv.Itoa(i))
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(time.Second * 5)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 测试发送微信 模板消息
|
// 测试发送微信 模板消息
|
||||||
@@ -267,12 +128,12 @@ func TestWxSendMessage(t *testing.T) {
|
|||||||
|
|
||||||
log.Println(toStr(tplData["templateData"]))
|
log.Println(toStr(tplData["templateData"]))
|
||||||
|
|
||||||
zhub.Publish("wx:send-template-message", toStr(tplData))
|
hub.Publish("wx:send-template-message", toStr(tplData))
|
||||||
}
|
}
|
||||||
|
|
||||||
// 监听各项目所有接口请求失败信息《接口请求失败,发送失败信息到 pro.app-error 主题》
|
// 监听各项目所有接口请求失败信息《接口请求失败,发送失败信息到 pro.app-error 主题》
|
||||||
func TestAppErrorConsole(t *testing.T) {
|
func TestAppErrorConsole(t *testing.T) {
|
||||||
zhub.Subscribe("app-error", func(v string) {
|
hub.Subscribe("app-error", func(v string) {
|
||||||
strs := []string{
|
strs := []string{
|
||||||
"未登陆", "账号已在其他设备登录", "今日已签到", "您的登录状态已过期", "用户不存在", "暂无可预定场馆",
|
"未登陆", "账号已在其他设备登录", "今日已签到", "您的登录状态已过期", "用户不存在", "暂无可预定场馆",
|
||||||
}
|
}
|
||||||
@@ -291,8 +152,7 @@ func TestAppErrorConsole(t *testing.T) {
|
|||||||
// ----------- rpc test -----------
|
// ----------- rpc test -----------
|
||||||
|
|
||||||
func TestRpcCall_S(t *testing.T) {
|
func TestRpcCall_S(t *testing.T) {
|
||||||
zhub := newClient("zhub-cli-a", "x")
|
hub.RpcSubscribe("ai-test", func(r Rpc) RpcResult {
|
||||||
zhub.RpcSubscribe("ai-test", func(r Rpc) RpcResult {
|
|
||||||
upper := strings.ToUpper(r.Value)
|
upper := strings.ToUpper(r.Value)
|
||||||
return RpcResult{Retcode: 0, Retinfo: "操作成功", Result: upper}
|
return RpcResult{Retcode: 0, Retinfo: "操作成功", Result: upper}
|
||||||
})
|
})
|
||||||
@@ -315,7 +175,7 @@ func TestRpcCall_C(t *testing.T) {
|
|||||||
print(res.Result)
|
print(res.Result)
|
||||||
})*/
|
})*/
|
||||||
|
|
||||||
zhub.Subscribe("zcore:monitor-error", func(v string) {
|
hub.Subscribe("zcore:monitor-error", func(v string) {
|
||||||
fmt.Println(v)
|
fmt.Println(v)
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -325,13 +185,3 @@ func TestRpcCall_C(t *testing.T) {
|
|||||||
fmt.Println(res)
|
fmt.Println(res)
|
||||||
})*/
|
})*/
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBannedTalk(t *testing.T) {
|
|
||||||
/*zhub.Rpc("im:banned-talk", "{'imtoken':'74074f9e599947ca940e71a9788e768f'}", func(res RpcResult) {
|
|
||||||
fmt.Print(res)
|
|
||||||
})*/
|
|
||||||
|
|
||||||
encoding := base64.Encoding{}
|
|
||||||
toString := encoding.EncodeToString([]byte("420101190001011234"))
|
|
||||||
fmt.Println(toString)
|
|
||||||
}
|
|
||||||
|
|||||||
88
go.mod
88
go.mod
@@ -1,55 +1,63 @@
|
|||||||
module zhub
|
module gitea.1216.top/lxy/zhub
|
||||||
|
|
||||||
go 1.20
|
go 1.25.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/gin-gonic/gin v1.9.0
|
github.com/gin-gonic/gin v1.11.0
|
||||||
github.com/go-basic/uuid v1.0.0
|
github.com/go-basic/uuid v1.0.0
|
||||||
github.com/go-sql-driver/mysql v1.5.0
|
github.com/go-sql-driver/mysql v1.9.3
|
||||||
|
github.com/lib/pq v1.10.9
|
||||||
github.com/robfig/cron v1.2.0
|
github.com/robfig/cron v1.2.0
|
||||||
github.com/spf13/viper v1.15.0
|
github.com/spf13/viper v1.21.0
|
||||||
|
gopkg.in/ini.v1 v1.67.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/bytedance/sonic v1.9.1 // indirect
|
filippo.io/edwards25519 v1.1.0 // indirect
|
||||||
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
|
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||||
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
|
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
|
||||||
|
github.com/goccy/go-yaml v1.18.0 // indirect
|
||||||
|
github.com/quic-go/qpack v0.5.1 // indirect
|
||||||
|
github.com/quic-go/quic-go v0.55.0 // indirect
|
||||||
|
github.com/rogpeppe/go-internal v1.14.1 // indirect
|
||||||
|
github.com/sagikazarmark/locafero v0.12.0 // indirect
|
||||||
|
go.uber.org/mock v0.6.0 // indirect
|
||||||
|
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||||
|
golang.org/x/mod v0.28.0 // indirect
|
||||||
|
golang.org/x/sync v0.17.0 // indirect
|
||||||
|
golang.org/x/tools v0.37.0 // indirect
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/bytedance/sonic v1.14.1 // indirect
|
||||||
|
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||||
|
github.com/gabriel-vasile/mimetype v1.4.10 // indirect
|
||||||
|
github.com/gin-contrib/sse v1.1.0 // indirect
|
||||||
github.com/go-playground/locales v0.14.1 // indirect
|
github.com/go-playground/locales v0.14.1 // indirect
|
||||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||||
github.com/go-playground/validator/v10 v10.14.0 // indirect
|
github.com/go-playground/validator/v10 v10.27.0 // indirect
|
||||||
github.com/goccy/go-json v0.10.2 // indirect
|
github.com/goccy/go-json v0.10.5 // indirect
|
||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
|
||||||
github.com/json-iterator/go v1.1.12 // indirect
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
|
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||||
github.com/leodido/go-urn v1.2.4 // indirect
|
github.com/leodido/go-urn v1.4.0 // indirect
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.19 // indirect
|
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
|
||||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||||
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
|
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
|
||||||
github.com/spf13/afero v1.9.3 // indirect
|
github.com/spf13/afero v1.15.0 // indirect
|
||||||
github.com/spf13/cast v1.5.0 // indirect
|
github.com/spf13/cast v1.10.0 // indirect
|
||||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
github.com/spf13/pflag v1.0.10 // indirect
|
||||||
github.com/spf13/pflag v1.0.5 // indirect
|
github.com/subosito/gotenv v1.6.0 // indirect
|
||||||
github.com/subosito/gotenv v1.4.2 // indirect
|
|
||||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||||
github.com/ugorji/go/codec v1.2.11 // indirect
|
github.com/ugorji/go/codec v1.3.0 // indirect
|
||||||
golang.org/x/arch v0.3.0 // indirect
|
golang.org/x/arch v0.21.0 // indirect
|
||||||
golang.org/x/crypto v0.9.0 // indirect
|
golang.org/x/crypto v0.42.0 // indirect
|
||||||
golang.org/x/net v0.10.0 // indirect
|
golang.org/x/net v0.44.0 // indirect; indirect·
|
||||||
golang.org/x/sys v0.8.0 // indirect
|
golang.org/x/sys v0.36.0 // indirect
|
||||||
golang.org/x/text v0.9.0 // indirect
|
golang.org/x/text v0.29.0 // indirect
|
||||||
google.golang.org/protobuf v1.30.0 // indirect
|
google.golang.org/protobuf v1.36.10 // indirect
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
|
||||||
)
|
|
||||||
|
|
||||||
replace (
|
|
||||||
zhub/cmd => ./zhub/cmd
|
|
||||||
zhub/internal/config => ./zhub/internal/config
|
|
||||||
zhub/internal/monitor => ./zhub/internal/monitor
|
|
||||||
zhub/internal/zsub => ./zhub/internal/zsub
|
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,12 +2,14 @@ package auth
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"gopkg.in/yaml.v3"
|
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gitea.1216.top/lxy/zhub/internal/config"
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
type User struct {
|
type User struct {
|
||||||
@@ -63,6 +65,12 @@ type PermissionManager struct {
|
|||||||
func (p *PermissionManager) Init() error {
|
func (p *PermissionManager) Init() error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
Conf := config.ReadConfig()
|
||||||
|
if !Conf.Service.Auth {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Load YAML configuration from file
|
// Load YAML configuration from file
|
||||||
data, err := os.ReadFile("./auth.yml")
|
data, err := os.ReadFile("./auth.yml")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -1,36 +1,40 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/spf13/viper"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
"gopkg.in/ini.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Log struct {
|
type Log struct {
|
||||||
Handlers string
|
Handlers string `ini:"handlers"`
|
||||||
Level string
|
Level string `ini:"level"`
|
||||||
File string
|
File string `ini:"file"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Log Log
|
Log Log `ini:"log"`
|
||||||
Service struct {
|
Service struct {
|
||||||
Watch string
|
Watch string `ini:"watch"`
|
||||||
Addr string
|
Addr string `ini:"addr"`
|
||||||
Auth bool
|
Auth bool `ini:"auth"`
|
||||||
}
|
} `ini:"service"`
|
||||||
Data struct {
|
Data struct {
|
||||||
Dir string
|
Dir string `ini:"dir"`
|
||||||
}
|
} `ini:"data"`
|
||||||
Ztimer struct {
|
Ztimer struct {
|
||||||
Db struct {
|
Db struct {
|
||||||
Addr string
|
Addr string `ini:"addr"`
|
||||||
User string
|
User string `ini:"user"`
|
||||||
Password string
|
Password string `ini:"password"`
|
||||||
Database string
|
Database string `ini:"database"`
|
||||||
}
|
Schema string `ini:"schema"`
|
||||||
}
|
Type string `ini:"type"`
|
||||||
Auth map[string]string
|
} `ini:"db"`
|
||||||
|
} `ini:"ztimer"`
|
||||||
|
Auth map[string]string `ini:"auth"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadConfig() Config {
|
func ReadConfig() Config {
|
||||||
@@ -57,16 +61,16 @@ func ReadConfig() Config {
|
|||||||
}*/
|
}*/
|
||||||
|
|
||||||
// 尝试从 /etc/ 目录下查找 zhub.ini 配置文件
|
// 尝试从 /etc/ 目录下查找 zhub.ini 配置文件
|
||||||
viper.AddConfigPath("/etc/") // 添加 /etc/ 目录作为配置文件搜索路径
|
/*viper.AddConfigPath("/etc/") // 添加 /etc/ 目录作为配置文件搜索路径
|
||||||
viper.SetConfigName("zhub") // 指定配置文件名为 zhub
|
viper.SetConfigName("zhub") // 指定配置文件名为 zhub
|
||||||
if err := viper.ReadInConfig(); err == nil {
|
if err := viper.ReadInConfig(); err == nil {
|
||||||
if err := viper.Unmarshal(&conf); err != nil {
|
if err := viper.Unmarshal(&conf); err != nil {
|
||||||
log.Fatalf("Failed to unmarshal config: %s", err.Error())
|
log.Fatalf("Failed to unmarshal config: %s", err.Error())
|
||||||
}
|
}
|
||||||
return conf
|
return conf
|
||||||
}
|
}*/
|
||||||
// 如果 /etc/ 目录下未找到配置文件,则尝试从当前程序运行目录下查找 app.ini 配置文件
|
// 如果 /etc/ 目录下未找到配置文件,则尝试从当前程序运行目录下查找 app.ini 配置文件
|
||||||
dir, err := os.Getwd() // 获取程序运行目录
|
/*dir, err := os.Getwd() // 获取程序运行目录
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to get current directory: %s", err.Error())
|
log.Fatalf("Failed to get current directory: %s", err.Error())
|
||||||
}
|
}
|
||||||
@@ -77,10 +81,24 @@ func ReadConfig() Config {
|
|||||||
if err := viper.Unmarshal(&conf); err != nil {
|
if err := viper.Unmarshal(&conf); err != nil {
|
||||||
log.Fatalf("Failed to unmarshal config: %s", err.Error())
|
log.Fatalf("Failed to unmarshal config: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
// conf.Ztimer.Db.Password 包含 %, 做url 转码
|
||||||
|
if strings.Contains(conf.Ztimer.Db.Password, "%") {
|
||||||
|
unescape, err := url.QueryUnescape(conf.Ztimer.Db.Password)
|
||||||
|
if err == nil {
|
||||||
|
conf.Ztimer.Db.Password = unescape
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return conf
|
return conf
|
||||||
|
} else {
|
||||||
|
log.Fatalf("Config file not found: " + err.Error())
|
||||||
|
}*/
|
||||||
|
|
||||||
|
load, err := ini.Load("app.ini")
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("Failed to load config: %s", err.Error())
|
||||||
}
|
}
|
||||||
// 如果在 /etc/ 目录和当前程序所在目录下均未找到配置文件,则报错
|
load.MapTo(&conf)
|
||||||
log.Fatalf("Config file not found")
|
|
||||||
return conf
|
return conf
|
||||||
}
|
}
|
||||||
func InitLog(logConfig Log) {
|
func InitLog(logConfig Log) {
|
||||||
|
|||||||
@@ -1,11 +1,18 @@
|
|||||||
package monitor
|
package monitor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gin-gonic/gin"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"zhub/internal/zsub"
|
|
||||||
|
"gitea.1216.top/lxy/zhub/internal/zbus"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var r = gin.Default()
|
||||||
|
|
||||||
|
// Version 时间格式化 YYYY.MM.DD-HH.MM.SS
|
||||||
|
var Version string
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
// 1.日志文件 定期分割归档
|
// 1.日志文件 定期分割归档
|
||||||
|
|
||||||
@@ -13,31 +20,41 @@ func init() {
|
|||||||
|
|
||||||
func StartWatch() {
|
func StartWatch() {
|
||||||
|
|
||||||
r := gin.Default()
|
|
||||||
|
|
||||||
r.Group("/users")
|
|
||||||
|
|
||||||
r.GET("/", func(c *gin.Context) {
|
r.GET("/", func(c *gin.Context) {
|
||||||
c.File("./public/index.html")
|
c.File("./public/index.html")
|
||||||
})
|
})
|
||||||
|
|
||||||
r.GET("/info", func(c *gin.Context) {
|
r.GET("/_/info", func(c *gin.Context) {
|
||||||
c.JSON(http.StatusOK, zsub.Info())
|
c.JSON(http.StatusOK, zbus.Info())
|
||||||
})
|
})
|
||||||
r.GET("/cleanup", func(c *gin.Context) {
|
r.GET("/_/cleanup", func(c *gin.Context) {
|
||||||
zsub.Hub.Clearup()
|
zbus.Bus.Clearup()
|
||||||
c.JSON(http.StatusOK, "+OK")
|
c.JSON(http.StatusOK, "+OK")
|
||||||
})
|
})
|
||||||
|
r.GET("/_/version", func(c *gin.Context) {
|
||||||
|
c.JSON(http.StatusOK, map[string]string{
|
||||||
|
"version": Version,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
r.GET("/timer/reload", func(c *gin.Context) {
|
r.GET("/timer/reload", func(c *gin.Context) {
|
||||||
zsub.Hub.ReloadTimer()
|
zbus.Bus.ReloadTimer()
|
||||||
c.JSON(http.StatusOK, "+reload timer ok")
|
c.JSON(http.StatusOK, "+reload timer ok")
|
||||||
})
|
})
|
||||||
r.GET("/topic/publish", func(c *gin.Context) {
|
r.POST("/message/send", func(c *gin.Context) {
|
||||||
topic := c.Query("topic")
|
topic := c.PostForm("name") // c.Query("topic")
|
||||||
value := c.Query("value")
|
value := c.PostForm("value") // c.Query("value")
|
||||||
|
// _type := c.PostForm("type") // publish、broadcast
|
||||||
|
|
||||||
zsub.Hub.Publish(topic, value)
|
log.Println(c.PostForm("type"), topic, value)
|
||||||
|
switch c.PostForm("type") {
|
||||||
|
case "broadcast":
|
||||||
|
zbus.Bus.Broadcast(topic, value)
|
||||||
|
case "publish":
|
||||||
|
zbus.Bus.Publish(topic, value)
|
||||||
|
default:
|
||||||
|
|
||||||
|
}
|
||||||
c.JSON(http.StatusOK, "+OK")
|
c.JSON(http.StatusOK, "+OK")
|
||||||
})
|
})
|
||||||
r.GET("/topic/delay", func(c *gin.Context) {
|
r.GET("/topic/delay", func(c *gin.Context) {
|
||||||
@@ -45,16 +62,16 @@ func StartWatch() {
|
|||||||
value := c.Query("value")
|
value := c.Query("value")
|
||||||
delay := c.Query("delay")
|
delay := c.Query("delay")
|
||||||
|
|
||||||
zsub.Hub.Delay([]string{"delay", topic, value, delay})
|
zbus.Bus.Delay([]string{"delay", topic, value, delay})
|
||||||
c.JSON(http.StatusOK, "+OK")
|
c.JSON(http.StatusOK, "+OK")
|
||||||
})
|
})
|
||||||
|
|
||||||
// reload the auth configuration
|
// reload the auth configuration
|
||||||
r.GET("/auth/reload", func(c *gin.Context) {
|
r.GET("/auth/reload", func(c *gin.Context) {
|
||||||
zsub.AuthManager.Reload()
|
zbus.AuthManager.Reload()
|
||||||
c.JSON(http.StatusOK, "+OK")
|
c.JSON(http.StatusOK, "+OK")
|
||||||
})
|
})
|
||||||
|
|
||||||
watchAddr := zsub.Conf.Service.Watch
|
watchAddr := zbus.Conf.Service.Watch
|
||||||
r.Run(watchAddr)
|
r.Run(watchAddr)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package zsub
|
package zbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@@ -6,7 +6,8 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
"zhub/internal/auth"
|
|
||||||
|
"gitea.1216.top/lxy/zhub/internal/auth"
|
||||||
)
|
)
|
||||||
|
|
||||||
var AuthManager *auth.PermissionManager
|
var AuthManager *auth.PermissionManager
|
||||||
@@ -22,10 +23,10 @@ func init() {
|
|||||||
|
|
||||||
var funChan = make(chan func(), 1000)
|
var funChan = make(chan func(), 1000)
|
||||||
|
|
||||||
func handleMessage(v Message) {
|
func messageHandler(v Message) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Println("handleMessage Recovered:", r)
|
log.Println("messageHandler Recovered:", r)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
c := v.Conn
|
c := v.Conn
|
||||||
@@ -47,7 +48,7 @@ func handleMessage(v Message) {
|
|||||||
|
|
||||||
// 准入拦截,所有指令完成 auth 认证后才可进入
|
// 准入拦截,所有指令完成 auth 认证后才可进入
|
||||||
if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" {
|
if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" {
|
||||||
c.send("-Auth: NOAUTH Authentication required:" + rcmd[0])
|
c.send("-Auth: Authentication required [" + rcmd[0] + "]")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 指令预处理
|
// 指令预处理
|
||||||
@@ -79,8 +80,16 @@ func handleMessage(v Message) {
|
|||||||
// auth check
|
// auth check
|
||||||
switch cmd {
|
switch cmd {
|
||||||
case "publish", "broadcast", "delay", "rpc":
|
case "publish", "broadcast", "delay", "rpc":
|
||||||
if !AuthManager.AuthCheck(c.user, rcmd[1], "w") {
|
if Conf.Service.Auth && !AuthManager.AuthCheck(c.user, rcmd[1], "w") {
|
||||||
c.send("-Error: Insufficient permissions to send " + cmd + " [" + rcmd[1] + "] message.")
|
c.send("-Error: Insufficient permissions to send " + cmd + " [" + rcmd[1] + "] message.")
|
||||||
|
log.Printf("[%d] -Auth: %s [%s]\n", c.sn, cmd, rcmd[1])
|
||||||
|
if cmd == "rpc" {
|
||||||
|
rpcBody := make(map[string]string)
|
||||||
|
json.Unmarshal([]byte(rcmd[2]), &rpcBody)
|
||||||
|
|
||||||
|
ruk := rpcBody["ruk"]
|
||||||
|
Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 401, 'retinfo': 'unauthorized!', 'ruk': '"+ruk+"'}")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case "subscribe": // 在订阅逻辑处检查
|
case "subscribe": // 在订阅逻辑处检查
|
||||||
@@ -90,7 +99,7 @@ func handleMessage(v Message) {
|
|||||||
switch cmd {
|
switch cmd {
|
||||||
case "auth":
|
case "auth":
|
||||||
userid, err := AuthManager.GetUserIdByToken(rcmd[1])
|
userid, err := AuthManager.GetUserIdByToken(rcmd[1])
|
||||||
if err != nil {
|
if err != nil && Conf.Service.Auth {
|
||||||
c.send("-Error: " + err.Error())
|
c.send("-Error: " + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -118,13 +127,13 @@ func handleMessage(v Message) {
|
|||||||
return
|
return
|
||||||
case "rpc":
|
case "rpc":
|
||||||
// if rpc and no sub back error
|
// if rpc and no sub back error
|
||||||
if Hub.noSubscribe(rcmd[1]) {
|
if Bus.noSubscribe(rcmd[1]) {
|
||||||
rpcBody := make(map[string]string)
|
rpcBody := make(map[string]string)
|
||||||
json.Unmarshal([]byte(rcmd[2]), &rpcBody)
|
json.Unmarshal([]byte(rcmd[2]), &rpcBody)
|
||||||
log.Println("rpc no subscribe: ", rcmd[1])
|
log.Printf("[%d] : rpc %s no subscribe", c.sn, rcmd[1])
|
||||||
|
|
||||||
ruk := rpcBody["ruk"]
|
ruk := rpcBody["ruk"]
|
||||||
Hub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}")
|
Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,7 +143,7 @@ func handleMessage(v Message) {
|
|||||||
/*if len(topicChan) < cap(topicChan) {
|
/*if len(topicChan) < cap(topicChan) {
|
||||||
topicChan <- rcmd
|
topicChan <- rcmd
|
||||||
}*/
|
}*/
|
||||||
Hub.Publish(rcmd[1], rcmd[2])
|
Bus.Publish(rcmd[1], rcmd[2])
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
case "publish":
|
case "publish":
|
||||||
@@ -144,13 +153,13 @@ func handleMessage(v Message) {
|
|||||||
/*if len(topicChan) < cap(topicChan) {
|
/*if len(topicChan) < cap(topicChan) {
|
||||||
topicChan <- rcmd
|
topicChan <- rcmd
|
||||||
}*/
|
}*/
|
||||||
Hub.Publish(rcmd[1], rcmd[2])
|
Bus.Publish(rcmd[1], rcmd[2])
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
case "broadcast":
|
case "broadcast":
|
||||||
Hub.broadcast(rcmd[1], rcmd[2])
|
Bus.Broadcast(rcmd[1], rcmd[2])
|
||||||
case "delay":
|
case "delay":
|
||||||
Hub.Delay(rcmd)
|
Bus.Delay(rcmd)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -166,8 +175,9 @@ func handleMessage(v Message) {
|
|||||||
// subscribe x y z
|
// subscribe x y z
|
||||||
for _, topic := range rcmd[1:] {
|
for _, topic := range rcmd[1:] {
|
||||||
// auth check
|
// auth check
|
||||||
if !AuthManager.AuthCheck(c.user, rcmd[1], "r") {
|
if Conf.Service.Auth && !AuthManager.AuthCheck(c.user, rcmd[1], "r") {
|
||||||
c.send("-Error: Insufficient permissions to " + cmd + " [" + rcmd[1] + "] message.")
|
c.send("-Error: Insufficient permissions to " + cmd + " [" + rcmd[1] + "] message.")
|
||||||
|
log.Printf("-Auth: %s [%s]\n", cmd, rcmd[1])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c.subscribe(topic)
|
c.subscribe(topic)
|
||||||
@@ -178,7 +188,7 @@ func handleMessage(v Message) {
|
|||||||
}
|
}
|
||||||
case "timer":
|
case "timer":
|
||||||
for _, name := range rcmd[1:] {
|
for _, name := range rcmd[1:] {
|
||||||
Hub.timer([]string{"timer", name}, c) // append to timers
|
Bus.timer([]string{"timer", name}, c) // append to timers
|
||||||
c.timers = append(c.timers, name) // append to conns
|
c.timers = append(c.timers, name) // append to conns
|
||||||
}
|
}
|
||||||
case "cmd":
|
case "cmd":
|
||||||
@@ -187,28 +197,28 @@ func handleMessage(v Message) {
|
|||||||
}
|
}
|
||||||
switch rcmd[1] {
|
switch rcmd[1] {
|
||||||
case "reload-timer":
|
case "reload-timer":
|
||||||
Hub.ReloadTimer()
|
Bus.ReloadTimer()
|
||||||
case "shutdown":
|
case "shutdown":
|
||||||
if AuthManager.IsAdmin(c.user) {
|
if AuthManager.IsAdmin(c.user) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
Hub.shutdown()
|
Bus.shutdown()
|
||||||
}
|
}
|
||||||
case "lock":
|
case "lock", "trylock":
|
||||||
// lock key uuid 5
|
// lock key uuid 5
|
||||||
if len(rcmd) != 4 {
|
if len(rcmd) != 4 {
|
||||||
c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]")
|
c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
d, _ := strconv.Atoi(rcmd[3])
|
d, _ := strconv.Atoi(rcmd[3])
|
||||||
Hub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d})
|
Bus._lock(&Lock{cmd: cmd, key: rcmd[1], uuid: rcmd[2], duration: d})
|
||||||
case "unlock":
|
case "unlock":
|
||||||
// unlock key uuid
|
// unlock key uuid
|
||||||
if len(rcmd) != 3 {
|
if len(rcmd) != 3 {
|
||||||
c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]")
|
c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]})
|
Bus._unlock(Lock{cmd: cmd, key: rcmd[1], uuid: rcmd[2]})
|
||||||
default:
|
default:
|
||||||
c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
|
c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
|
||||||
return
|
return
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package zsub
|
package zbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
@@ -12,19 +12,21 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
"zhub/internal/config"
|
|
||||||
|
"gitea.1216.top/lxy/zhub/internal/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
Conf config.Config
|
Conf config.Config
|
||||||
Hub = &ZSub{
|
Bus = &ZBus{
|
||||||
topics: make(map[string]*ZTopic),
|
topics: make(map[string]*ZTopic),
|
||||||
timers: make(map[string]*ZTimer),
|
timers: make(map[string]*ZTimer),
|
||||||
delays: make(map[string]*ZDelay),
|
delays: make(map[string]*ZDelay),
|
||||||
locks: make(map[string][]*Lock),
|
locks: make(map[string][]*Lock),
|
||||||
conns: make([]*ZConn, 0),
|
conns: make([]*ZConn, 0),
|
||||||
|
sn: 1000,
|
||||||
}
|
}
|
||||||
SN int32 = 1000
|
//SN int32 = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -41,7 +43,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
conns := make([]*ZConn, 0) // 需要关闭的连接
|
conns := make([]*ZConn, 0) // 需要关闭的连接
|
||||||
for _, c := range Hub.conns {
|
for _, c := range Bus.conns {
|
||||||
if c.ping > 0 && c.ping-c.pong > 19 {
|
if c.ping > 0 && c.ping-c.pong > 19 {
|
||||||
conns = c.appendTo(conns)
|
conns = c.appendTo(conns)
|
||||||
continue
|
continue
|
||||||
@@ -63,19 +65,20 @@ func init() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Hub.SaveData()
|
Bus.SaveData()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
type ZSub struct {
|
type ZBus struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
topics map[string]*ZTopic
|
topics map[string]*ZTopic // 订阅主题
|
||||||
timers map[string]*ZTimer
|
timers map[string]*ZTimer // 定时事件
|
||||||
delays map[string]*ZDelay
|
delays map[string]*ZDelay // 延时消息
|
||||||
locks map[string][]*Lock
|
locks map[string][]*Lock // 当前锁对象
|
||||||
conns []*ZConn
|
conns []*ZConn // 所有的客户端连接
|
||||||
delayup bool
|
sn int32 // 客户端连接编号
|
||||||
|
delayup bool // 是否需要延时持久保存数据
|
||||||
}
|
}
|
||||||
|
|
||||||
type ZConn struct { //ZConn
|
type ZConn struct { //ZConn
|
||||||
@@ -93,9 +96,10 @@ type ZConn struct { //ZConn
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Lock struct {
|
type Lock struct {
|
||||||
key string
|
cmd string // lock|trylock|unlock
|
||||||
uuid string
|
key string // lock key
|
||||||
duration int
|
uuid string // apply for unique identification
|
||||||
|
duration int // lock duration (seconds)
|
||||||
timer *time.Timer
|
timer *time.Timer
|
||||||
start int64
|
start int64
|
||||||
//stop time.Time
|
//stop time.Time
|
||||||
@@ -103,7 +107,7 @@ type Lock struct {
|
|||||||
|
|
||||||
func NewZConn(conn *net.Conn) *ZConn {
|
func NewZConn(conn *net.Conn) *ZConn {
|
||||||
return &ZConn{
|
return &ZConn{
|
||||||
sn: atomic.AddInt32(&SN, 1), // 连接编号
|
sn: atomic.AddInt32(&Bus.sn, 1),
|
||||||
conn: conn,
|
conn: conn,
|
||||||
topics: []string{},
|
topics: []string{},
|
||||||
timers: []string{},
|
timers: []string{},
|
||||||
@@ -119,9 +123,9 @@ func NewZConn(conn *net.Conn) *ZConn {
|
|||||||
3、若有待消费消息启动消费
|
3、若有待消费消息启动消费
|
||||||
*/
|
*/
|
||||||
func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
|
func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
|
||||||
Hub.Lock()
|
Bus.Lock()
|
||||||
defer Hub.Unlock()
|
defer Bus.Unlock()
|
||||||
ztopic := Hub.topics[topic] //ZTopic
|
ztopic := Bus.topics[topic] //ZTopic
|
||||||
if ztopic == nil {
|
if ztopic == nil {
|
||||||
ztopic = &ZTopic{
|
ztopic = &ZTopic{
|
||||||
groups: map[string]*ZGroup{},
|
groups: map[string]*ZGroup{},
|
||||||
@@ -129,7 +133,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
|
|||||||
chMsg: make(chan string, 500),
|
chMsg: make(chan string, 500),
|
||||||
}
|
}
|
||||||
ztopic.init()
|
ztopic.init()
|
||||||
Hub.topics[topic] = ztopic
|
Bus.topics[topic] = ztopic
|
||||||
}
|
}
|
||||||
|
|
||||||
zgroup := ztopic.groups[c.groupid] //ZGroup
|
zgroup := ztopic.groups[c.groupid] //ZGroup
|
||||||
@@ -159,7 +163,7 @@ func (c *ZConn) unsubscribe(topic string) { // 取消订阅 zconn{}
|
|||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
close(c.substoped[topic])
|
close(c.substoped[topic])
|
||||||
ztopic := Hub.topics[topic] //ZTopic
|
ztopic := Bus.topics[topic] //ZTopic
|
||||||
if ztopic == nil {
|
if ztopic == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -205,10 +209,10 @@ func (c *ZConn) close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// timer conn close
|
// timer conn close
|
||||||
Hub.Lock()
|
Bus.Lock()
|
||||||
defer Hub.Unlock()
|
defer Bus.Unlock()
|
||||||
for _, topic := range c.timers { // fixme: 数据逻辑交叉循环
|
for _, topic := range c.timers { // fixme: 数据逻辑交叉循环
|
||||||
timer := Hub.timers[topic]
|
timer := Bus.timers[topic]
|
||||||
if timer == nil {
|
if timer == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -267,8 +271,8 @@ func StartServer(addr string, conf config.Config) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// 重新加载[定时、延时]
|
// 重新加载[定时、延时]
|
||||||
go Hub.ReloadTimer()
|
go Bus.ReloadTimer()
|
||||||
go Hub.LoadData()
|
go Bus.LoadData()
|
||||||
|
|
||||||
// 启动服务监听
|
// 启动服务监听
|
||||||
listen, err := net.Listen("tcp", addr)
|
listen, err := net.Listen("tcp", addr)
|
||||||
@@ -286,12 +290,12 @@ func StartServer(addr string, conf config.Config) {
|
|||||||
zConn := NewZConn(&conn)
|
zConn := NewZConn(&conn)
|
||||||
|
|
||||||
log.Printf("conn start: %s [%d]\n", conn.RemoteAddr(), zConn.sn)
|
log.Printf("conn start: %s [%d]\n", conn.RemoteAddr(), zConn.sn)
|
||||||
go Hub.handlerConn(zConn)
|
go Bus.handlerConn(zConn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 连接处理
|
// 连接处理
|
||||||
func (s *ZSub) handlerConn(c *ZConn) {
|
func (s *ZBus) handlerConn(c *ZConn) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Println("handlerConn Recovered:", r)
|
log.Println("handlerConn Recovered:", r)
|
||||||
@@ -301,7 +305,7 @@ func (s *ZSub) handlerConn(c *ZConn) {
|
|||||||
defer func() {
|
defer func() {
|
||||||
// conn remove to conns
|
// conn remove to conns
|
||||||
funChan <- func() {
|
funChan <- func() {
|
||||||
Hub.conns = c.removeTo(Hub.conns)
|
Bus.conns = c.removeTo(Bus.conns)
|
||||||
}
|
}
|
||||||
|
|
||||||
// close ZConn
|
// close ZConn
|
||||||
@@ -310,7 +314,7 @@ func (s *ZSub) handlerConn(c *ZConn) {
|
|||||||
|
|
||||||
// conn add to conns
|
// conn add to conns
|
||||||
funChan <- func() {
|
funChan <- func() {
|
||||||
Hub.conns = c.appendTo(Hub.conns)
|
Bus.conns = c.appendTo(Bus.conns)
|
||||||
}
|
}
|
||||||
|
|
||||||
reader := bufio.NewReader(*c.conn)
|
reader := bufio.NewReader(*c.conn)
|
||||||
@@ -356,7 +360,7 @@ func (s *ZSub) handlerConn(c *ZConn) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
handleMessage(Message{Conn: c, Rcmd: rcmd})
|
messageHandler(Message{Conn: c, Rcmd: rcmd})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -365,7 +369,7 @@ Publish topic message
|
|||||||
1、send message to topic's chan
|
1、send message to topic's chan
|
||||||
2、feedback send success to sender, and sending message to topic's subscripts
|
2、feedback send success to sender, and sending message to topic's subscripts
|
||||||
*/
|
*/
|
||||||
func (s *ZSub) Publish(topic, msg string) {
|
func (s *ZBus) Publish(topic, msg string) {
|
||||||
s.RLock()
|
s.RLock()
|
||||||
defer s.RUnlock()
|
defer s.RUnlock()
|
||||||
ztopic := s.topics[topic] //ZTopic
|
ztopic := s.topics[topic] //ZTopic
|
||||||
@@ -384,9 +388,9 @@ func (s *ZSub) Publish(topic, msg string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
send broadcast message
|
send Broadcast message
|
||||||
*/
|
*/
|
||||||
func (s *ZSub) broadcast(topic, msg string) {
|
func (s *ZBus) Broadcast(topic, msg string) {
|
||||||
s.RLock()
|
s.RLock()
|
||||||
defer s.RUnlock()
|
defer s.RUnlock()
|
||||||
if strings.EqualFold(topic, "lock") {
|
if strings.EqualFold(topic, "lock") {
|
||||||
@@ -406,10 +410,11 @@ func (s *ZSub) broadcast(topic, msg string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
lock: lock key uuid t
|
lock: lock key uuid t
|
||||||
unlock: unlock key uuid
|
tryLock: trylock key uuid t
|
||||||
|
unlock: unlock key uuid
|
||||||
*/
|
*/
|
||||||
func (s *ZSub) _lock(lock *Lock) {
|
func (s *ZBus) _lock(lock *Lock) {
|
||||||
locks := s.locks[lock.key]
|
locks := s.locks[lock.key]
|
||||||
if locks == nil {
|
if locks == nil {
|
||||||
locks = make([]*Lock, 0)
|
locks = make([]*Lock, 0)
|
||||||
@@ -418,7 +423,7 @@ func (s *ZSub) _lock(lock *Lock) {
|
|||||||
lock.start = time.Now().Unix()
|
lock.start = time.Now().Unix()
|
||||||
locks = append(locks, lock)
|
locks = append(locks, lock)
|
||||||
s.locks[lock.key] = locks
|
s.locks[lock.key] = locks
|
||||||
s.broadcast("lock", lock.uuid)
|
s.Broadcast("lock", lock.uuid)
|
||||||
|
|
||||||
// 设置时间到解锁
|
// 设置时间到解锁
|
||||||
locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second)
|
locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second)
|
||||||
@@ -429,10 +434,15 @@ func (s *ZSub) _lock(lock *Lock) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
} else {
|
} else {
|
||||||
s.locks[lock.key] = append(locks, lock)
|
switch lock.cmd {
|
||||||
|
case "trylock": // send trylock fail message
|
||||||
|
s.Broadcast("trylock", lock.uuid)
|
||||||
|
case "lock":
|
||||||
|
s.locks[lock.key] = append(locks, lock)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (s *ZSub) _unlock(l Lock) {
|
func (s *ZBus) _unlock(l Lock) {
|
||||||
locks := s.locks[l.key]
|
locks := s.locks[l.key]
|
||||||
if locks == nil || len(locks) == 0 {
|
if locks == nil || len(locks) == 0 {
|
||||||
return
|
return
|
||||||
@@ -443,7 +453,7 @@ func (s *ZSub) _unlock(l Lock) {
|
|||||||
s.locks[l.key] = locks
|
s.locks[l.key] = locks
|
||||||
}
|
}
|
||||||
if len(s.locks[l.key]) > 0 { // next lock
|
if len(s.locks[l.key]) > 0 { // next lock
|
||||||
s.broadcast("lock", s.locks[l.key][0].uuid)
|
s.Broadcast("lock", s.locks[l.key][0].uuid)
|
||||||
s.locks[l.key][0].start = time.Now().Unix()
|
s.locks[l.key][0].start = time.Now().Unix()
|
||||||
s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second)
|
s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second)
|
||||||
go func() {
|
go func() {
|
||||||
@@ -455,7 +465,7 @@ func (s *ZSub) _unlock(l Lock) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) shutdown() {
|
func (s *ZBus) shutdown() {
|
||||||
s.SaveData()
|
s.SaveData()
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
@@ -463,7 +473,7 @@ func (s *ZSub) shutdown() {
|
|||||||
func Info() map[string]interface{} {
|
func Info() map[string]interface{} {
|
||||||
// topics
|
// topics
|
||||||
topics := map[string]interface{}{}
|
topics := map[string]interface{}{}
|
||||||
for s, topic := range Hub.topics {
|
for s, topic := range Bus.topics {
|
||||||
// {groups:[{name:xxx,size:xx}]}
|
// {groups:[{name:xxx,size:xx}]}
|
||||||
arr := make([]map[string]interface{}, 0)
|
arr := make([]map[string]interface{}, 0)
|
||||||
|
|
||||||
@@ -480,7 +490,7 @@ func Info() map[string]interface{} {
|
|||||||
|
|
||||||
// conns
|
// conns
|
||||||
conns := make([]interface{}, 0)
|
conns := make([]interface{}, 0)
|
||||||
for _, c := range Hub.conns {
|
for _, c := range Bus.conns {
|
||||||
m := make(map[string]interface{}, 0)
|
m := make(map[string]interface{}, 0)
|
||||||
m["remoteaddr"] = (*c.conn).RemoteAddr()
|
m["remoteaddr"] = (*c.conn).RemoteAddr()
|
||||||
m["groupid"] = c.groupid
|
m["groupid"] = c.groupid
|
||||||
@@ -493,14 +503,14 @@ func Info() map[string]interface{} {
|
|||||||
info := map[string]interface{}{
|
info := map[string]interface{}{
|
||||||
"topics": topics,
|
"topics": topics,
|
||||||
"topicsize": len(topics),
|
"topicsize": len(topics),
|
||||||
"timersize": len(Hub.timers),
|
"timersize": len(Bus.timers),
|
||||||
"conns": conns,
|
"conns": conns,
|
||||||
"connsize": len(Hub.conns),
|
"connsize": len(Bus.conns),
|
||||||
}
|
}
|
||||||
return info
|
return info
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) Clearup() {
|
func (s *ZBus) Clearup() {
|
||||||
for tn, topic := range s.topics {
|
for tn, topic := range s.topics {
|
||||||
for _, group := range topic.groups {
|
for _, group := range topic.groups {
|
||||||
if len(group.conns) > 0 || topic.mcount > group.offset {
|
if len(group.conns) > 0 || topic.mcount > group.offset {
|
||||||
@@ -513,7 +523,7 @@ func (s *ZSub) Clearup() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) noSubscribe(topic string) bool {
|
func (s *ZBus) noSubscribe(topic string) bool {
|
||||||
zTopic := s.topics[topic]
|
zTopic := s.topics[topic]
|
||||||
if zTopic == nil || len(zTopic.groups) == 0 {
|
if zTopic == nil || len(zTopic.groups) == 0 {
|
||||||
return true
|
return true
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package zsub
|
package zbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
@@ -34,7 +34,7 @@ func Append(str string, fileName string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) SaveData() {
|
func (s *ZBus) SaveData() {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Println("SaveData Recovered:", r)
|
log.Println("SaveData Recovered:", r)
|
||||||
@@ -89,12 +89,12 @@ func (s *ZSub) SaveData() {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) LoadData() {
|
func (s *ZBus) LoadData() {
|
||||||
s.loadDelay()
|
s.loadDelay()
|
||||||
// s.loadLock()
|
// s.loadLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) loadDelay() {
|
func (s *ZBus) loadDelay() {
|
||||||
f, err := os.Open(datadir + "/delay.z")
|
f, err := os.Open(datadir + "/delay.z")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@@ -129,7 +129,7 @@ func (s *ZSub) loadDelay() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) loadLock() {
|
func (s *ZBus) loadLock() {
|
||||||
f, err := os.Open(datadir + "/lock.z")
|
f, err := os.Open(datadir + "/lock.z")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@@ -161,6 +161,7 @@ func (s *ZSub) loadLock() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s._lock(&Lock{
|
s._lock(&Lock{
|
||||||
|
cmd: "lock",
|
||||||
key: split[0],
|
key: split[0],
|
||||||
uuid: split[1],
|
uuid: split[1],
|
||||||
duration: duration,
|
duration: duration,
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package zsub
|
package zbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
@@ -1,9 +1,10 @@
|
|||||||
package zsub
|
package zbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
|
_ "github.com/lib/pq" // 导入 pq 驱动
|
||||||
"github.com/robfig/cron"
|
"github.com/robfig/cron"
|
||||||
"log"
|
"log"
|
||||||
"regexp"
|
"regexp"
|
||||||
@@ -29,7 +30,7 @@ type ZDelay struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delay : delay topic value 100 -> publish topic value
|
// Delay : delay topic value 100 -> publish topic value
|
||||||
func (s *ZSub) Delay(rcmd []string) {
|
func (s *ZBus) Delay(rcmd []string) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer func() {
|
defer func() {
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
@@ -70,7 +71,7 @@ func (s *ZSub) Delay(rcmd []string) {
|
|||||||
select {
|
select {
|
||||||
case <-delay.Timer.C:
|
case <-delay.Timer.C:
|
||||||
log.Println("delay send:", rcmd[1], rcmd[2])
|
log.Println("delay send:", rcmd[1], rcmd[2])
|
||||||
Hub.Publish(rcmd[1], rcmd[2])
|
Bus.Publish(rcmd[1], rcmd[2])
|
||||||
funChan <- func() {
|
funChan <- func() {
|
||||||
delete(s.delays, rcmd[1]+"-"+rcmd[2])
|
delete(s.delays, rcmd[1]+"-"+rcmd[2])
|
||||||
}
|
}
|
||||||
@@ -82,7 +83,7 @@ func (s *ZSub) Delay(rcmd []string) {
|
|||||||
/*
|
/*
|
||||||
["Timer", Topic, expr, 0|1]
|
["Timer", Topic, expr, 0|1]
|
||||||
*/
|
*/
|
||||||
func (s *ZSub) timer(rcmd []string, c *ZConn) {
|
func (s *ZBus) timer(rcmd []string, c *ZConn) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
timer := s.timers[rcmd[1]]
|
timer := s.timers[rcmd[1]]
|
||||||
@@ -108,8 +109,8 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
|
|||||||
|
|
||||||
var timerFun = func() {
|
var timerFun = func() {
|
||||||
for _, conn := range timer.Conns {
|
for _, conn := range timer.Conns {
|
||||||
log.Println("Timer send:", timer.Topic)
|
log.Println("timer send:", timer.Topic)
|
||||||
err := conn.send("Timer", timer.Topic)
|
err := conn.send("timer", timer.Topic)
|
||||||
if timer.Single && err == nil {
|
if timer.Single && err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -155,20 +156,45 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ZSub) ReloadTimer() {
|
func (s *ZBus) ReloadTimer() {
|
||||||
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
|
// 未配置 ztimer 数据库返回
|
||||||
Conf.Ztimer.Db.User,
|
if Conf.Ztimer.Db.Addr == "" {
|
||||||
Conf.Ztimer.Db.Password,
|
log.Println("No found ztimer config in app.ini")
|
||||||
Conf.Ztimer.Db.Addr,
|
return
|
||||||
Conf.Ztimer.Db.Database,
|
}
|
||||||
))
|
|
||||||
|
var db *sql.DB
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if Conf.Ztimer.Db.Type == "postgres" {
|
||||||
|
hostPort := strings.Split(Conf.Ztimer.Db.Addr, ":")
|
||||||
|
db, err = sql.Open("postgres", fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=disable",
|
||||||
|
Conf.Ztimer.Db.User,
|
||||||
|
Conf.Ztimer.Db.Password,
|
||||||
|
hostPort[0],
|
||||||
|
hostPort[1],
|
||||||
|
Conf.Ztimer.Db.Database,
|
||||||
|
))
|
||||||
|
// 设置当前会话的 schema
|
||||||
|
_, err = db.Exec("SET search_path TO " + Conf.Ztimer.Db.Schema)
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
|
||||||
|
Conf.Ztimer.Db.User,
|
||||||
|
Conf.Ztimer.Db.Password,
|
||||||
|
Conf.Ztimer.Db.Addr,
|
||||||
|
Conf.Ztimer.Db.Database,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
rows, err := db.Query("SELECT t.`name`, IF(t.`status`=10,t.`expr`,''), IF(t.`single`=1,'a','x') 'single' FROM tasktimer t ORDER BY t.`timerid`")
|
rows, err := db.Query("SELECT t.`name`, IF(t.`status`=10,t.`expr`,''), IF(t.`single`=1,'a','x') single FROM tasktimer t ORDER BY t.`timerid`")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
return
|
return
|
||||||
@@ -179,6 +205,6 @@ func (s *ZSub) ReloadTimer() {
|
|||||||
var expr string
|
var expr string
|
||||||
var single string
|
var single string
|
||||||
rows.Scan(&name, &expr, &single)
|
rows.Scan(&name, &expr, &single)
|
||||||
s.timer([]string{"Timer", name, expr, single}, nil) //["Timer", Topic, expr, a|x]
|
s.timer([]string{"timer", name, expr, single}, nil) //["timer", Topic, expr, a|x]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package zsub
|
package zbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
51
main.go
51
main.go
@@ -2,23 +2,39 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"zhub/cmd"
|
"os"
|
||||||
"zhub/internal/config"
|
|
||||||
"zhub/internal/monitor"
|
"gitea.1216.top/lxy/zhub/cmd"
|
||||||
"zhub/internal/zsub"
|
"gitea.1216.top/lxy/zhub/internal/config"
|
||||||
|
"gitea.1216.top/lxy/zhub/internal/monitor"
|
||||||
|
"gitea.1216.top/lxy/zhub/internal/zbus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var isCliMode bool // 是否以客户端模式运行的标志
|
// 命令查询版本号
|
||||||
var rcmd string // 客户端模式下运行的命令
|
versionFlag := flag.Bool("version", false, "Display the version")
|
||||||
flag.BoolVar(&isCliMode, "cli", false, "run as client mode") // 定义 cli 参数
|
vFlag := flag.Bool("v", false, "Display the version")
|
||||||
flag.StringVar(&rcmd, "r", "", "run as client mode") // 定义 r 参数
|
VFlag := flag.Bool("V", false, "Display the version")
|
||||||
flag.Parse() // 解析命令行参数
|
|
||||||
|
isCliMode := flag.Bool("cli", false, "Run as client mode") // 客户端模式参数
|
||||||
|
rcmd := flag.String("r", "", "Run command in client mode") // 客户端命令参数
|
||||||
|
|
||||||
|
// 解析命令行参数
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// 检查是否有版本参数, 如果有则输出版本号并退出
|
||||||
|
if *versionFlag || *vFlag || *VFlag {
|
||||||
|
fmt.Printf("Version: %s\n", monitor.Version)
|
||||||
|
os.Exit(0) // 输出后退出
|
||||||
|
}
|
||||||
|
|
||||||
conf := config.ReadConfig() // 读取配置文件
|
conf := config.ReadConfig() // 读取配置文件
|
||||||
addr := conf.Service.Addr // 获取服务地址
|
addr := conf.Service.Addr // 获取服务地址
|
||||||
config.InitLog(conf.Log) // 初始化日志配置
|
config.InitLog(conf.Log) // 初始化日志配置
|
||||||
|
// 输出版本号
|
||||||
|
log.Println("ZHub version:", monitor.Version)
|
||||||
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
@@ -29,20 +45,23 @@ func main() {
|
|||||||
}*/
|
}*/
|
||||||
}
|
}
|
||||||
|
|
||||||
if rcmd != "" { // 如果指定了客户端命令
|
if *rcmd != "" { // 如果指定了客户端命令
|
||||||
adminToken, err := zsub.AuthManager.AdminToken() // 认证信息
|
adminToken, err := zbus.AuthManager.AdminToken() // 认证信息
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err) // Configuration error, stop the client from running.
|
log.Fatal(err) // Configuration error, stop the client from running.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接
|
cli := cmd.ZHubClient{}
|
||||||
|
err = cli.Initx("server-local", addr, "server-admin", adminToken)
|
||||||
|
|
||||||
|
// cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err) // 如果连接失败则打印错误信息
|
log.Println(err) // 如果连接失败则打印错误信息
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer cli.Close() // 延迟关闭客户端连接
|
defer cli.Close() // 延迟关闭客户端连接
|
||||||
switch rcmd {
|
switch *rcmd {
|
||||||
case "timer":
|
case "timer":
|
||||||
cli.Cmd("reload-timer")
|
cli.Cmd("reload-timer")
|
||||||
case "shutdown", "stop":
|
case "shutdown", "stop":
|
||||||
@@ -50,10 +69,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if isCliMode {
|
if *isCliMode {
|
||||||
cmd.ClientRun(addr) // 客户端运行
|
//cmd.ClientRun(addr) // 客户端运行
|
||||||
} else {
|
} else {
|
||||||
go monitor.StartWatch() // 启动监控协程
|
go monitor.StartWatch() // 启动监控协程
|
||||||
zsub.StartServer(addr, conf) // 启动服务进程
|
zbus.StartServer(addr, conf) // 启动服务进程
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user