Compare commits

13 Commits

19 changed files with 785 additions and 611 deletions

View File

@@ -1,38 +1,11 @@
# 阶段一:构建阶段 # 构建运行阶段
FROM golang:alpine AS builder
WORKDIR /opt/zhub
# 复制 go.mod 文件
COPY go.mod .
# 下载并缓存依赖包
RUN go mod download
# 将源代码复制到容器中
COPY . .
# 运行 go get 命令以确保所有需要的包都被获取和更新
#RUN go get zhub/internal/monitor
#RUN go get zhub/cmd
#RUN go get zhub/internal/zsub
#RUN go get zhub/internal/config
# 构建可执行文件
RUN go build -o zhub.sh -ldflags "-s -w"
# 阶段二:运行阶段
FROM alpine:latest FROM alpine:latest
# 设置工作目录 # 设置工作目录
WORKDIR /opt/zhub WORKDIR /opt/zhub
# 从构建阶段复制可执行文件到当前阶段 # 从构建阶段复制可执行文件和配置到当前阶段
COPY --from=builder /opt/zhub/zhub.sh . COPY zhub.sh .
COPY --from=builder /opt/zhub/app.ini .
# 复制 app.ini 配置文件到容器中
COPY app.ini . COPY app.ini .
EXPOSE 711 1216 EXPOSE 711 1216

203
README.md Normal file
View File

