From 44d4fcdbc4915b3b12dca197c4c3fcc967632b18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=9D=E5=B0=98?= <237809796@qq.com> Date: Mon, 6 Oct 2025 00:46:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=EF=BC=9A1=E3=80=81=E4=BE=9D?= =?UTF-8?q?=E8=B5=96=E5=8D=87=E7=BA=A7=20=20=20=20=20=202=E3=80=81module?= =?UTF-8?q?=20=E5=90=8D=E7=A7=B0=E5=9C=B0=E5=9D=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 33 +--- README.md | 264 +++++++++++++++++++------- build.bat | 99 +++++++++- cmd/client.go | 131 +------------ go.mod | 83 ++++---- internal/auth/auth.go | 5 +- internal/config/config.go | 60 +++--- internal/monitor/monitor.go | 23 ++- internal/zbus/zbus-message-handler.go | 5 +- internal/zbus/zbus.go | 13 +- main.go | 11 +- 11 files changed, 402 insertions(+), 325 deletions(-) diff --git a/Dockerfile b/Dockerfile index c6eec69..f7c3397 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,38 +1,11 @@ -# 阶段一:构建阶段 -FROM golang:alpine AS builder - -WORKDIR /opt/zhub - -# 复制 go.mod 文件 -COPY go.mod . - -# 下载并缓存依赖包 -RUN go mod download - -# 将源代码复制到容器中 -COPY . . - -# 运行 go get 命令以确保所有需要的包都被获取和更新 -#RUN go get zhub/internal/monitor -#RUN go get zhub/cmd -#RUN go get zhub/internal/zsub -#RUN go get zhub/internal/config - -# 构建可执行文件 -RUN go build -o zhub.sh -ldflags "-s -w" - - -# 阶段二:运行阶段 +# 构建运行阶段 FROM alpine:latest # 设置工作目录 WORKDIR /opt/zhub -# 从构建阶段复制可执行文件到当前阶段 -COPY --from=builder /opt/zhub/zhub.sh . -COPY --from=builder /opt/zhub/app.ini . - -# 复制 app.ini 配置文件到容器中 +# 从构建阶段复制可执行文件和配置到当前阶段 +COPY zhub.sh . COPY app.ini . EXPOSE 711 1216 diff --git a/README.md b/README.md index 4e898bd..b18ecf4 100644 --- a/README.md +++ b/README.md @@ -1,85 +1,203 @@ -# ZHub 快速上手 +# ZHub 工程项目文档 -[ZHub 快速上手:https://docs.1216.top](https://docs.1216.top) - -## 概述 -> zhub是⼀个⾼性能事件发布订阅服务组件,功能丰富,包含发布-订阅、⼴播消息、延时消息、 -Rpc调⽤、分布式定时调度、分布式锁,运⾏包仅有1M+;低服务资源消费,初始启动内存 10M-。 - -![zhub-fun.png](https://img.1216.top/docs/zhub/zhub-fun.png) +## 1. 概述 +ZHub 是一个基于 Go 的高性能分布式消息中间件,支持发布/订阅模式、定时任务、延时消息、分布式锁等功能。它适用于需要高并发、低延迟的消息传递场景。 --- -## 开始 搭建 zhub 服务 -> 让我们在 **5到10分钟内完成 zhub 中间件安装、集成、测试**. - -### 下载软件包 - -- [zhub.zip (点击下载)](https://img.1216.top/docs/zhub/zhub.zip) 包含以下内容: - - `zhub-client-0.1.1.dev.jar` 常规Java 项目驱动包 - - `zhub-client-spring-0.1.1.jar` springboot 项目驱动包 - - `zhub.exe` Window 运行包 - - `zhub.sh` Linux 运行包 - - `zhub` Mac 运行包 - - `app.ini` 配置文件 - ![zhub-zip.png](https://img.1216.top/docs/zhub/dist-zip.png) - -### 配置 app.ini - -```bash -# app.ini -[service] -watch=0.0.0.0:711 # 服务管理端口 -addr=0.0.0.0:1216 # 服务端口 -auth=0 # 是否开启连接授权 0不开启、1开启 - -[data] -dir=./data # 数据目录 - -[log] -handlers=console # console|file -level=debug # info|debug|error -file=zhub.log - -[ztimer] # ztimer 配置 (可选,如果不使用定时调度则可不配置) -# db.addr=127.0.0.1:3306 # timer 使用的MySql数据库配置 -# db.user=root -# db.password=123456 -# db.database=zhub -``` +## 2. 技术栈 +- **语言**: Go (Golang) +- **网络库**: `net`, `bufio` +- **并发模型**: 使用 goroutines 和 channels 实现并发处理。 +- **持久化**: 使用文件存储(如延时任务和锁信息)。 +- **配置管理**: 使用 Viper 读取 INI 格式的配置文件。 +- **权限控制**: 基于 YAML 的用户、组、Token 和频道权限管理。 +- **监控界面**: 使用 Gin 提供 HTTP 接口用于监控和服务管理。 +- **数据库支持**: 支持 MySQL 和 PostgreSQL 用于定时任务的配置。 --- -### 初始化 ztimer 数据库 (可选,如果不使用定时调度则不需要配置) +## 3. 核心功能模块 -```sql -CREATE DATABASE zhub; -CREATE TABLE `zhub`.`tasktimer` ( - `timerid` varchar(64) NOT NULL DEFAULT '' COMMENT '[主键]UUID', - `name` varchar(32) NOT NULL DEFAULT '' COMMENT '[任务名称]', - `expr` varchar(32) NOT NULL DEFAULT '' COMMENT '[时间表达式]', - `single` int NOT NULL DEFAULT '1' COMMENT '[单实例消费]1单对象,0不限', - `remark` varchar(128) NOT NULL DEFAULT '' COMMENT '[备注]', - `status` smallint NOT NULL DEFAULT '10' COMMENT '[状态]10启用,60停用', - PRIMARY KEY (`timerid`) USING BTREE -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -# 初始化 四个定时任务配置, 订阅名称分别为 T:A、T:B、T:C、T:D -INSERT INTO `zhub`.`tasktimer` (`timerid`, `name`, `expr`, `single`, `remark`, `status`) VALUES - ('T1', 'T:A', '*/5 * * * * ?', 1, '每5秒执行一次', 10), - ('T2', 'T:B', '15s', 1, '每15秒执行一次', 10), - ('T3', 'T:C', '0 0 0 * * 1', 0, '每周一00:00执行', 10), - ('T4', 'T:D', '0 0 24 * * ?', 1, '每天00:00执行', 10); -``` +### 3.1 消息总线 (`zbus`) +- **主题管理**: 支持多主题订阅与发布。 +- **组订阅**: 每个主题可以有多个消费组,组内客户端共享消息。 +- **延时消息**: 支持延时发送消息,使用 `time.Timer` 实现。 +- **定时任务**: 支持 cron 表达式或固定时间间隔的任务调度。 +- **分布式锁**: 提供互斥锁机制,支持重入和自动释放。 +- **持久化**: 使用文件保存延时任务和锁的状态,确保服务重启后数据不丢失。 -### 启动服务 +### 3.2 权限管理 ([auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37)) +- **用户认证**: 支持基于 Token 的认证。 +- **角色授权**: 用户分属不同组,组内定义读写权限。 +- **频道访问控制**: 频道可设置为公开或私有,私有频道需授权访问。 +- **配置热加载**: 支持运行时重新加载权限配置。 + +### 3.3 客户端 (`client`) +- **连接管理**: 支持自动重连机制。 +- **订阅与发布**: 提供 API 订阅主题并接收消息。 +- **RPC 调用**: 支持远程过程调用,并处理超时和返回结果。 +- **锁机制**: 支持获取和释放分布式锁。 + +### 3.4 监控 (`monitor`) +- **HTTP 接口**: 提供 `/_/info`, `/_/cleanup`, `/timer/reload` 等接口。 +- **可视化界面**: 提供简单的 HTML 页面展示系统状态。 +- **版本信息**: 显示当前运行的 ZHub 版本。 -```bash -# window -./zhub.exe -# linux (添加执行权限 chmod +x ./zhub.sh) -./zhub.sh -``` --- -### 使用 zhub 收发消息 - [使用 zhub 收发消息 https://docs.1216.top](https://docs.1216.top) +## 4. 架构设计 + +### 4.1 总体架构 +ZHub 采用经典的 C/S 架构,包含以下主要组件: +- **服务端 ([main.go](file://D:\wk-go\zhub\main.go))** + - 启动 TCP 服务器监听客户端连接。 + - 加载配置文件并初始化日志。 + - 启动监控服务(Gin)。 +- **客户端 ([client.go](file://D:\wk-go\zhub\cmd\client.go))** + - 提供连接、订阅、发布、RPC 调用等 API。 + - 自动重连机制保证连接稳定性。 +- **消息总线 (`zbus`)** + - 处理消息的发布、订阅、延时、定时等核心逻辑。 +- **权限管理 ([auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37))** + - 管理用户、组、Token 和频道权限。 +- **监控 (`monitor`)** + - 提供 HTTP 接口和 Web 界面用于管理和监控。 + +### 4.2 数据流图 +```plaintext ++-------------------+ +------------------+ +------------------+ +| Client (Go) |<--->| ZHub Server |<--->| Monitor (Gin) | ++-------------------+ +------------------+ +------------------+ + | + v + +------------------+ + | Persistence | + | (File, DB) | + +------------------+ +``` + + +--- + +## 5. 配置说明 + +### 5.1 配置文件 ([app.ini](file://D:\wk-go\zhub\app.ini)) +- **[service]**: 服务相关配置。 + - `watch`: 监控服务地址。 + - [addr](file://D:\wk-go\zhub\cmd\client.go#L25-L25): 主服务监听地址。 + - [auth](file://D:\wk-go\zhub\cmd\client.go#L37-L37): 是否启用认证(0 不启用,1 启用)。 +- **[data]**: 数据目录配置。 +- **[log]**: 日志配置。 + - `handlers`: 输出方式(console/file)。 + - `level`: 日志级别(info/debug/error)。 + - [file](file://D:\wk-go\zhub\Dockerfile): 日志文件路径。 +- **[ztimer]**: 定时任务数据库配置。 + - `db.addr`, `db.user`, `db.password`, `db.database`, `db.schema`, `db.type`. + +### 5.2 权限配置 ([auth.yml](file://D:\wk-go\zhub\auth.yml)) +- **users**: 用户列表,包含 ID、用户名、密码、状态、所属组、读写权限。 +- **groups**: 用户组列表,包含名称、描述、读写权限。 +- **tokens**: Token 列表,包含 ID、用户 ID、Token 值、过期时间和状态。 +- **channels**: 频道列表,包含名称、描述、是否为公开频道。 + +--- + +## 6. 部署与构建 + +### 6.1 构建脚本 ([build.bat](file://D:\wk-go\zhub\build.bat)) +- 支持跨平台编译(Linux、Windows、Mac)。 +- 使用 UPX 压缩生成的二进制文件。 +- 自动生成版本号并嵌入到程序中。 + +### 6.2 启动命令 +- **服务端启动**: + ```bash + go run main.go + ``` + +- **客户端启动**: + ```bash + go run cmd/client.go -cli -r + ``` + + +### 6.3 监控服务 +- 默认监听地址:`http://:/` +- 支持的接口: + - `/_/info`: 获取系统信息。 + - `/_/cleanup`: 清理无订阅的主题。 + - `/timer/reload`: 重新加载定时任务。 + - `/topic/publish`: 发布主题消息。 + - `/topic/delay`: 发送延时消息。 + - `/auth/reload`: 重新加载权限配置。 + +--- + +## 7. 安全性设计 + +### 7.1 认证机制 +- 所有客户端必须通过 Token 认证才能进行操作。 +- Token 具有过期时间和状态管理。 + +### 7.2 授权机制 +- 用户分属不同组,组内定义读写权限。 +- 频道可设置为公开或私有,私有频道需授权访问。 +- 支持正则表达式匹配主题名,实现细粒度的权限控制。 + +### 7.3 分布式锁 +- 使用互斥锁机制防止多个客户端同时执行关键操作。 +- 锁具有超时机制,避免死锁。 + +--- + +## 8. 性能优化 + +### 8.1 并发处理 +- 使用 goroutines 和 channels 实现高效的并发处理。 +- 所有 I/O 操作异步化,减少阻塞。 + +### 8.2 内存管理 +- 使用缓冲区池化技术减少内存分配。 +- 对频繁使用的结构体进行复用。 + +### 8.3 日志优化 +- 支持日志级别控制,减少不必要的日志输出。 +- 日志输出格式化,便于调试和分析。 + +--- + +## 9. 测试与验证 + +### 9.1 单元测试 +- 覆盖所有核心模块的功能测试。 +- 使用 Go 的 testing 包编写测试用例。 + +### 9.2 集成测试 +- 模拟多个客户端并发操作,验证系统的稳定性和一致性。 +- 测试定时任务、延时消息、锁机制等复杂场景。 + +### 9.3 压力测试 +- 使用基准测试工具评估系统的吞吐量和响应时间。 +- 优化瓶颈部分,提升整体性能。 + +--- + +## 10. 未来规划 + +### 10.1 功能增强 +- 支持更多类型的数据库(如 MongoDB、Redis)。 +- 增加消息持久化支持(如 Kafka、RabbitMQ)。 +- 提供更丰富的监控指标和告警机制。 + +### 10.2 性能优化 +- 引入零拷贝技术减少内存复制。 +- 进一步优化 goroutine 调度策略。 + +### 10.3 安全加固 +- 增加 TLS 加密通信支持。 +- 支持更复杂的权限策略(如 RBAC、ABAC)。 + +### 10.4 用户体验 +- 开发图形化管理界面,简化配置和监控操作。 +- 提供 SDK 支持多种编程语言(如 Python、Java)。 \ No newline at end of file diff --git a/build.bat b/build.bat index 26cc216..6335114 100644 --- a/build.bat +++ b/build.bat @@ -1,5 +1,6 @@ - @echo off +chcp 65001 >nul +title zhub 构建脚本 rem 获取当前日期和时间并格式化为 YYYY.MM.DD-HH.MM.SS for /f "tokens=2 delims==" %%i in ('wmic os get localdatetime /value') do set datetime=%%i @@ -11,27 +12,107 @@ set minute=%datetime:~10,2% set second=%datetime:~12,2% set version=%year%.%month%.%day%-%hour%.%minute%.%second% -rem 删除历史编译文件 -del zhub.sh zhub.exe zhub +rem 输出当前版本号 +echo 当前构建版本号:%version% -rem Linux +rem 删除旧文件 +echo 删除历史构建文件... +del /q zhub zhub.sh zhub.exe >nul 2>nul + +rem 交互式平台选择 +echo ============================== +echo 请选择要构建的平台: +echo ============================== +echo 1. Linux (amd64) +echo 2. Linux (arm64) +echo 3. Windows (amd64) +echo 4. MacOS (amd64) +echo 5. 全部构建 +echo 0. 退出 +echo ============================== +set /p choice=请输入选项编号(可多选,例如 135): + +if "%choice%"=="0" goto end + +if not "%choice%"=="" ( + echo 开始构建版本 %version%... +) + +if "%choice%"=="1" ( + call :build_linux +) +if "%choice%"=="2" ( + call :build_linux_arm +) +if "%choice%"=="3" ( + call :build_windows +) +if "%choice%"=="4" ( + call :build_mac +) +if "%choice%"=="5" ( + call :build_linux + call :build_linux_arm + call :build_windows + call :build_mac +) + +goto end + +rem ========= 构建平台函数 ========== + +:build_linux +echo [Linux amd64] 开始构建... set GOOS=linux set GOARCH=amd64 go build -o zhub.sh -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'" upx -9 zhub.sh +call :move_to_dist zhub.sh linux +goto :eof -rem Windows +:build_linux_arm +echo [Linux arm64] 开始构建... +set GOOS=linux +set GOARCH=arm64 +go build -o zhub.sh -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'" +upx -9 zhub.sh +call :move_to_dist zhub.sh linux-arm +goto :eof + +:build_windows +echo [Windows amd64] 开始构建... set GOOS=windows set GOARCH=amd64 go build -o zhub.exe -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'" upx -9 zhub.exe +call :move_to_dist zhub.exe windows +goto :eof -rem Mac +:build_mac +echo [MacOS amd64] 开始构建... set GOOS=darwin set GOARCH=amd64 go build -o zhub -ldflags "-s -w -X 'zhub/internal/monitor.Version=%version%'" upx -9 zhub +call :move_to_dist zhub mac +goto :eof -move /Y zhub.sh ./tmp/zhub/ -move /Y zhub.exe ./tmp/zhub/ -move /Y zhub ./tmp/zhub/ +rem ========= 移动构建文件 ========== + +:move_to_dist +setlocal +set file=%1 +set platform=%2 +set target=dist\%version%\%platform% +if not exist %target% ( + mkdir %target% +) +move /Y %file% %target%\ >nul +echo 构建产物已移动至:%target%\%file% +endlocal +goto :eof + +:end +echo 构建流程结束。 +pause +exit diff --git a/cmd/client.go b/cmd/client.go index f2456f4..a728743 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -3,19 +3,17 @@ package cmd import ( "bufio" "encoding/json" - "fmt" - "github.com/go-basic/uuid" "io" "unicode/utf8" - //"github.com/go-basic/uuid" "log" "net" - "os" "strconv" "strings" "sync" "time" + + "github.com/go-basic/uuid" ) type ZHubClient struct { @@ -289,7 +287,11 @@ func (r Rpc) backTopic() string { return strings.Split(r.Ruk, "::")[0] } -func (c ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) { +func (c *ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) { + c.RpcWithTimeout(topic, message, time.Second*15, back) +} + +func (c *ZHubClient) RpcWithTimeout(topic string, message string, timeout time.Duration, back func(res RpcResult)) { rpc := Rpc{ Ruk: c.appname + "::" + uuid.New(), Topic: topic, @@ -310,7 +312,7 @@ func (c ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) select { case <-rpc.Ch: // ch 事件(rpc 返回) - case <-time.After(time.Second * 15): + case <-time.After(timeout): // rpc 超时 x, _ := json.Marshal(rpc) log.Println("rpc timeout:", x) @@ -323,7 +325,7 @@ func (c ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) } // RpcSubscribe rpc subscribe -func (c ZHubClient) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) { +func (c *ZHubClient) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) { c.Subscribe(topic, func(v string) { rpc := Rpc{} err := json.Unmarshal([]byte(v), &rpc) @@ -449,118 +451,3 @@ func (c *ZHubClient) receive() { } } } - -// -------------------------------------- hm -------------------------------------- - -// ============================================================================== - -var reconnect = 0 - -// ClientRun client 命令行程序 -func ClientRun(addr string) { - conn, err := net.Dial("tcp", fmt.Sprintf("%s", addr)) - - for { - if err != nil { - log.Println(err) - time.Sleep(time.Second * 3) - conn, err = net.Dial("tcp", fmt.Sprintf("%s", addr)) - continue - } - - fmt.Println(fmt.Sprintf("had connected server: %s", addr)) - break - } - - defer func() { - if reconnect == 1 { - conn.Close() - ClientRun(addr) - } - }() - - go clientRead(conn) - - for { - inReader := bufio.NewReader(os.Stdin) - line, err := inReader.ReadString('\n') - if err != nil { - fmt.Println(err) - return - } else if reconnect == 1 { - return - } - - line = strings.Trim(line, "\r\n") - line = strings.Trim(line, "\n") - line = strings.Trim(line, " ") - - if strings.EqualFold(line, "") { - continue - } else if strings.EqualFold(line, ":exit") { - fmt.Println("exit!") - return - } - - //fmt.Println("发送数据:" + line) - - line = strings.ReplaceAll(line, " ", "") - parr := strings.Split(line, " ") - conn.Write([]byte("*" + strconv.Itoa(len(parr)) + "\r\n")) - for i := range parr { - conn.Write([]byte("$" + strconv.Itoa(len(parr[i])) + "\r\n")) - conn.Write([]byte(parr[i] + "\r\n")) - } - } -} - -func clientRead(conn net.Conn) { - defer func() { - if r := recover(); r != nil { - fmt.Println("Recovered:", r) - } - reconnect = 1 - }() - - reader := bufio.NewReader(conn) - for { - rcmd := make([]string, 0) - line, _, err := reader.ReadLine() - if err != nil { - log.Println("connection error: ", err) - return - } else if len(line) == 0 { - continue - } - - switch string(line[:1]) { - case "*": - n, _ := strconv.Atoi(string(line[1:])) - for i := 0; i < n; i++ { - reader.ReadLine() - v, _, _ := reader.ReadLine() - rcmd = append(rcmd, string(v)) - } - case "+": - rcmd = append(rcmd, string(line)) - case "-": - rcmd = append(rcmd, string(line)) - case ":": - rcmd = append(rcmd, string(line)) - case "h": - if strings.EqualFold(string(line), "help-start") { - for { - v, _, _ := reader.ReadLine() - if strings.EqualFold(string(v), "help-end") { - break - } - rcmd = append(rcmd, string(v)+"\r\n") - } - } - default: - rcmd = append(rcmd, string(line)) - } - - fmt.Println(">", strings.Join(rcmd, " ")) - } -} diff --git a/go.mod b/go.mod index 0ce074a..913eb94 100644 --- a/go.mod +++ b/go.mod @@ -1,66 +1,63 @@ -module zhub +module gitea.1216.top/lxy/zhub -go 1.22.2 +go 1.25.0 require ( - github.com/gin-gonic/gin v1.9.1 + github.com/gin-gonic/gin v1.11.0 github.com/go-basic/uuid v1.0.0 - github.com/go-sql-driver/mysql v1.8.1 + github.com/go-sql-driver/mysql v1.9.3 + github.com/lib/pq v1.10.9 github.com/robfig/cron v1.2.0 - github.com/spf13/viper v1.18.2 + github.com/spf13/viper v1.21.0 + gopkg.in/ini.v1 v1.67.0 ) require ( filippo.io/edwards25519 v1.1.0 // indirect - github.com/bytedance/sonic/loader v0.1.1 // indirect - github.com/cloudwego/base64x v0.1.3 // indirect - github.com/cloudwego/iasm v0.2.0 // indirect - github.com/sagikazarmark/locafero v0.4.0 // indirect - github.com/sagikazarmark/slog-shim v0.1.0 // indirect - github.com/sourcegraph/conc v0.3.0 // indirect - go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic/loader v0.3.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/goccy/go-yaml v1.18.0 // indirect + github.com/quic-go/qpack v0.5.1 // indirect + github.com/quic-go/quic-go v0.55.0 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/sagikazarmark/locafero v0.12.0 // indirect + go.uber.org/mock v0.6.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/mod v0.28.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/tools v0.37.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) require ( - github.com/bytedance/sonic v1.11.5 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.3 // indirect - github.com/gin-contrib/sse v0.1.0 // indirect + github.com/bytedance/sonic v1.14.1 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.10 // indirect + github.com/gin-contrib/sse v1.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.19.0 // indirect - github.com/goccy/go-json v0.10.2 // indirect - github.com/google/go-cmp v0.6.0 // indirect - github.com/hashicorp/hcl v1.0.0 // indirect + github.com/go-playground/validator/v10 v10.27.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pelletier/go-toml/v2 v2.2.1 // indirect - github.com/spf13/afero v1.11.0 // indirect - github.com/spf13/cast v1.6.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/pflag v1.0.10 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/ugorji/go/codec v1.2.12 // indirect - golang.org/x/arch v0.7.0 // indirect - golang.org/x/crypto v0.22.0 // indirect - golang.org/x/net v0.24.0 // indirect; indirect· - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect - gopkg.in/ini.v1 v1.67.0 // indirect + github.com/ugorji/go/codec v1.3.0 // indirect + golang.org/x/arch v0.21.0 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/net v0.44.0 // indirect; indirect· + golang.org/x/sys v0.36.0 // indirect + golang.org/x/text v0.29.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect gopkg.in/yaml.v3 v3.0.1 ) - -replace ( - zhub/cmd => ./zhub/cmd - zhub/internal/config => ./zhub/internal/config - zhub/internal/monitor => ./zhub/internal/monitor - zhub/internal/zbus => ./zhub/internal/zbus -) diff --git a/internal/auth/auth.go b/internal/auth/auth.go index a8b5896..c6bda28 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -2,13 +2,14 @@ package auth import ( "fmt" - "gopkg.in/yaml.v3" "os" "regexp" "strings" "sync" "time" - "zhub/internal/config" + + "gitea.1216.top/lxy/zhub/internal/config" + "gopkg.in/yaml.v3" ) type User struct { diff --git a/internal/config/config.go b/internal/config/config.go index 1fd469b..8259470 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,40 +1,40 @@ package config import ( - "github.com/spf13/viper" "log" - "net/url" "os" - "strings" + + "github.com/spf13/viper" + "gopkg.in/ini.v1" ) type Log struct { - Handlers string - Level string - File string + Handlers string `ini:"handlers"` + Level string `ini:"level"` + File string `ini:"file"` } type Config struct { - Log Log + Log Log `ini:"log"` Service struct { - Watch string - Addr string - Auth bool - } + Watch string `ini:"watch"` + Addr string `ini:"addr"` + Auth bool `ini:"auth"` + } `ini:"service"` Data struct { - Dir string - } + Dir string `ini:"dir"` + } `ini:"data"` Ztimer struct { Db struct { - Addr string - User string - Password string - Database string - Schema string - Type string - } - } - Auth map[string]string + Addr string `ini:"addr"` + User string `ini:"user"` + Password string `ini:"password"` + Database string `ini:"database"` + Schema string `ini:"schema"` + Type string `ini:"type"` + } `ini:"db"` + } `ini:"ztimer"` + Auth map[string]string `ini:"auth"` } func ReadConfig() Config { @@ -61,16 +61,16 @@ func ReadConfig() Config { }*/ // 尝试从 /etc/ 目录下查找 zhub.ini 配置文件 - viper.AddConfigPath("/etc/") // 添加 /etc/ 目录作为配置文件搜索路径 + /*viper.AddConfigPath("/etc/") // 添加 /etc/ 目录作为配置文件搜索路径 viper.SetConfigName("zhub") // 指定配置文件名为 zhub if err := viper.ReadInConfig(); err == nil { if err := viper.Unmarshal(&conf); err != nil { log.Fatalf("Failed to unmarshal config: %s", err.Error()) } return conf - } + }*/ // 如果 /etc/ 目录下未找到配置文件,则尝试从当前程序运行目录下查找 app.ini 配置文件 - dir, err := os.Getwd() // 获取程序运行目录 + /*dir, err := os.Getwd() // 获取程序运行目录 if err != nil { log.Fatalf("Failed to get current directory: %s", err.Error()) } @@ -90,9 +90,15 @@ func ReadConfig() Config { } return conf + } else { + log.Fatalf("Config file not found: " + err.Error()) + }*/ + + load, err := ini.Load("app.ini") + if err != nil { + log.Panicf("Failed to load config: %s", err.Error()) } - // 如果在 /etc/ 目录和当前程序所在目录下均未找到配置文件,则报错 - log.Fatalf("Config file not found") + load.MapTo(&conf) return conf } func InitLog(logConfig Log) { diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 172a270..6f110bf 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -1,9 +1,11 @@ package monitor import ( - "github.com/gin-gonic/gin" + "log" "net/http" - "zhub/internal/zbus" + + "gitea.1216.top/lxy/zhub/internal/zbus" + "github.com/gin-gonic/gin" ) var r = gin.Default() @@ -39,11 +41,20 @@ func StartWatch() { zbus.Bus.ReloadTimer() c.JSON(http.StatusOK, "+reload timer ok") }) - r.GET("/topic/publish", func(c *gin.Context) { - topic := c.Query("topic") - value := c.Query("value") + r.POST("/message/send", func(c *gin.Context) { + topic := c.PostForm("name") // c.Query("topic") + value := c.PostForm("value") // c.Query("value") + // _type := c.PostForm("type") // publish、broadcast - zbus.Bus.Publish(topic, value) + log.Println(c.PostForm("type"), topic, value) + switch c.PostForm("type") { + case "broadcast": + zbus.Bus.Broadcast(topic, value) + case "publish": + zbus.Bus.Publish(topic, value) + default: + + } c.JSON(http.StatusOK, "+OK") }) r.GET("/topic/delay", func(c *gin.Context) { diff --git a/internal/zbus/zbus-message-handler.go b/internal/zbus/zbus-message-handler.go index 18fb1c6..07df23c 100644 --- a/internal/zbus/zbus-message-handler.go +++ b/internal/zbus/zbus-message-handler.go @@ -6,7 +6,8 @@ import ( "strconv" "strings" "time" - "zhub/internal/auth" + + "gitea.1216.top/lxy/zhub/internal/auth" ) var AuthManager *auth.PermissionManager @@ -156,7 +157,7 @@ func messageHandler(v Message) { } return case "broadcast": - Bus.broadcast(rcmd[1], rcmd[2]) + Bus.Broadcast(rcmd[1], rcmd[2]) case "delay": Bus.Delay(rcmd) default: diff --git a/internal/zbus/zbus.go b/internal/zbus/zbus.go index 30064df..7a81ce3 100644 --- a/internal/zbus/zbus.go +++ b/internal/zbus/zbus.go @@ -12,7 +12,8 @@ import ( "sync/atomic" "time" "unicode/utf8" - "zhub/internal/config" + + "gitea.1216.top/lxy/zhub/internal/config" ) var ( @@ -387,9 +388,9 @@ func (s *ZBus) Publish(topic, msg string) { } /* -send broadcast message +send Broadcast message */ -func (s *ZBus) broadcast(topic, msg string) { +func (s *ZBus) Broadcast(topic, msg string) { s.RLock() defer s.RUnlock() if strings.EqualFold(topic, "lock") { @@ -422,7 +423,7 @@ func (s *ZBus) _lock(lock *Lock) { lock.start = time.Now().Unix() locks = append(locks, lock) s.locks[lock.key] = locks - s.broadcast("lock", lock.uuid) + s.Broadcast("lock", lock.uuid) // 设置时间到解锁 locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second) @@ -435,7 +436,7 @@ func (s *ZBus) _lock(lock *Lock) { } else { switch lock.cmd { case "trylock": // send trylock fail message - s.broadcast("trylock", lock.uuid) + s.Broadcast("trylock", lock.uuid) case "lock": s.locks[lock.key] = append(locks, lock) } @@ -452,7 +453,7 @@ func (s *ZBus) _unlock(l Lock) { s.locks[l.key] = locks } if len(s.locks[l.key]) > 0 { // next lock - s.broadcast("lock", s.locks[l.key][0].uuid) + s.Broadcast("lock", s.locks[l.key][0].uuid) s.locks[l.key][0].start = time.Now().Unix() s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second) go func() { diff --git a/main.go b/main.go index b38aaf2..9662951 100644 --- a/main.go +++ b/main.go @@ -5,10 +5,11 @@ import ( "fmt" "log" "os" - "zhub/cmd" - "zhub/internal/config" - "zhub/internal/monitor" - "zhub/internal/zbus" + + "gitea.1216.top/lxy/zhub/cmd" + "gitea.1216.top/lxy/zhub/internal/config" + "gitea.1216.top/lxy/zhub/internal/monitor" + "gitea.1216.top/lxy/zhub/internal/zbus" ) func main() { @@ -69,7 +70,7 @@ func main() { return } if *isCliMode { - cmd.ClientRun(addr) // 客户端运行 + //cmd.ClientRun(addr) // 客户端运行 } else { go monitor.StartWatch() // 启动监控协程 zbus.StartServer(addr, conf) // 启动服务进程