@@ -0,0 +1,203 @@
# ZHub 工程项目文档
## 1. 概述
ZHub 是一个基于 Go 的高性能分布式消息中间件,支持发布/订阅模式、定时任务、延时消息、分布式锁等功能。它适用于需要高并发、低延迟的消息传递场景。
---
## 2. 技术栈
- **语言**: Go (Golang)
- **网络库**: `net`, `bufio`
- **并发模型**: 使用 goroutines 和 channels 实现并发处理。
- **持久化**: 使用文件存储(如延时任务和锁信息)。
- **配置管理**: 使用 Viper 读取 INI 格式的配置文件。
- **权限控制**: 基于 YAML 的用户、组、Token 和频道权限管理。
- **监控界面**: 使用 Gin 提供 HTTP 接口用于监控和服务管理。
- **数据库支持**: 支持 MySQL 和 PostgreSQL 用于定时任务的配置。
---
## 3. 核心功能模块
### 3.1 消息总线 (`zbus`)
- **主题管理**: 支持多主题订阅与发布。
- **组订阅**: 每个主题可以有多个消费组,组内客户端共享消息。
- **延时消息**: 支持延时发送消息,使用 `time.Timer` 实现。
- **定时任务**: 支持 cron 表达式或固定时间间隔的任务调度。
- **分布式锁**: 提供互斥锁机制,支持重入和自动释放。
- **持久化**: 使用文件保存延时任务和锁的状态,确保服务重启后数据不丢失。
### 3.2 权限管理 ([auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37))
- **用户认证**: 支持基于 Token 的认证。
- **角色授权**: 用户分属不同组,组内定义读写权限。
- **频道访问控制**: 频道可设置为公开或私有,私有频道需授权访问。
- **配置热加载**: 支持运行时重新加载权限配置。
### 3.3 客户端 (`client`)
- **连接管理**: 支持自动重连机制。
- **订阅与发布**: 提供 API 订阅主题并接收消息。
- **RPC 调用**: 支持远程过程调用,并处理超时和返回结果。
- **锁机制**: 支持获取和释放分布式锁。
### 3.4 监控 (`monitor`)
- **HTTP 接口**: 提供 `/_/info`, `/_/cleanup`, `/timer/reload` 等接口。
- **可视化界面**: 提供简单的 HTML 页面展示系统状态。
- **版本信息**: 显示当前运行的 ZHub 版本。
---
## 4. 架构设计
### 4.1 总体架构
ZHub 采用经典的 C/S 架构,包含以下主要组件:
- **服务端 ([main.go](file://D:\wk-go\zhub\main.go))**
- 启动 TCP 服务器监听客户端连接。
- 加载配置文件并初始化日志。
- 启动监控服务Gin
- **客户端 ([client.go](file://D:\wk-go\zhub\cmd\client.go))**
- 提供连接、订阅、发布、RPC 调用等 API。
- 自动重连机制保证连接稳定性。
- **消息总线 (`zbus`)**
- 处理消息的发布、订阅、延时、定时等核心逻辑。
- **权限管理 ([auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37))**
- 管理用户、组、Token 和频道权限。
- **监控 (`monitor`)**
- 提供 HTTP 接口和 Web 界面用于管理和监控。
### 4.2 数据流图
```plaintext
+-------------------+ +------------------+ +------------------+
| Client (Go) |<--->| ZHub Server |<--->| Monitor (Gin) |
+-------------------+ +------------------+ +------------------+
|
v
+------------------+
| Persistence |
| (File, DB) |
+------------------+
```
---
## 5. 配置说明
### 5.1 配置文件 ([app.ini](file://D:\wk-go\zhub\app.ini))
- **[service]**: 服务相关配置。
- `watch`: 监控服务地址。
- [addr](file://D:\wk-go\zhub\cmd\client.go#L25-L25): 主服务监听地址。
- [auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37): 是否启用认证0 不启用1 启用)。
- **[data]**: 数据目录配置。
- **[log]**: 日志配置。
- `handlers`: 输出方式console/file
- `level`: 日志级别info/debug/error
- [file](file://D:\wk-go\zhub\Dockerfile): 日志文件路径。
- **[ztimer]**: 定时任务数据库配置。
- `db.addr`, `db.user`, `db.password`, `db.database`, `db.schema`, `db.type`.
### 5.2 权限配置 ([auth.yml](file://D:\wk-go\zhub\auth.yml))
- **users**: 用户列表,包含 ID、用户名、密码、状态、所属组、读写权限。
- **groups**: 用户组列表,包含名称、描述、读写权限。
- **tokens**: Token 列表,包含 ID、用户 ID、Token 值、过期时间和状态。
- **channels**: 频道列表,包含名称、描述、是否为公开频道。
---
## 6. 部署与构建
### 6.1 构建脚本 ([build.bat](file://D:\wk-go\zhub\build.bat))
- 支持跨平台编译Linux、Windows、Mac
- 使用 UPX 压缩生成的二进制文件。
- 自动生成版本号并嵌入到程序中。
### 6.2 启动命令
- **服务端启动**:
```bash
go run main.go
```
- **客户端启动**:
```bash
go run cmd/client.go -cli -r <command>
```
### 6.3 监控服务
- 默认监听地址:`http://<host>:<port>/`
- 支持的接口:
- `/_/info`: 获取系统信息。
- `/_/cleanup`: 清理无订阅的主题。
- `/timer/reload`: 重新加载定时任务。
- `/topic/publish`: 发布主题消息。
- `/topic/delay`: 发送延时消息。
- `/auth/reload`: 重新加载权限配置。
---
## 7. 安全性设计
### 7.1 认证机制
- 所有客户端必须通过 Token 认证才能进行操作。
- Token 具有过期时间和状态管理。
### 7.2 授权机制
- 用户分属不同组,组内定义读写权限。
- 频道可设置为公开或私有,私有频道需授权访问。
- 支持正则表达式匹配主题名,实现细粒度的权限控制。
### 7.3 分布式锁
- 使用互斥锁机制防止多个客户端同时执行关键操作。
- 锁具有超时机制,避免死锁。
---
## 8. 性能优化
### 8.1 并发处理
- 使用 goroutines 和 channels 实现高效的并发处理。
- 所有 I/O 操作异步化,减少阻塞。
### 8.2 内存管理
- 使用缓冲区池化技术减少内存分配。
- 对频繁使用的结构体进行复用。
### 8.3 日志优化
- 支持日志级别控制,减少不必要的日志输出。
- 日志输出格式化,便于调试和分析。
---
## 9. 测试与验证
### 9.1 单元测试
- 覆盖所有核心模块的功能测试。
- 使用 Go 的 testing 包编写测试用例。
### 9.2 集成测试
- 模拟多个客户端并发操作,验证系统的稳定性和一致性。
- 测试定时任务、延时消息、锁机制等复杂场景。
### 9.3 压力测试
- 使用基准测试工具评估系统的吞吐量和响应时间。
- 优化瓶颈部分,提升整体性能。
---
## 10. 未来规划
### 10.1 功能增强
- 支持更多类型的数据库(如 MongoDB、Redis
- 增加消息持久化支持(如 Kafka、RabbitMQ
- 提供更丰富的监控指标和告警机制。
### 10.2 性能优化
- 引入零拷贝技术减少内存复制。
- 进一步优化 goroutine 调度策略。
### 10.3 安全加固
- 增加 TLS 加密通信支持。
- 支持更复杂的权限策略(如 RBAC、ABAC
### 10.4 用户体验
- 开发图形化管理界面,简化配置和监控操作。
- 提供 SDK 支持多种编程语言(如 Python、Java

31
app.ini
View File

@@ -1,18 +1,21 @@
[log] # app.ini
handlers=console #console|file
level=debug # info|debug|error
file=zhub.log
[service] [service]
watch=0.0.0.0:711 watch=0.0.0.0:711 # 服务管理端口
addr=0.0.0.0:1216 addr=0.0.0.0:1216 # 服务端口
auth=1 auth=0 # 是否开启连接授权 0不开启、1开启
[data] [data]
dir=./data dir=./data # 数据目录
[ztimer] [log]
db.addr=127.0.0.1:3306 handlers=console # console|file
db.user=root level=debug # info|debug|error
db.password=123456 file=zhub.log
db.database=zhub
[ztimer] # ztimer 配置 (可选,如果不使用定时调度则可不配置)
# db.addr=127.0.0.1:3306 # timer 使用的MySql数据库配置
# db.user=root
# db.password=123456
# db.database=zhub
# db.schema=public
# db.type=postgres # mysql|postgres

View File

@@ -36,8 +36,10 @@ groups:
description: Group 1 description: Group 1
reads: reads:
- ^zcore:* # "zcore:" 开头的订阅 - ^zcore:* # "zcore:" 开头的订阅
- rpc-t
writes: writes:
- ^zcore:* # "zcore:" 开头的发送 - ^zcore:* # "zcore:" 开头的发送
- rpc-t
- name: zcore - name: zcore
description: Group 2 description: Group 2
@@ -59,15 +61,16 @@ tokens:
# 公开频道设置 # 公开频道设置
channels: channels:
- name: "-"
description: "无效占位符"
public: true
- name: "lock" - name: "lock"
description: "分布式锁通知频道" description: "分布式锁通知频道"
public: true public: true
- name: "trylock"
description: "分布式锁通知频道"
public: true
- name: "app_local" - name: "app_local"
description: "本地appname" description: "本地appname"
public: true public: true
- name: "DEV-LOCAL"
description: "本地appname"
public: true
# --------------------------------------------- # ---------------------------------------------

118
build.bat Normal file
View File

@@ -0,0 +1,118 @@
@echo off
chcp 65001 >nul
title zhub 构建脚本
rem 获取当前日期和时间并格式化为 YYYY.MM.DD-HH.MM.SS
for /f "tokens=2 delims==" %%i in ('wmic os get localdatetime /value') do set datetime=%%i
set year=%datetime:~0,4%
set month=%datetime:~4,2%
set day=%datetime:~6,2%
set hour=%datetime:~8,2%
set minute=%datetime:~10,2%
set second=%datetime:~12,2%
set version=%year%.%month%.%day%-%hour%.%minute%.%second%
rem 输出当前版本号
echo 当前构建版本号:%version%
rem 删除旧文件
echo 删除历史构建文件...
del /q zhub zhub.sh zhub.exe >nul 2>nul
rem 交互式平台选择
echo ==============================
echo 请选择要构建的平台:
echo ==============================
echo 1. Linux (amd64)
echo 2. Linux (arm64)
echo 3. Windows (amd64)
echo 4. MacOS (amd64)
echo 5. 全部构建
echo 0. 退出
echo ==============================
set /p choice=请输入选项编号(可多选,例如 135:
if "%choice%"=="0" goto end
if not "%choice%"=="" (
echo 开始构建版本 %version%...
)
if "%choice%"=="1" (
call :build_linux
)
if "%choice%"=="2" (
call :build_linux_arm
)
if "%choice%"=="3" (
call :build_windows
)
if "%choice%"=="4" (
call :build_mac
)
if "%choice%"=="5" (
call :build_linux
call :build_linux_arm
call :build_windows
call :build_mac
)
goto end
rem ========= 构建平台函数 ==========
:build_linux
echo [Linux amd64] 开始构建...
set GOOS=linux
set GOARCH=amd64
go build -o zhub.sh -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
upx -9 zhub.sh
call :move_to_dist zhub.sh linux
goto :eof
:build_linux_arm
echo [Linux arm64] 开始构建...
set GOOS=linux
set GOARCH=arm64
go build -o zhub.sh -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
upx -9 zhub.sh
call :move_to_dist zhub.sh linux-arm
goto :eof
:build_windows
echo [Windows amd64] 开始构建...
set GOOS=windows
set GOARCH=amd64
go build -o zhub.exe -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
upx -9 zhub.exe
call :move_to_dist zhub.exe windows
goto :eof
:build_mac
echo [MacOS amd64] 开始构建...
set GOOS=darwin
set GOARCH=amd64
go build -o zhub -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'"
upx -9 zhub
call :move_to_dist zhub mac
goto :eof
rem ========= 移动构建文件 ==========
:move_to_dist
setlocal
set file=%1
set platform=%2
set target=dist\%version%\%platform%
if not exist %target% (
mkdir %target%
)
move /Y %file% %target%\ >nul
echo 构建产物已移动至:%target%\%file%
endlocal
goto :eof
:end
echo 构建流程结束。
pause
exit

View File

@@ -3,22 +3,20 @@ package cmd
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt"
"github.com/go-basic/uuid"
"io" "io"
"unicode/utf8" "unicode/utf8"
//"github.com/go-basic/uuid"
"log" "log"
"net" "net"
"os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/go-basic/uuid"
) )
type Client struct { type ZHubClient struct {
wlock sync.Mutex // write lock wlock sync.Mutex // write lock
rlock sync.Mutex // read lock rlock sync.Mutex // read lock
@@ -46,37 +44,40 @@ type Lock struct {
// duration int // lock duration // duration int // lock duration
} }
func Create(appname, addr, groupid, auth string) (*Client, error) { func (c *ZHubClient) Initx(appname, addr, groupid, auth string) error {
conn, err := net.Dial("tcp", addr) c.appname = appname
if err != nil { c.addr = addr
return &Client{}, err c.groupid = groupid
} c.auth = auth
return c.Init()
client := Client{
wlock: sync.Mutex{},
rlock: sync.Mutex{},
appname: appname,
addr: addr,
conn: conn,
groupid: groupid,
createTime: time.Now(),
subFun: make(map[string]func(v string)),
timerFun: make(map[string]func()),
chSend: make(chan []string, 100),
chReceive: make(chan []string, 100),
timerReceive: make(chan []string, 100),
lockFlag: make(map[string]*Lock),
auth: auth,
}
client.send("auth", auth)
client.send("groupid " + groupid)
client.init()
return &client, err
} }
func (c *Client) reconn() (err error) { // Init 创建一个客户端
func (c *ZHubClient) Init( /*appname, addr, groupid, auth string*/ ) error {
conn, err := net.Dial("tcp", c.addr)
if err != nil {
return err
}
c.conn = conn
c.wlock = sync.Mutex{}
c.rlock = sync.Mutex{}
c.createTime = time.Now()
c.subFun = make(map[string]func(v string))
c.timerFun = make(map[string]func())
c.chSend = make(chan []string, 100)
c.chReceive = make(chan []string, 100)
c.timerReceive = make(chan []string, 100)
c.lockFlag = make(map[string]*Lock)
c.send("auth", c.auth)
c.send("groupid " + c.groupid)
c.init()
return err
}
func (c *ZHubClient) reconn() (err error) {
for n := 1; n < 10; n++ { for n := 1; n < 10; n++ {
conn, err := net.Dial("tcp", c.addr) conn, err := net.Dial("tcp", c.addr)
if err != nil { if err != nil {
@@ -102,7 +103,7 @@ func (c *Client) reconn() (err error) {
return return
} }
func (c *Client) init() { func (c *ZHubClient) init() {
// 消费 topic 消息 // 消费 topic 消息
go func() { go func() {
for { for {
@@ -130,7 +131,7 @@ func (c *Client) init() {
} }
// Subscribe subscribe topic // Subscribe subscribe topic
func (c *Client) Subscribe(topic string, fun func(v string)) { func (c *ZHubClient) Subscribe(topic string, fun func(v string)) {
c.send("subscribe " + topic) c.send("subscribe " + topic)
if fun != nil { if fun != nil {
c.wlock.Lock() c.wlock.Lock()
@@ -139,7 +140,7 @@ func (c *Client) Subscribe(topic string, fun func(v string)) {
} }
} }
func (c *Client) Unsubscribe(topic string) { func (c *ZHubClient) Unsubscribe(topic string) {
c.send("unsubscribe " + topic) c.send("unsubscribe " + topic)
delete(c.subFun, topic) delete(c.subFun, topic)
} }
@@ -149,30 +150,32 @@ func (c *Client) Unsubscribe(topic string) {
ping ping
--- ---
*/ */
func (c *Client) ping() { func (c *ZHubClient) ping() {
c.send("ping") c.send("ping")
} }
//Publish -------------------------------------- pub-sub -------------------------------------- // Publish -------------------------------------- pub-sub --------------------------------------
func (c *Client) Publish(topic string, message string) error { func (c *ZHubClient) Publish(topic string, message string) error {
return c.send("publish", topic, message) return c.send("publish", topic, message)
} }
func (c *Client) Broadcast(topic string, message string) error { func (c *ZHubClient) Broadcast(topic string, message string) error {
return c.send("broadcast", topic, message) return c.send("broadcast", topic, message)
} }
func (c *Client) Delay(topic string, message string, delay int) error { func (c *ZHubClient) Delay(topic string, message string, delay int) error {
return c.send("delay", topic, message, strconv.Itoa(delay)) return c.send("delay", topic, message, strconv.Itoa(delay))
} }
/* /*
Timer Timer
func (c *Client) Timer(topic string, expr string, fun func()) {
c.timerFun[topic] = fun func (c *ZHubClient) Timer(topic string, expr string, fun func()) {
c.send("timer", topic, expr, "x") c.timerFun[topic] = fun
}*/ c.send("timer", topic, expr, "x")
func (c *Client) Timer(topic string, fun func()) { }
*/
func (c *ZHubClient) Timer(topic string, fun func()) {
if fun != nil { if fun != nil {
c.timerFun[topic] = fun c.timerFun[topic] = fun
} }
@@ -180,7 +183,7 @@ func (c *Client) Timer(topic string, fun func()) {
} }
// Cmd send cmd // Cmd send cmd
func (c *Client) Cmd(cmd ...string) { func (c *ZHubClient) Cmd(cmd ...string) {
if len(cmd) == 1 { if len(cmd) == 1 {
c.send("cmd", cmd[0]) c.send("cmd", cmd[0])
} else if len(cmd) > 1 { } else if len(cmd) > 1 {
@@ -191,35 +194,44 @@ func (c *Client) Cmd(cmd ...string) {
} }
} }
func (c *Client) Close() { func (c *ZHubClient) Close() {
c.conn.Close() c.conn.Close()
} }
// TryLock
func TryLock(key string, duration int) {
/*uuid := uuid.New()
TODO
return Lock{Key: key, Value: uuid}*/
}
// Lock Key // Lock Key
func (c *Client) Lock(key string, duration int) Lock { func (c *ZHubClient) Lock(key string, duration int) Lock {
v := uuid.New() uuid := uuid.New()
c.send("v", key, v, strconv.Itoa(duration)) c.send("uuid", key, uuid, strconv.Itoa(duration))
lockChan := make(chan int, 2) lockChan := make(chan int, 2)
go func() { go func() {
c.wlock.Lock() c.wlock.Lock()
defer c.wlock.Unlock() defer c.wlock.Unlock()
c.lockFlag[v] = &Lock{ c.lockFlag[uuid] = &Lock{
Key: key, Key: key,
Value: v, Value: uuid,
flagChan: lockChan, flagChan: lockChan,
} }
}() }()
select { select {
case <-lockChan: case <-lockChan:
log.Println("v-ok", time.Now().UnixNano()/1e6, v) log.Println("lock-ok", time.Now().UnixNano()/1e6, uuid)
} }
return Lock{Key: key, Value: v} return Lock{Key: key, Value: uuid}
} }
func (c *Client) Unlock(l Lock) { func (c *ZHubClient) Unlock(l Lock) {
c.send("unlock", l.Key, l.Value) c.send("unlock", l.Key, l.Value)
delete(c.lockFlag, l.Value) delete(c.lockFlag, l.Value)
} }
@@ -228,7 +240,7 @@ func (c *Client) Unlock(l Lock) {
var rpcMap = make(map[string]*Rpc) var rpcMap = make(map[string]*Rpc)
var rpcLock = sync.RWMutex{} var rpcLock = sync.RWMutex{}
func (c *Client) rpcInit() { func (c *ZHubClient) rpcInit() {
// 添加 appname 主题订阅处理 // 添加 appname 主题订阅处理
c.Subscribe(c.appname, func(v string) { c.Subscribe(c.appname, func(v string) {
@@ -275,7 +287,11 @@ func (r Rpc) backTopic() string {
return strings.Split(r.Ruk, "::")[0] return strings.Split(r.Ruk, "::")[0]
} }
func (c Client) Rpc(topic string, message string, back func(res RpcResult)) { func (c *ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) {
c.RpcWithTimeout(topic, message, time.Second*15, back)
}
func (c *ZHubClient) RpcWithTimeout(topic string, message string, timeout time.Duration, back func(res RpcResult)) {
rpc := Rpc{ rpc := Rpc{
Ruk: c.appname + "::" + uuid.New(), Ruk: c.appname + "::" + uuid.New(),
Topic: topic, Topic: topic,
@@ -296,7 +312,7 @@ func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
select { select {
case <-rpc.Ch: case <-rpc.Ch:
// ch 事件rpc 返回) // ch 事件rpc 返回)
case <-time.After(time.Second * 15): case <-time.After(timeout):
// rpc 超时 // rpc 超时
x, _ := json.Marshal(rpc) x, _ := json.Marshal(rpc)
log.Println("rpc timeout:", x) log.Println("rpc timeout:", x)
@@ -309,7 +325,7 @@ func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
} }
// RpcSubscribe rpc subscribe // RpcSubscribe rpc subscribe
func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) { func (c *ZHubClient) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
c.Subscribe(topic, func(v string) { c.Subscribe(topic, func(v string) {
rpc := Rpc{} rpc := Rpc{}
err := json.Unmarshal([]byte(v), &rpc) err := json.Unmarshal([]byte(v), &rpc)
@@ -322,7 +338,7 @@ func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
result.Ruk = rpc.Ruk result.Ruk = rpc.Ruk
res, _ := json.Marshal(result) res, _ := json.Marshal(result)
c.Publish(rpc.backTopic(), string(res[:])) c.Publish(rpc.backTopic(), string(res))
}) })
} }
@@ -333,7 +349,7 @@ send socket message :
if len(vs) equal 1 will send message `vs[0] + "\r\n"` if len(vs) equal 1 will send message `vs[0] + "\r\n"`
else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...` else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...`
*/ */
func (c *Client) send(vs ...string) (err error) { func (c *ZHubClient) send(vs ...string) (err error) {
//chSend <- vs //chSend <- vs
c.wlock.Lock() c.wlock.Lock()
defer c.wlock.Unlock() defer c.wlock.Unlock()
@@ -351,13 +367,19 @@ a:
if err != nil { if err != nil {
log.Println(err) log.Println(err)
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
// check conn reconnect
{
c.wlock.Unlock()
c.reconn()
c.wlock.Lock()
}
goto a goto a
} }
return err return err
} }
func (c *Client) receive() { func (c *ZHubClient) receive() {
r := bufio.NewReader(c.conn) r := bufio.NewReader(c.conn)
for { for {
v, _, err := r.ReadLine() v, _, err := r.ReadLine()
@@ -373,6 +395,7 @@ func (c *Client) receive() {
if string(v) == "+ping" { if string(v) == "+ping" {
c.send("+pong") c.send("+pong")
} }
log.Println("receive:", string(v))
case '-': case '-':
log.Println("error:", string(v)) log.Println("error:", string(v))
case '*': case '*':
@@ -384,7 +407,8 @@ func (c *Client) receive() {
continue continue
} }
var vs []string var vs []string
for i := 0; i < n; i++ { //for i := 0; i < n; i++ {
for len(vs) < n {
line, _, err := r.ReadLine() line, _, err := r.ReadLine()
if err != nil || line == nil { if err != nil || line == nil {
continue continue
@@ -404,16 +428,20 @@ func (c *Client) receive() {
} }
vs = append(vs, string(buf)) vs = append(vs, string(buf))
} }
if len(vs) == 3 && vs[0] == "message" && vs[1] == "lock" { if len(vs) == 3 && vs[0] == "message" {
go func() { if vs[1] == "lock" {
log.Println("lock:" + vs[2]) go func() {
c.wlock.Lock() log.Println("lock:" + vs[2])
defer c.wlock.Unlock() c.wlock.Lock()
if c.lockFlag[vs[2]] == nil { defer c.wlock.Unlock()
return if c.lockFlag[vs[2]] == nil {
} return
c.lockFlag[vs[2]].flagChan <- 0 }
}() c.lockFlag[vs[2]].flagChan <- 0
}()
} else {
c.chReceive <- vs
}
continue continue
} }
if len(vs) == 2 && vs[0] == "timer" { if len(vs) == 2 && vs[0] == "timer" {
@@ -423,118 +451,3 @@ func (c *Client) receive() {
} }
} }
} }
// -------------------------------------- hm --------------------------------------
// ==============================================================================
var reconnect = 0
// ClientRun client 命令行程序
func ClientRun(addr string) {
conn, err := net.Dial("tcp", fmt.Sprintf("%s", addr))
for {
if err != nil {
log.Println(err)
time.Sleep(time.Second * 3)
conn, err = net.Dial("tcp", fmt.Sprintf("%s", addr))
continue
}
fmt.Println(fmt.Sprintf("had connected server: %s", addr))
break
}
defer func() {
if reconnect == 1 {
conn.Close()
ClientRun(addr)
}
}()
go clientRead(conn)
for {
inReader := bufio.NewReader(os.Stdin)
line, err := inReader.ReadString('\n')
if err != nil {
fmt.Println(err)
return
} else if reconnect == 1 {
return
}
line = strings.Trim(line, "\r\n")
line = strings.Trim(line, "\n")
line = strings.Trim(line, " ")
if strings.EqualFold(line, "") {
continue
} else if strings.EqualFold(line, ":exit") {
fmt.Println("exit!")
return
}
//fmt.Println("发送数据:" + line)
line = strings.ReplaceAll(line, " ", "")
parr := strings.Split(line, " ")
conn.Write([]byte("*" + strconv.Itoa(len(parr)) + "\r\n"))
for i := range parr {
conn.Write([]byte("$" + strconv.Itoa(len(parr[i])) + "\r\n"))
conn.Write([]byte(parr[i] + "\r\n"))
}
}
}
func clientRead(conn net.Conn) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered:", r)
}
reconnect = 1
}()
reader := bufio.NewReader(conn)
for {
rcmd := make([]string, 0)
line, _, err := reader.ReadLine()
if err != nil {
log.Println("connection error: ", err)
return
} else if len(line) == 0 {
continue
}
switch string(line[:1]) {
case "*":
n, _ := strconv.Atoi(string(line[1:]))
for i := 0; i < n; i++ {
reader.ReadLine()
v, _, _ := reader.ReadLine()
rcmd = append(rcmd, string(v))
}
case "+":
rcmd = append(rcmd, string(line))
case "-":
rcmd = append(rcmd, string(line))
case ":":
rcmd = append(rcmd, string(line))
case "h":
if strings.EqualFold(string(line), "help-start") {
for {
v, _, _ := reader.ReadLine()
if strings.EqualFold(string(v), "help-end") {
break
}
rcmd = append(rcmd, string(v)+"\r\n")
}
}
default:
rcmd = append(rcmd, string(line))
}
fmt.Println(">", strings.Join(rcmd, " "))
}
}

View File

@@ -1,110 +1,45 @@
package cmd package cmd
import ( import (
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"strconv" "strconv"
"strings" "strings"
"sync/atomic" "sync"
"testing" "testing"
"time" "time"
) )
var zhub *Client var hub *ZHubClient
var once = sync.Once{}
var (
addr = "127.0.0.1:1216"
)
func init() { func init() {
client, err := Create("zhub-cli", addr, "C-0", "admin@123456") once.Do(func() {
if err != nil { hub := &ZHubClient{
log.Fatal(err) appname: "hub-cli",
} addr: "127.0.0.1:1216",
zhub = client groupid: "C-0",
} auth: "admin@123456",
}
err := hub.Init()
func newClient(appname, groupid string) *Client { if err != nil {
client, err := Create(appname, addr, groupid, "admin@123456") log.Fatal(err)
if err != nil { }
log.Fatal(err) hub = hub
} })
return client
}
func TestTimer(t *testing.T) {
go func() {
client := newClient("zhub-cli", "g-1")
client.Subscribe("ax1", func(v string) {
log.Println("topic-1-ax: " + v)
})
}()
go func() {
client := newClient("zhub-cli", "g-1")
client.Subscribe("ax1", func(v string) {
log.Println("topic-2-ax: " + v)
})
}()
go func() {
client := newClient("zhub-cli", "g-1")
client.Subscribe("ax1", func(v string) {
log.Println("topic-3-ax: " + v)
})
}()
time.Sleep(time.Hour * 3)
}
func TestSendCmd(t *testing.T) {
client := newClient("zhub-cli", "group-admin")
//client.Cmd("reload-timer")
client.Cmd("shutdown")
}
func TestPublish(t *testing.T) {
/*zhub.Publish("abx", "asd\r\nxxx1")
zhub.Publish("abx", "asd\r\nxxx2")
zhub.Publish("abx", "asd\r\nxxx3")
zhub.Publish("abx", "asd\r\nxxx4")
zhub.Publish("abx", "asd\r\nxxx5")*/
/*for i := 0; i < 10000; i++ {
zhub.Publish("ax1", strconv.Itoa(i))
}*/
/*for i := 0; i < 20_0000; i++ {
time.Sleep(1 * time.Millisecond)
zhub.Publish("b", strconv.Itoa(i))
}*/
/*zhub.Subscribe("wx:user-follow", func(v string) {
fmt.Println(v)
})*/
//hub.Publish("ax", "1")
time.Sleep(time.Second)
} }
func TestLock(t *testing.T) { func TestLock(t *testing.T) {
client, _ := Create("zhub-cli", addr, "xx", "admin@123456") hub.Subscribe("lock", func(v string) {
client.Subscribe("lock", func(v string) {
}) })
var fun = func(x string) { var fun = func(x string) {
log.Println("lock", time.Now().UnixNano()/1e6) log.Println("lock", time.Now().UnixNano()/1e6)
lock := client.Lock("a", 30) lock := hub.Lock("a", 30)
defer client.Unlock(lock) defer hub.Unlock(lock)
//client.Lock("a", 5) //client.Lock("a", 5)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
@@ -124,6 +59,8 @@ func rotate(nums []int, k int) {
k = k % len(nums) k = k % len(nums)
nums = append(nums[len(nums)-k:], nums[0:len(nums)-k]...) nums = append(nums[len(nums)-k:], nums[0:len(nums)-k]...)
} }
// 特殊符号测试
func TestName(t *testing.T) { func TestName(t *testing.T) {
//str := ", response = {\"success\":true,\"retcode\":0,\"result\":{\"age\":0,\"explevel\":1,\"face\":\"https://aimg.woaihaoyouxi.com/haogame/202106/pic/20210629095545FmGt-v9NYqyNZ_Q6_y3zM_RMrDgd.jpg\",\"followed\":0,\"gender\":0,\"idenstatus\":0,\"matchcatelist\":[{\"catename\":\"足球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103556FoG5ICf_7BFx6Idyo3TYpJQ7tmfG.png\",\"matchcateid\":1},{\"catename\":\"篮球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103636FklsXTn1f6Jlsam8Jk-yFB7Upo3C.png\",\"matchcateid\":2}],\"matchcates\":\"2,1\",\"mobile\":\"18515190967\",\"regtime\":1624931714781,\"sessionid\":\"d1fc447753bd4700ad29674a753030fa\",\"status\":10,\"userid\":100463,\"username\":\"绝尘\",\"userno\":100463}}" //str := ", response = {\"success\":true,\"retcode\":0,\"result\":{\"age\":0,\"explevel\":1,\"face\":\"https://aimg.woaihaoyouxi.com/haogame/202106/pic/20210629095545FmGt-v9NYqyNZ_Q6_y3zM_RMrDgd.jpg\",\"followed\":0,\"gender\":0,\"idenstatus\":0,\"matchcatelist\":[{\"catename\":\"足球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103556FoG5ICf_7BFx6Idyo3TYpJQ7tmfG.png\",\"matchcateid\":1},{\"catename\":\"篮球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103636FklsXTn1f6Jlsam8Jk-yFB7Upo3C.png\",\"matchcateid\":2}],\"matchcates\":\"2,1\",\"mobile\":\"18515190967\",\"regtime\":1624931714781,\"sessionid\":\"d1fc447753bd4700ad29674a753030fa\",\"status\":10,\"userid\":100463,\"username\":\"绝尘\",\"userno\":100463}}"
/*str := "别人家的女娃子🤞🏻" /*str := "别人家的女娃子🤞🏻"
@@ -150,83 +87,7 @@ func TestName(t *testing.T) {
func toStr(d interface{}) string { func toStr(d interface{}) string {
bs, _ := json.Marshal(d) bs, _ := json.Marshal(d)
return string(bs[:]) return string(bs)
}
// 接收数据 A
func TestC_a(t *testing.T) {
zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456")
if err != nil {
log.Fatal(err)
}
zhub.Subscribe("cmt:user-msg", func(v string) {
fmt.Println(v)
})
time.Sleep(10 * time.Hour)
}
// 接收数据
func TestC_ab(t *testing.T) {
zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456")
if err != nil {
log.Fatal(err)
}
zhub.Subscribe("a", func(v string) {
fmt.Println("a:", v)
})
zhub.Subscribe("b", func(v string) {
fmt.Println("b", v)
})
zhub.Subscribe("im:friend:186", func(v string) {
fmt.Println("im:friend:186", v)
})
time.Sleep(1 * time.Hour)
}
func TestDelay2(t *testing.T) {
zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456")
if err != nil {
log.Fatal(err)
}
var x int64 = 0
go func() {
zhub.Subscribe("a", func(v string) {
fmt.Println(v, "-", atomic.AddInt64(&x, 1))
})
}()
zhub2, err := Create("zhub-cli", addr, "C-1", "admin@123456")
if err != nil {
log.Fatal(err)
}
go func() {
zhub2.Subscribe("a", func(v string) {
fmt.Println(v, "-", atomic.AddInt64(&x, 1))
})
}()
time.Sleep(time.Second * 20000)
}
func TestDelay(t *testing.T) {
//zhub.Delay("abx", "1", -1)
//zhub.Publish("yk-topic", "hello yk.")
for i := 0; i < 1000; i++ {
zhub.Publish("a", "x-"+strconv.Itoa(i))
}
time.Sleep(time.Second * 5)
} }
// 测试发送微信 模板消息 // 测试发送微信 模板消息
@@ -267,12 +128,12 @@ func TestWxSendMessage(t *testing.T) {
log.Println(toStr(tplData["templateData"])) log.Println(toStr(tplData["templateData"]))
zhub.Publish("wx:send-template-message", toStr(tplData)) hub.Publish("wx:send-template-message", toStr(tplData))
} }
// 监听各项目所有接口请求失败信息《接口请求失败,发送失败信息到 pro.app-error 主题》 // 监听各项目所有接口请求失败信息《接口请求失败,发送失败信息到 pro.app-error 主题》
func TestAppErrorConsole(t *testing.T) { func TestAppErrorConsole(t *testing.T) {
zhub.Subscribe("app-error", func(v string) { hub.Subscribe("app-error", func(v string) {
strs := []string{ strs := []string{
"未登陆", "账号已在其他设备登录", "今日已签到", "您的登录状态已过期", "用户不存在", "暂无可预定场馆", "未登陆", "账号已在其他设备登录", "今日已签到", "您的登录状态已过期", "用户不存在", "暂无可预定场馆",
} }
@@ -291,8 +152,7 @@ func TestAppErrorConsole(t *testing.T) {
// ----------- rpc test ----------- // ----------- rpc test -----------
func TestRpcCall_S(t *testing.T) { func TestRpcCall_S(t *testing.T) {
zhub := newClient("zhub-cli-a", "x") hub.RpcSubscribe("ai-test", func(r Rpc) RpcResult {
zhub.RpcSubscribe("ai-test", func(r Rpc) RpcResult {
upper := strings.ToUpper(r.Value) upper := strings.ToUpper(r.Value)
return RpcResult{Retcode: 0, Retinfo: "操作成功", Result: upper} return RpcResult{Retcode: 0, Retinfo: "操作成功", Result: upper}
}) })
@@ -315,7 +175,7 @@ func TestRpcCall_C(t *testing.T) {
print(res.Result) print(res.Result)
})*/ })*/
zhub.Subscribe("zcore:monitor-error", func(v string) { hub.Subscribe("zcore:monitor-error", func(v string) {
fmt.Println(v) fmt.Println(v)
}) })
@@ -325,13 +185,3 @@ func TestRpcCall_C(t *testing.T) {
fmt.Println(res) fmt.Println(res)
})*/ })*/
} }
func TestBannedTalk(t *testing.T) {
/*zhub.Rpc("im:banned-talk", "{'imtoken':'74074f9e599947ca940e71a9788e768f'}", func(res RpcResult) {
fmt.Print(res)
})*/
encoding := base64.Encoding{}
toString := encoding.EncodeToString([]byte("420101190001011234"))
fmt.Println(toString)
}

88
go.mod
View File

@@ -1,55 +1,63 @@
module zhub module gitea.1216.top/lxy/zhub
go 1.20 go 1.25.0
require ( require (
github.com/gin-gonic/gin v1.9.0 github.com/gin-gonic/gin v1.11.0
github.com/go-basic/uuid v1.0.0 github.com/go-basic/uuid v1.0.0
github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql v1.9.3
github.com/lib/pq v1.10.9
github.com/robfig/cron v1.2.0 github.com/robfig/cron v1.2.0
github.com/spf13/viper v1.15.0 github.com/spf13/viper v1.21.0
gopkg.in/ini.v1 v1.67.0
) )
require ( require (
github.com/bytedance/sonic v1.9.1 // indirect filippo.io/edwards25519 v1.1.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/bytedance/gopkg v0.1.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/cloudwego/base64x v0.1.6 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.55.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/sagikazarmark/locafero v0.12.0 // indirect
go.uber.org/mock v0.6.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/mod v0.28.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/tools v0.37.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
require (
github.com/bytedance/sonic v1.14.1 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.10 // indirect
github.com/gin-contrib/sse v1.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect github.com/go-playground/validator/v10 v10.27.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect github.com/goccy/go-json v0.10.5 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.2.4 // indirect github.com/leodido/go-urn v1.4.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/spf13/afero v1.9.3 // indirect github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cast v1.10.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.10 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect github.com/ugorji/go/codec v1.3.0 // indirect
golang.org/x/arch v0.3.0 // indirect golang.org/x/arch v0.21.0 // indirect
golang.org/x/crypto v0.9.0 // indirect golang.org/x/crypto v0.42.0 // indirect
golang.org/x/net v0.10.0 // indirect golang.org/x/net v0.44.0 // indirect; indirect·
golang.org/x/sys v0.8.0 // indirect golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.9.0 // indirect golang.org/x/text v0.29.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace (
zhub/cmd => ./zhub/cmd
zhub/internal/config => ./zhub/internal/config
zhub/internal/monitor => ./zhub/internal/monitor
zhub/internal/zsub => ./zhub/internal/zsub
) )

View File

@@ -2,12 +2,14 @@ package auth
import ( import (
"fmt" "fmt"
"gopkg.in/yaml.v3"
"os" "os"
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"time" "time"
"gitea.1216.top/lxy/zhub/internal/config"
"gopkg.in/yaml.v3"
) )
type User struct { type User struct {
@@ -63,6 +65,12 @@ type PermissionManager struct {
func (p *PermissionManager) Init() error { func (p *PermissionManager) Init() error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
Conf := config.ReadConfig()
if !Conf.Service.Auth {
return nil
}
// Load YAML configuration from file // Load YAML configuration from file
data, err := os.ReadFile("./auth.yml") data, err := os.ReadFile("./auth.yml")
if err != nil { if err != nil {

View File

@@ -1,36 +1,40 @@
package config package config
import ( import (
"github.com/spf13/viper"
"log" "log"
"os" "os"
"github.com/spf13/viper"
"gopkg.in/ini.v1"
) )
type Log struct { type Log struct {
Handlers string Handlers string `ini:"handlers"`
Level string Level string `ini:"level"`
File string File string `ini:"file"`
} }
type Config struct { type Config struct {
Log Log Log Log `ini:"log"`
Service struct { Service struct {
Watch string Watch string `ini:"watch"`
Addr string Addr string `ini:"addr"`
Auth bool Auth bool `ini:"auth"`
} } `ini:"service"`
Data struct { Data struct {
Dir string Dir string `ini:"dir"`
} } `ini:"data"`
Ztimer struct { Ztimer struct {
Db struct { Db struct {
Addr string Addr string `ini:"addr"`
User string User string `ini:"user"`
Password string Password string `ini:"password"`
Database string Database string `ini:"database"`
} Schema string `ini:"schema"`
} Type string `ini:"type"`
Auth map[string]string } `ini:"db"`
} `ini:"ztimer"`
Auth map[string]string `ini:"auth"`
} }
func ReadConfig() Config { func ReadConfig() Config {
@@ -57,16 +61,16 @@ func ReadConfig() Config {
}*/ }*/
// 尝试从 /etc/ 目录下查找 zhub.ini 配置文件 // 尝试从 /etc/ 目录下查找 zhub.ini 配置文件
viper.AddConfigPath("/etc/") // 添加 /etc/ 目录作为配置文件搜索路径 /*viper.AddConfigPath("/etc/") // 添加 /etc/ 目录作为配置文件搜索路径
viper.SetConfigName("zhub") // 指定配置文件名为 zhub viper.SetConfigName("zhub") // 指定配置文件名为 zhub
if err := viper.ReadInConfig(); err == nil { if err := viper.ReadInConfig(); err == nil {
if err := viper.Unmarshal(&conf); err != nil { if err := viper.Unmarshal(&conf); err != nil {
log.Fatalf("Failed to unmarshal config: %s", err.Error()) log.Fatalf("Failed to unmarshal config: %s", err.Error())
} }
return conf return conf
} }*/
// 如果 /etc/ 目录下未找到配置文件,则尝试从当前程序运行目录下查找 app.ini 配置文件 // 如果 /etc/ 目录下未找到配置文件,则尝试从当前程序运行目录下查找 app.ini 配置文件
dir, err := os.Getwd() // 获取程序运行目录 /*dir, err := os.Getwd() // 获取程序运行目录
if err != nil { if err != nil {
log.Fatalf("Failed to get current directory: %s", err.Error()) log.Fatalf("Failed to get current directory: %s", err.Error())
} }
@@ -77,10 +81,24 @@ func ReadConfig() Config {
if err := viper.Unmarshal(&conf); err != nil { if err := viper.Unmarshal(&conf); err != nil {
log.Fatalf("Failed to unmarshal config: %s", err.Error()) log.Fatalf("Failed to unmarshal config: %s", err.Error())
} }
// conf.Ztimer.Db.Password 包含 % 做url 转码
if strings.Contains(conf.Ztimer.Db.Password, "%") {
unescape, err := url.QueryUnescape(conf.Ztimer.Db.Password)
if err == nil {
conf.Ztimer.Db.Password = unescape
}
}
return conf return conf
} else {
log.Fatalf("Config file not found: " + err.Error())
}*/
load, err := ini.Load("app.ini")
if err != nil {
log.Panicf("Failed to load config: %s", err.Error())
} }
// 如果在 /etc/ 目录和当前程序所在目录下均未找到配置文件,则报错 load.MapTo(&conf)
log.Fatalf("Config file not found")
return conf return conf
} }
func InitLog(logConfig Log) { func InitLog(logConfig Log) {

View File

@@ -1,11 +1,18 @@
package monitor package monitor
import ( import (
"github.com/gin-gonic/gin" "log"
"net/http" "net/http"
"zhub/internal/zsub"
"gitea.1216.top/lxy/zhub/internal/zbus"
"github.com/gin-gonic/gin"
) )
var r = gin.Default()
// Version 时间格式化 YYYY.MM.DD-HH.MM.SS
var Version string
func init() { func init() {
// 1.日志文件 定期分割归档 // 1.日志文件 定期分割归档
@@ -13,31 +20,41 @@ func init() {
func StartWatch() { func StartWatch() {
r := gin.Default()
r.Group("/users")
r.GET("/", func(c *gin.Context) { r.GET("/", func(c *gin.Context) {
c.File("./public/index.html") c.File("./public/index.html")
}) })
r.GET("/info", func(c *gin.Context) { r.GET("/_/info", func(c *gin.Context) {
c.JSON(http.StatusOK, zsub.Info()) c.JSON(http.StatusOK, zbus.Info())
}) })
r.GET("/cleanup", func(c *gin.Context) { r.GET("/_/cleanup", func(c *gin.Context) {
zsub.Hub.Clearup() zbus.Bus.Clearup()
c.JSON(http.StatusOK, "+OK") c.JSON(http.StatusOK, "+OK")
}) })
r.GET("/_/version", func(c *gin.Context) {
c.JSON(http.StatusOK, map[string]string{
"version": Version,
})
})
r.GET("/timer/reload", func(c *gin.Context) { r.GET("/timer/reload", func(c *gin.Context) {
zsub.Hub.ReloadTimer() zbus.Bus.ReloadTimer()
c.JSON(http.StatusOK, "+reload timer ok") c.JSON(http.StatusOK, "+reload timer ok")
}) })
r.GET("/topic/publish", func(c *gin.Context) { r.POST("/message/send", func(c *gin.Context) {
topic := c.Query("topic") topic := c.PostForm("name") // c.Query("topic")
value := c.Query("value") value := c.PostForm("value") // c.Query("value")
// _type := c.PostForm("type") // publish、broadcast
zsub.Hub.Publish(topic, value) log.Println(c.PostForm("type"), topic, value)
switch c.PostForm("type") {
case "broadcast":
zbus.Bus.Broadcast(topic, value)
case "publish":
zbus.Bus.Publish(topic, value)
default:
}
c.JSON(http.StatusOK, "+OK") c.JSON(http.StatusOK, "+OK")
}) })
r.GET("/topic/delay", func(c *gin.Context) { r.GET("/topic/delay", func(c *gin.Context) {
@@ -45,16 +62,16 @@ func StartWatch() {
value := c.Query("value") value := c.Query("value")
delay := c.Query("delay") delay := c.Query("delay")
zsub.Hub.Delay([]string{"delay", topic, value, delay}) zbus.Bus.Delay([]string{"delay", topic, value, delay})
c.JSON(http.StatusOK, "+OK") c.JSON(http.StatusOK, "+OK")
}) })
// reload the auth configuration // reload the auth configuration
r.GET("/auth/reload", func(c *gin.Context) { r.GET("/auth/reload", func(c *gin.Context) {
zsub.AuthManager.Reload() zbus.AuthManager.Reload()
c.JSON(http.StatusOK, "+OK") c.JSON(http.StatusOK, "+OK")
}) })
watchAddr := zsub.Conf.Service.Watch watchAddr := zbus.Conf.Service.Watch
r.Run(watchAddr) r.Run(watchAddr)
} }

View File

@@ -1,4 +1,4 @@
package zsub package zbus
import ( import (
"encoding/json" "encoding/json"
@@ -6,7 +6,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"zhub/internal/auth"
"gitea.1216.top/lxy/zhub/internal/auth"
) )
var AuthManager *auth.PermissionManager var AuthManager *auth.PermissionManager
@@ -22,10 +23,10 @@ func init() {
var funChan = make(chan func(), 1000) var funChan = make(chan func(), 1000)
func handleMessage(v Message) { func messageHandler(v Message) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println("handleMessage Recovered:", r) log.Println("messageHandler Recovered:", r)
} }
}() }()
c := v.Conn c := v.Conn
@@ -47,7 +48,7 @@ func handleMessage(v Message) {
// 准入拦截,所有指令完成 auth 认证后才可进入 // 准入拦截,所有指令完成 auth 认证后才可进入
if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" { if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" {
c.send("-Auth: NOAUTH Authentication required:" + rcmd[0]) c.send("-Auth: Authentication required [" + rcmd[0] + "]")
return return
} }
// 指令预处理 // 指令预处理
@@ -79,8 +80,16 @@ func handleMessage(v Message) {
// auth check // auth check
switch cmd { switch cmd {
case "publish", "broadcast", "delay", "rpc": case "publish", "broadcast", "delay", "rpc":
if !AuthManager.AuthCheck(c.user, rcmd[1], "w") { if Conf.Service.Auth && !AuthManager.AuthCheck(c.user, rcmd[1], "w") {
c.send("-Error: Insufficient permissions to send " + cmd + " [" + rcmd[1] + "] message.") c.send("-Error: Insufficient permissions to send " + cmd + " [" + rcmd[1] + "] message.")
log.Printf("[%d] -Auth: %s [%s]\n", c.sn, cmd, rcmd[1])
if cmd == "rpc" {
rpcBody := make(map[string]string)
json.Unmarshal([]byte(rcmd[2]), &rpcBody)
ruk := rpcBody["ruk"]
Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 401, 'retinfo': 'unauthorized', 'ruk': '"+ruk+"'}")
}
return return
} }
case "subscribe": // 在订阅逻辑处检查 case "subscribe": // 在订阅逻辑处检查
@@ -90,7 +99,7 @@ func handleMessage(v Message) {
switch cmd { switch cmd {
case "auth": case "auth":
userid, err := AuthManager.GetUserIdByToken(rcmd[1]) userid, err := AuthManager.GetUserIdByToken(rcmd[1])
if err != nil { if err != nil && Conf.Service.Auth {
c.send("-Error: " + err.Error()) c.send("-Error: " + err.Error())
return return
} }
@@ -118,13 +127,13 @@ func handleMessage(v Message) {
return return
case "rpc": case "rpc":
// if rpc and no sub back error // if rpc and no sub back error
if Hub.noSubscribe(rcmd[1]) { if Bus.noSubscribe(rcmd[1]) {
rpcBody := make(map[string]string) rpcBody := make(map[string]string)
json.Unmarshal([]byte(rcmd[2]), &rpcBody) json.Unmarshal([]byte(rcmd[2]), &rpcBody)
log.Println("rpc no subscribe: ", rcmd[1]) log.Printf("[%d] : rpc %s no subscribe", c.sn, rcmd[1])
ruk := rpcBody["ruk"] ruk := rpcBody["ruk"]
Hub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}")
return return
} }
@@ -134,7 +143,7 @@ func handleMessage(v Message) {
/*if len(topicChan) < cap(topicChan) { /*if len(topicChan) < cap(topicChan) {
topicChan <- rcmd topicChan <- rcmd
}*/ }*/
Hub.Publish(rcmd[1], rcmd[2]) Bus.Publish(rcmd[1], rcmd[2])
} }
return return
case "publish": case "publish":
@@ -144,13 +153,13 @@ func handleMessage(v Message) {
/*if len(topicChan) < cap(topicChan) { /*if len(topicChan) < cap(topicChan) {
topicChan <- rcmd topicChan <- rcmd
}*/ }*/
Hub.Publish(rcmd[1], rcmd[2]) Bus.Publish(rcmd[1], rcmd[2])
} }
return return
case "broadcast": case "broadcast":
Hub.broadcast(rcmd[1], rcmd[2]) Bus.Broadcast(rcmd[1], rcmd[2])
case "delay": case "delay":
Hub.Delay(rcmd) Bus.Delay(rcmd)
default: default:
} }
@@ -166,8 +175,9 @@ func handleMessage(v Message) {
// subscribe x y z // subscribe x y z
for _, topic := range rcmd[1:] { for _, topic := range rcmd[1:] {
// auth check // auth check
if !AuthManager.AuthCheck(c.user, rcmd[1], "r") { if Conf.Service.Auth && !AuthManager.AuthCheck(c.user, rcmd[1], "r") {
c.send("-Error: Insufficient permissions to " + cmd + " [" + rcmd[1] + "] message.") c.send("-Error: Insufficient permissions to " + cmd + " [" + rcmd[1] + "] message.")
log.Printf("-Auth: %s [%s]\n", cmd, rcmd[1])
continue continue
} }
c.subscribe(topic) c.subscribe(topic)
@@ -178,7 +188,7 @@ func handleMessage(v Message) {
} }
case "timer": case "timer":
for _, name := range rcmd[1:] { for _, name := range rcmd[1:] {
Hub.timer([]string{"timer", name}, c) // append to timers Bus.timer([]string{"timer", name}, c) // append to timers
c.timers = append(c.timers, name) // append to conns c.timers = append(c.timers, name) // append to conns
} }
case "cmd": case "cmd":
@@ -187,28 +197,28 @@ func handleMessage(v Message) {
} }
switch rcmd[1] { switch rcmd[1] {
case "reload-timer": case "reload-timer":
Hub.ReloadTimer() Bus.ReloadTimer()
case "shutdown": case "shutdown":
if AuthManager.IsAdmin(c.user) { if AuthManager.IsAdmin(c.user) {
return return
} }
Hub.shutdown() Bus.shutdown()
} }
case "lock": case "lock", "trylock":
// lock key uuid 5 // lock key uuid 5
if len(rcmd) != 4 { if len(rcmd) != 4 {
c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]") c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]")
return return
} }
d, _ := strconv.Atoi(rcmd[3]) d, _ := strconv.Atoi(rcmd[3])
Hub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d}) Bus._lock(&Lock{cmd: cmd, key: rcmd[1], uuid: rcmd[2], duration: d})
case "unlock": case "unlock":
// unlock key uuid // unlock key uuid
if len(rcmd) != 3 { if len(rcmd) != 3 {
c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]") c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]")
return return
} }
Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) Bus._unlock(Lock{cmd: cmd, key: rcmd[1], uuid: rcmd[2]})
default: default:
c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
return return

View File

@@ -1,4 +1,4 @@
package zsub package zbus
import ( import (
"bufio" "bufio"
@@ -12,19 +12,21 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"unicode/utf8" "unicode/utf8"
"zhub/internal/config"
"gitea.1216.top/lxy/zhub/internal/config"
) )
var ( var (
Conf config.Config Conf config.Config
Hub = &ZSub{ Bus = &ZBus{
topics: make(map[string]*ZTopic), topics: make(map[string]*ZTopic),
timers: make(map[string]*ZTimer), timers: make(map[string]*ZTimer),
delays: make(map[string]*ZDelay), delays: make(map[string]*ZDelay),
locks: make(map[string][]*Lock), locks: make(map[string][]*Lock),
conns: make([]*ZConn, 0), conns: make([]*ZConn, 0),
sn: 1000,
} }
SN int32 = 1000 //SN int32 = 1000
) )
func init() { func init() {
@@ -41,7 +43,7 @@ func init() {
} }
}() }()
conns := make([]*ZConn, 0) // 需要关闭的连接 conns := make([]*ZConn, 0) // 需要关闭的连接
for _, c := range Hub.conns { for _, c := range Bus.conns {
if c.ping > 0 && c.ping-c.pong > 19 { if c.ping > 0 && c.ping-c.pong > 19 {
conns = c.appendTo(conns) conns = c.appendTo(conns)
continue continue
@@ -63,19 +65,20 @@ func init() {
} }
Hub.SaveData() Bus.SaveData()
} }
}() }()
} }
type ZSub struct { type ZBus struct {
sync.RWMutex sync.RWMutex
topics map[string]*ZTopic topics map[string]*ZTopic // 订阅主题
timers map[string]*ZTimer timers map[string]*ZTimer // 定时事件
delays map[string]*ZDelay delays map[string]*ZDelay // 延时消息
locks map[string][]*Lock locks map[string][]*Lock // 当前锁对象
conns []*ZConn conns []*ZConn // 所有的客户端连接
delayup bool sn int32 // 客户端连接编号
delayup bool // 是否需要延时持久保存数据
} }
type ZConn struct { //ZConn type ZConn struct { //ZConn
@@ -93,9 +96,10 @@ type ZConn struct { //ZConn
} }
type Lock struct { type Lock struct {
key string cmd string // lock|trylock|unlock
uuid string key string // lock key
duration int uuid string // apply for unique identification
duration int // lock duration (seconds)
timer *time.Timer timer *time.Timer
start int64 start int64
//stop time.Time //stop time.Time
@@ -103,7 +107,7 @@ type Lock struct {
func NewZConn(conn *net.Conn) *ZConn { func NewZConn(conn *net.Conn) *ZConn {
return &ZConn{ return &ZConn{
sn: atomic.AddInt32(&SN, 1), // 连接编号 sn: atomic.AddInt32(&Bus.sn, 1),
conn: conn, conn: conn,
topics: []string{}, topics: []string{},
timers: []string{}, timers: []string{},
@@ -119,9 +123,9 @@ func NewZConn(conn *net.Conn) *ZConn {
3若有待消费消息启动消费 3若有待消费消息启动消费
*/ */
func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
Hub.Lock() Bus.Lock()
defer Hub.Unlock() defer Bus.Unlock()
ztopic := Hub.topics[topic] //ZTopic ztopic := Bus.topics[topic] //ZTopic
if ztopic == nil { if ztopic == nil {
ztopic = &ZTopic{ ztopic = &ZTopic{
groups: map[string]*ZGroup{}, groups: map[string]*ZGroup{},
@@ -129,7 +133,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
chMsg: make(chan string, 500), chMsg: make(chan string, 500),
} }
ztopic.init() ztopic.init()
Hub.topics[topic] = ztopic Bus.topics[topic] = ztopic
} }
zgroup := ztopic.groups[c.groupid] //ZGroup zgroup := ztopic.groups[c.groupid] //ZGroup
@@ -159,7 +163,7 @@ func (c *ZConn) unsubscribe(topic string) { // 取消订阅 zconn{}
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
close(c.substoped[topic]) close(c.substoped[topic])
ztopic := Hub.topics[topic] //ZTopic ztopic := Bus.topics[topic] //ZTopic
if ztopic == nil { if ztopic == nil {
return return
} }
@@ -205,10 +209,10 @@ func (c *ZConn) close() {
} }
// timer conn close // timer conn close
Hub.Lock() Bus.Lock()
defer Hub.Unlock() defer Bus.Unlock()
for _, topic := range c.timers { // fixme: 数据逻辑交叉循环 for _, topic := range c.timers { // fixme: 数据逻辑交叉循环
timer := Hub.timers[topic] timer := Bus.timers[topic]
if timer == nil { if timer == nil {
continue continue
} }
@@ -267,8 +271,8 @@ func StartServer(addr string, conf config.Config) {
}() }()
// 重新加载[定时、延时] // 重新加载[定时、延时]
go Hub.ReloadTimer() go Bus.ReloadTimer()
go Hub.LoadData() go Bus.LoadData()
// 启动服务监听 // 启动服务监听
listen, err := net.Listen("tcp", addr) listen, err := net.Listen("tcp", addr)
@@ -286,12 +290,12 @@ func StartServer(addr string, conf config.Config) {
zConn := NewZConn(&conn) zConn := NewZConn(&conn)
log.Printf("conn start: %s [%d]\n", conn.RemoteAddr(), zConn.sn) log.Printf("conn start: %s [%d]\n", conn.RemoteAddr(), zConn.sn)
go Hub.handlerConn(zConn) go Bus.handlerConn(zConn)
} }
} }
// 连接处理 // 连接处理
func (s *ZSub) handlerConn(c *ZConn) { func (s *ZBus) handlerConn(c *ZConn) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println("handlerConn Recovered:", r) log.Println("handlerConn Recovered:", r)
@@ -301,7 +305,7 @@ func (s *ZSub) handlerConn(c *ZConn) {
defer func() { defer func() {
// conn remove to conns // conn remove to conns
funChan <- func() { funChan <- func() {
Hub.conns = c.removeTo(Hub.conns) Bus.conns = c.removeTo(Bus.conns)
} }
// close ZConn // close ZConn
@@ -310,7 +314,7 @@ func (s *ZSub) handlerConn(c *ZConn) {
// conn add to conns // conn add to conns
funChan <- func() { funChan <- func() {
Hub.conns = c.appendTo(Hub.conns) Bus.conns = c.appendTo(Bus.conns)
} }
reader := bufio.NewReader(*c.conn) reader := bufio.NewReader(*c.conn)
@@ -356,7 +360,7 @@ func (s *ZSub) handlerConn(c *ZConn) {
continue continue
} }
handleMessage(Message{Conn: c, Rcmd: rcmd}) messageHandler(Message{Conn: c, Rcmd: rcmd})
} }
} }
@@ -365,7 +369,7 @@ Publish topic message
1send message to topic's chan 1send message to topic's chan
2feedback send success to sender, and sending message to topic's subscripts 2feedback send success to sender, and sending message to topic's subscripts
*/ */
func (s *ZSub) Publish(topic, msg string) { func (s *ZBus) Publish(topic, msg string) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
ztopic := s.topics[topic] //ZTopic ztopic := s.topics[topic] //ZTopic
@@ -384,9 +388,9 @@ func (s *ZSub) Publish(topic, msg string) {
} }
/* /*
send broadcast message send Broadcast message
*/ */
func (s *ZSub) broadcast(topic, msg string) { func (s *ZBus) Broadcast(topic, msg string) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
if strings.EqualFold(topic, "lock") { if strings.EqualFold(topic, "lock") {
@@ -406,10 +410,11 @@ func (s *ZSub) broadcast(topic, msg string) {
} }
/* /*
lock: lock key uuid t lock: lock key uuid t
unlock: unlock key uuid tryLock: trylock key uuid t
unlock: unlock key uuid
*/ */
func (s *ZSub) _lock(lock *Lock) { func (s *ZBus) _lock(lock *Lock) {
locks := s.locks[lock.key] locks := s.locks[lock.key]
if locks == nil { if locks == nil {
locks = make([]*Lock, 0) locks = make([]*Lock, 0)
@@ -418,7 +423,7 @@ func (s *ZSub) _lock(lock *Lock) {
lock.start = time.Now().Unix() lock.start = time.Now().Unix()
locks = append(locks, lock) locks = append(locks, lock)
s.locks[lock.key] = locks s.locks[lock.key] = locks
s.broadcast("lock", lock.uuid) s.Broadcast("lock", lock.uuid)
// 设置时间到解锁 // 设置时间到解锁
locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second) locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second)
@@ -429,10 +434,15 @@ func (s *ZSub) _lock(lock *Lock) {
} }
}() }()
} else { } else {
s.locks[lock.key] = append(locks, lock) switch lock.cmd {
case "trylock": // send trylock fail message
s.Broadcast("trylock", lock.uuid)
case "lock":
s.locks[lock.key] = append(locks, lock)
}
} }
} }
func (s *ZSub) _unlock(l Lock) { func (s *ZBus) _unlock(l Lock) {
locks := s.locks[l.key] locks := s.locks[l.key]
if locks == nil || len(locks) == 0 { if locks == nil || len(locks) == 0 {
return return
@@ -443,7 +453,7 @@ func (s *ZSub) _unlock(l Lock) {
s.locks[l.key] = locks s.locks[l.key] = locks
} }
if len(s.locks[l.key]) > 0 { // next lock if len(s.locks[l.key]) > 0 { // next lock
s.broadcast("lock", s.locks[l.key][0].uuid) s.Broadcast("lock", s.locks[l.key][0].uuid)
s.locks[l.key][0].start = time.Now().Unix() s.locks[l.key][0].start = time.Now().Unix()
s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second) s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second)
go func() { go func() {
@@ -455,7 +465,7 @@ func (s *ZSub) _unlock(l Lock) {
} }
} }
func (s *ZSub) shutdown() { func (s *ZBus) shutdown() {
s.SaveData() s.SaveData()
os.Exit(0) os.Exit(0)
} }
@@ -463,7 +473,7 @@ func (s *ZSub) shutdown() {
func Info() map[string]interface{} { func Info() map[string]interface{} {
// topics // topics
topics := map[string]interface{}{} topics := map[string]interface{}{}
for s, topic := range Hub.topics { for s, topic := range Bus.topics {
// {groups:[{name:xxx,size:xx}]} // {groups:[{name:xxx,size:xx}]}
arr := make([]map[string]interface{}, 0) arr := make([]map[string]interface{}, 0)
@@ -480,7 +490,7 @@ func Info() map[string]interface{} {
// conns // conns
conns := make([]interface{}, 0) conns := make([]interface{}, 0)
for _, c := range Hub.conns { for _, c := range Bus.conns {
m := make(map[string]interface{}, 0) m := make(map[string]interface{}, 0)
m["remoteaddr"] = (*c.conn).RemoteAddr() m["remoteaddr"] = (*c.conn).RemoteAddr()
m["groupid"] = c.groupid m["groupid"] = c.groupid
@@ -493,14 +503,14 @@ func Info() map[string]interface{} {
info := map[string]interface{}{ info := map[string]interface{}{
"topics": topics, "topics": topics,
"topicsize": len(topics), "topicsize": len(topics),
"timersize": len(Hub.timers), "timersize": len(Bus.timers),
"conns": conns, "conns": conns,
"connsize": len(Hub.conns), "connsize": len(Bus.conns),
} }
return info return info
} }
func (s *ZSub) Clearup() { func (s *ZBus) Clearup() {
for tn, topic := range s.topics { for tn, topic := range s.topics {
for _, group := range topic.groups { for _, group := range topic.groups {
if len(group.conns) > 0 || topic.mcount > group.offset { if len(group.conns) > 0 || topic.mcount > group.offset {
@@ -513,7 +523,7 @@ func (s *ZSub) Clearup() {
} }
} }
func (s *ZSub) noSubscribe(topic string) bool { func (s *ZBus) noSubscribe(topic string) bool {
zTopic := s.topics[topic] zTopic := s.topics[topic]
if zTopic == nil || len(zTopic.groups) == 0 { if zTopic == nil || len(zTopic.groups) == 0 {
return true return true

View File

@@ -1,4 +1,4 @@
package zsub package zbus
import ( import (
"bufio" "bufio"
@@ -34,7 +34,7 @@ func Append(str string, fileName string) {
} }
} }
func (s *ZSub) SaveData() { func (s *ZBus) SaveData() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println("SaveData Recovered:", r) log.Println("SaveData Recovered:", r)
@@ -89,12 +89,12 @@ func (s *ZSub) SaveData() {
}() }()
} }
func (s *ZSub) LoadData() { func (s *ZBus) LoadData() {
s.loadDelay() s.loadDelay()
// s.loadLock() // s.loadLock()
} }
func (s *ZSub) loadDelay() { func (s *ZBus) loadDelay() {
f, err := os.Open(datadir + "/delay.z") f, err := os.Open(datadir + "/delay.z")
if err != nil { if err != nil {
return return
@@ -129,7 +129,7 @@ func (s *ZSub) loadDelay() {
} }
} }
func (s *ZSub) loadLock() { func (s *ZBus) loadLock() {
f, err := os.Open(datadir + "/lock.z") f, err := os.Open(datadir + "/lock.z")
if err != nil { if err != nil {
return return
@@ -161,6 +161,7 @@ func (s *ZSub) loadLock() {
} }
s._lock(&Lock{ s._lock(&Lock{
cmd: "lock",
key: split[0], key: split[0],
uuid: split[1], uuid: split[1],
duration: duration, duration: duration,

View File

@@ -1,4 +1,4 @@
package zsub package zbus
import ( import (
"log" "log"

View File

@@ -1,9 +1,10 @@
package zsub package zbus
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq" // 导入 pq 驱动
"github.com/robfig/cron" "github.com/robfig/cron"
"log" "log"
"regexp" "regexp"
@@ -29,7 +30,7 @@ type ZDelay struct {
} }
// Delay : delay topic value 100 -> publish topic value // Delay : delay topic value 100 -> publish topic value
func (s *ZSub) Delay(rcmd []string) { func (s *ZBus) Delay(rcmd []string) {
s.Lock() s.Lock()
defer func() { defer func() {
s.Unlock() s.Unlock()
@@ -70,7 +71,7 @@ func (s *ZSub) Delay(rcmd []string) {
select { select {
case <-delay.Timer.C: case <-delay.Timer.C:
log.Println("delay send:", rcmd[1], rcmd[2]) log.Println("delay send:", rcmd[1], rcmd[2])
Hub.Publish(rcmd[1], rcmd[2]) Bus.Publish(rcmd[1], rcmd[2])
funChan <- func() { funChan <- func() {
delete(s.delays, rcmd[1]+"-"+rcmd[2]) delete(s.delays, rcmd[1]+"-"+rcmd[2])
} }
@@ -82,7 +83,7 @@ func (s *ZSub) Delay(rcmd []string) {
/* /*
["Timer", Topic, expr, 0|1] ["Timer", Topic, expr, 0|1]
*/ */
func (s *ZSub) timer(rcmd []string, c *ZConn) { func (s *ZBus) timer(rcmd []string, c *ZConn) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
timer := s.timers[rcmd[1]] timer := s.timers[rcmd[1]]
@@ -108,8 +109,8 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
var timerFun = func() { var timerFun = func() {
for _, conn := range timer.Conns { for _, conn := range timer.Conns {
log.Println("Timer send:", timer.Topic) log.Println("timer send:", timer.Topic)
err := conn.send("Timer", timer.Topic) err := conn.send("timer", timer.Topic)
if timer.Single && err == nil { if timer.Single && err == nil {
break break
} }
@@ -155,20 +156,45 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
} }
} }
func (s *ZSub) ReloadTimer() { func (s *ZBus) ReloadTimer() {
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", // 未配置 ztimer 数据库返回
Conf.Ztimer.Db.User, if Conf.Ztimer.Db.Addr == "" {
Conf.Ztimer.Db.Password, log.Println("No found ztimer config in app.ini")
Conf.Ztimer.Db.Addr, return
Conf.Ztimer.Db.Database, }
))
var db *sql.DB
var err error
if Conf.Ztimer.Db.Type == "postgres" {
hostPort := strings.Split(Conf.Ztimer.Db.Addr, ":")
db, err = sql.Open("postgres", fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=disable",
Conf.Ztimer.Db.User,
Conf.Ztimer.Db.Password,
hostPort[0],
hostPort[1],
Conf.Ztimer.Db.Database,
))
// 设置当前会话的 schema
_, err = db.Exec("SET search_path TO " + Conf.Ztimer.Db.Schema)
if err != nil {
log.Println(err)
}
} else {
db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
Conf.Ztimer.Db.User,
Conf.Ztimer.Db.Password,
Conf.Ztimer.Db.Addr,
Conf.Ztimer.Db.Database,
))
}
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
defer db.Close() defer db.Close()
rows, err := db.Query("SELECT t.`name`, IF(t.`status`=10,t.`expr`,''), IF(t.`single`=1,'a','x') 'single' FROM tasktimer t ORDER BY t.`timerid`") rows, err := db.Query("SELECT t.`name`, IF(t.`status`=10,t.`expr`,''), IF(t.`single`=1,'a','x') single FROM tasktimer t ORDER BY t.`timerid`")
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
@@ -179,6 +205,6 @@ func (s *ZSub) ReloadTimer() {
var expr string var expr string
var single string var single string
rows.Scan(&name, &expr, &single) rows.Scan(&name, &expr, &single)
s.timer([]string{"Timer", name, expr, single}, nil) //["Timer", Topic, expr, a|x] s.timer([]string{"timer", name, expr, single}, nil) //["timer", Topic, expr, a|x]
} }
} }

View File

@@ -1,4 +1,4 @@
package zsub package zbus
import ( import (
"fmt" "fmt"

51
main.go
View File

@@ -2,23 +2,39 @@ package main
import ( import (
"flag" "flag"
"fmt"
"log" "log"
"zhub/cmd" "os"
"zhub/internal/config"
"zhub/internal/monitor" "gitea.1216.top/lxy/zhub/cmd"
"zhub/internal/zsub" "gitea.1216.top/lxy/zhub/internal/config"
"gitea.1216.top/lxy/zhub/internal/monitor"
"gitea.1216.top/lxy/zhub/internal/zbus"
) )
func main() { func main() {
var isCliMode bool // 是否以客户端模式运行的标志 // 命令查询版本号
var rcmd string // 客户端模式下运行的命令 versionFlag := flag.Bool("version", false, "Display the version")
flag.BoolVar(&isCliMode, "cli", false, "run as client mode") // 定义 cli 参数 vFlag := flag.Bool("v", false, "Display the version")
flag.StringVar(&rcmd, "r", "", "run as client mode") // 定义 r 参数 VFlag := flag.Bool("V", false, "Display the version")
flag.Parse() // 解析命令行参数
isCliMode := flag.Bool("cli", false, "Run as client mode") // 客户端模式参数
rcmd := flag.String("r", "", "Run command in client mode") // 客户端命令参数
// 解析命令行参数
flag.Parse()
// 检查是否有版本参数, 如果有则输出版本号并退出
if *versionFlag || *vFlag || *VFlag {
fmt.Printf("Version: %s\n", monitor.Version)
os.Exit(0) // 输出后退出
}
conf := config.ReadConfig() // 读取配置文件 conf := config.ReadConfig() // 读取配置文件
addr := conf.Service.Addr // 获取服务地址 addr := conf.Service.Addr // 获取服务地址
config.InitLog(conf.Log) // 初始化日志配置 config.InitLog(conf.Log) // 初始化日志配置
// 输出版本号
log.Println("ZHub version:", monitor.Version)
{ {
/* /*
@@ -29,20 +45,23 @@ func main() {
}*/ }*/
} }
if rcmd != "" { // 如果指定了客户端命令 if *rcmd != "" { // 如果指定了客户端命令
adminToken, err := zsub.AuthManager.AdminToken() // 认证信息 adminToken, err := zbus.AuthManager.AdminToken() // 认证信息
if err != nil { if err != nil {
log.Fatal(err) // Configuration error, stop the client from running. log.Fatal(err) // Configuration error, stop the client from running.
return return
} }
cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接 cli := cmd.ZHubClient{}
err = cli.Initx("server-local", addr, "server-admin", adminToken)
// cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接
if err != nil { if err != nil {
log.Println(err) // 如果连接失败则打印错误信息 log.Println(err) // 如果连接失败则打印错误信息
return return
} }
defer cli.Close() // 延迟关闭客户端连接 defer cli.Close() // 延迟关闭客户端连接
switch rcmd { switch *rcmd {
case "timer": case "timer":
cli.Cmd("reload-timer") cli.Cmd("reload-timer")
case "shutdown", "stop": case "shutdown", "stop":
@@ -50,10 +69,10 @@ func main() {
} }
return return
} }
if isCliMode { if *isCliMode {
cmd.ClientRun(addr) // 客户端运行 //cmd.ClientRun(addr) // 客户端运行
} else { } else {
go monitor.StartWatch() // 启动监控协程 go monitor.StartWatch() // 启动监控协程
zsub.StartServer(addr, conf) // 启动服务进程 zbus.StartServer(addr, conf) // 启动服务进程
} }
} }

View File

@@ -1,6 +0,0 @@
SET GOOS=linux
SET GOARCH=amd64
go build -o zhub.sh -ldflags "-s -w"
upx -9 zhub.sh
rem scp zhub.sh dev:/opt/zhub