commit dcfb735a2d03e00c910a7aefb7cc37543acd05fa Author: 绝尘 <237809796@qq.com> Date: Wed Sep 24 00:54:26 2025 +0800 初始化 zhub Go 客户端项目 新增 zhub 客户端完整实现,包括配置加载、TCP通信、消息发布/订阅、 RPC 调用、分布式锁等功能。支持从项目配置文件、环境变量等多种方式初始化客户端,并提供便捷的全局调用接口。- 添加 `.gitignore` 忽略 IDE 和临时文件 - 实现 `api.go` 提供全局便捷方法封装客户端调用 - 实现 `client.go` 核心客户端逻辑,包含网络通信、消息处理等 - 添加 `client_test.go` 单元测试和集成测试示例 - 实现 `config.go` 支持灵活的配置加载机制 - 添加示例配置文件 `example-config.yml` - 初始化 Go 模块依赖 `go.mod` 和 `go.sum` - 实现 `init.go` 提供多种初始化方式 - 添加 MIT 许可证文件 `LICENSE` - 新增使用示例 `example/main.go` 展示基本功能调用 - 实现 `manager.go` 管理默认客户端实例及初始化逻辑 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3d95363 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea +*.exe +*.iml + +/tmp diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a41eefb --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 zhub-go-client + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..c1e8bb8 --- /dev/null +++ b/README.md @@ -0,0 +1,221 @@ +# ZHub Go Client + +一个独立的 ZHub 客户端组件,支持消息发布订阅、RPC 调用、分布式锁等功能。 + +## 功能特性 + +- ✅ 消息发布订阅 (Pub/Sub) +- ✅ RPC 调用支持 +- ✅ 分布式锁机制 +- ✅ 定时器支持 +- ✅ 自动重连 +- ✅ 灵活的配置管理 +- ✅ 环境变量支持 + +## 安装 + +### 从私有仓库安装 + +```bash +# 设置私有模块 +go env -w GOPRIVATE=gitea.1216.top/lxy/* + +# 安装包 +go get gitea.1216.top/lxy/zhub-go-client +``` + +### 配置 Gitea 私有仓库访问 + +```bash +# 方法1: 使用 SSH 密钥(推荐) +git config --global url."git@gitea.1216.top:".insteadOf "https://gitea.1216.top/" + +# 方法2: 使用用户名密码或 Token +git config --global url."https://用户名:密码@gitea.1216.top/".insteadOf "https://gitea.1216.top/" + +# 方法3: 使用 .netrc 文件 +# 创建 ~/.netrc 文件并添加认证信息 +``` + +## 快速开始 + +### 1. 配置文件初始化 + +```go +package main + +import ( + "log" + zhub "gitea.1216.top/lxy/zhub-go-client" +) + +func main() { + // 使用项目配置文件初始化 + err := zhub.InitWithProjectConfig("app.yml") + if err != nil { + log.Fatal("Failed to initialize zhub:", err) + } + + // 发布消息 + zhub.Publish("test-topic", "Hello World!") + + // 订阅消息 + zhub.Subscribe("test-topic", func(message string) { + log.Printf("Received: %s", message) + }) +} +``` + +### 2. 配置文件格式 + +```yaml +# app.yml +zhub: + appname: "my-app" + addr: "127.0.0.1:1216" + groupid: "my-group" + auth: "my-token" + +# 其他项目配置... +web: + addr: "0.0.0.0:8080" +``` + +### 3. 多种初始化方式 + +```go +// 方式1: 指定配置文件路径 +zhub.InitWithProjectConfig("config/app.yml") + +// 方式2: 自动发现配置文件 +zhub.InitFromCurrentDir() + +// 方式3: 使用环境变量 +zhub.InitFromEnv() // 需要设置 ZHUB_CONFIG_PATH + +// 方式4: 自定义配置选项 +opts := &zhub.ConfigOptions{ + ConfigPath: "./config/my-app.yml", + ConfigKey: "zhub", + EnvPrefix: "ZHUB", +} +zhub.InitWithOptions(opts) +``` + +## API 使用 + +### 消息发布订阅 + +```go +// 发布消息 +zhub.Publish("topic", "message") + +// 广播消息 +zhub.Broadcast("topic", "message") + +// 延迟消息 +zhub.Delay("topic", "message", 30) // 30秒后发送 + +// 订阅消息 +zhub.Subscribe("topic", func(message string) { + // 处理消息 +}) + +// 取消订阅 +zhub.Unsubscribe("topic") +``` + +### RPC 调用 + +```go +// RPC 客户端 +zhub.CallRpc("rpc-service", map[string]interface{}{ + "action": "getUser", + "id": 123, +}, func(result zhub.RpcResult) { + if result.Retcode == 0 { + log.Printf("Success: %v", result.Result) + } else { + log.Printf("Error: %s", result.Retinfo) + } +}) + +// RPC 服务端 +zhub.RpcSubscribe("rpc-service", func(rpc zhub.Rpc) zhub.RpcResult { + // 处理 RPC 请求 + return rpc.Render(map[string]interface{}{ + "user": map[string]interface{}{ + "id": 123, + "name": "John", + }, + }) +}) +``` + +### 分布式锁 + +```go +// 获取锁 +lock := zhub.AcquireLock("resource-key", 30) // 30秒超时 + +// 执行业务逻辑 +log.Println("Doing critical work...") +time.Sleep(time.Second * 2) + +// 释放锁 +zhub.ReleaseLock(lock) +``` + +### 定时器 + +```go +// 设置定时器 +zhub.Timer("my-timer", func() { + log.Println("Timer triggered!") +}) +``` + +## 环境变量支持 + +```bash +# 设置配置文件路径 +export ZHUB_CONFIG_PATH="/path/to/config.yml" + +# 或者直接设置配置值 +export ZHUB_APPNAME="my-app" +export ZHUB_ADDR="127.0.0.1:1216" +export ZHUB_GROUPID="my-group" +export ZHUB_AUTH="my-token" +``` + +## 错误处理 + +```go +// 检查初始化错误 +err := zhub.InitWithProjectConfig("config.yml") +if err != nil { + log.Printf("初始化失败: %v", err) + return +} + +// 检查操作错误 +err = zhub.Publish("topic", "message") +if err != nil { + log.Printf("发布失败: %v", err) +} +``` + +## 测试 + +```bash +# 运行测试 +go test ./... + +# 运行示例 +cd example +go run main.go +``` + +## 许可证 + +MIT License diff --git a/api.go b/api.go new file mode 100644 index 0000000..283fbf4 --- /dev/null +++ b/api.go @@ -0,0 +1,58 @@ +package zhub + +import "time" + +// 全局便捷方法 +func Publish(topic, message string) error { + return GetClient().Publish(topic, message) +} + +func Broadcast(topic, message string) error { + return GetClient().Broadcast(topic, message) +} + +func Delay(topic, message string, delay int) error { + return GetClient().Delay(topic, message, delay) +} + +func Subscribe(topic string, callback func(string)) { + GetClient().Subscribe(topic, callback) +} + +func Unsubscribe(topic string) { + GetClient().Unsubscribe(topic) +} + +func Timer(topic string, callback func()) { + GetClient().Timer(topic, callback) +} + +func CallRpc(topic string, message interface{}, callback func(RpcResult)) { + GetClient().Rpc(topic, message, callback) +} + +func RpcWithTimeout(topic string, message interface{}, timeout time.Duration, callback func(RpcResult)) { + GetClient().RpcWithTimeout(topic, message, timeout, callback) +} + +func RpcSubscribe(topic string, handler func(Rpc) RpcResult) { + GetClient().RpcSubscribe(topic, handler) +} + +func AcquireLock(key string, duration int) Lock { + return GetClient().Lock(key, duration) +} + +func ReleaseLock(lock Lock) { + GetClient().Unlock(lock) +} + +func Cmd(cmd ...string) { + GetClient().Cmd(cmd...) +} + +func Close() { + if defaultClient != nil { + defaultClient.Close() + } +} diff --git a/client.go b/client.go new file mode 100644 index 0000000..5c2dd3b --- /dev/null +++ b/client.go @@ -0,0 +1,508 @@ +package zhub + +import ( + "bufio" + "encoding/json" + "fmt" + "log" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + "unicode/utf8" + + "github.com/go-basic/uuid" +) + +// ZHubClient zhub客户端 +type ZHubClient struct { + wlock sync.Mutex // write lock + rlock sync.Mutex // read lock + + Appname string // local appname + Addr string // host:port + Groupid string // client group id + Auth string + conn net.Conn // socket conn + createTime time.Time // client create time + + subFun map[string]func(v string) // subscribe topic and callback function + timerFun map[string]func() // subscribe timer amd callback function + + chSend chan []string // chan of send message + chReceive chan []string // chan of receive message + timerReceive chan []string // chan of timer + lockFlag map[string]*Lock // chan of lock +} + +// Start 创建一个客户端 +func (c *ZHubClient) Start() error { + conn, err := net.Dial("tcp", c.Addr) + if err != nil { + return err + } + + c.conn = conn + c.wlock = sync.Mutex{} + c.rlock = sync.Mutex{} + c.createTime = time.Now() + + c.subFun = make(map[string]func(v string)) + c.timerFun = make(map[string]func()) + c.chSend = make(chan []string, 100) + c.chReceive = make(chan []string, 100) + c.timerReceive = make(chan []string, 100) + c.lockFlag = make(map[string]*Lock) + + c.send("auth", c.Auth) + c.send("groupid " + c.Groupid) + c.init() + return err +} + +func (c *ZHubClient) reconn() (err error) { + for { + conn, err := net.Dial("tcp", c.Addr) + if err != nil { + log.Println("reconn", err) + time.Sleep(time.Second * 3) + continue + } else if err == nil { + c.conn = conn + c.send("auth", c.Auth) + c.send("groupid " + c.Groupid) + go c.receive() + + // 重新订阅 + for topic := range c.subFun { + c.Subscribe(topic, nil) + } + for topic := range c.timerFun { + c.Timer(topic, nil) + } + break + } + } + return +} + +func (c *ZHubClient) init() { + // 消费 topic 消息 + go func() { + for { + select { + case vs := <-c.chReceive: + fun := c.subFun[vs[1]] + if fun == nil { + log.Println("topic received, nothing to do", vs[1], vs[2]) + continue + } + fun(vs[2]) + case vs := <-c.timerReceive: + fun := c.timerFun[vs[1]] + if fun == nil { + log.Println("timer received, nothing to do", vs[1]) + continue + } + fun() + } + } + }() + c.rpcInit() + + go c.receive() +} + +// Subscribe subscribe topic +func (c *ZHubClient) Subscribe(topic string, fun func(v string)) { + c.send("subscribe " + topic) + if fun != nil { + c.wlock.Lock() + defer c.wlock.Unlock() + c.subFun[topic] = fun + } +} + +func (c *ZHubClient) Unsubscribe(topic string) { + c.send("unsubscribe " + topic) + delete(c.subFun, topic) +} + +// Publish 发布消息 +func (c *ZHubClient) Publish(topic string, message string) error { + return c.send("publish", topic, message) +} + +func (c *ZHubClient) Broadcast(topic string, message string) error { + return c.send("broadcast", topic, message) +} + +func (c *ZHubClient) Delay(topic string, message string, delay int) error { + return c.send("delay", topic, message, strconv.Itoa(delay)) +} + +func (c *ZHubClient) Timer(topic string, fun func()) { + if fun != nil { + c.timerFun[topic] = fun + } + c.send("timer", topic) +} + +// Cmd send cmd +func (c *ZHubClient) Cmd(cmd ...string) { + if len(cmd) == 1 { + c.send("cmd", cmd[0]) + } else if len(cmd) > 1 { + cmdx := make([]string, 0) + cmdx = append(cmdx, "cmd") + cmdx = append(cmdx, cmd...) + c.send(cmdx...) + } +} + +func (c *ZHubClient) Close() { + c.conn.Close() +} + +// Lock Key +func (c *ZHubClient) Lock(key string, duration int) Lock { + uuid := uuid.New() + c.send("uuid", key, uuid, strconv.Itoa(duration)) + + lockChan := make(chan int, 2) + go func() { + c.wlock.Lock() + defer c.wlock.Unlock() + c.lockFlag[uuid] = &Lock{ + Key: key, + Value: uuid, + flagChan: lockChan, + } + }() + + select { + case <-lockChan: + log.Println("lock-ok", time.Now().UnixNano()/1e6, uuid) + } + + return Lock{Key: key, Value: uuid} +} + +func (c *ZHubClient) Unlock(l Lock) { + c.send("unlock", l.Key, l.Value) + delete(c.lockFlag, l.Value) +} + +// -------------------------------------- rpc -------------------------------------- +var rpcMap = make(map[string]*Rpc) +var rpcLock = sync.RWMutex{} + +func (c *ZHubClient) rpcInit() { + // 添加 appname 主题订阅处理 + c.Subscribe(c.Appname, func(v string) { + log.Println("rpc back:", v) + rpcLock.Lock() + defer rpcLock.Unlock() + + result := RpcResult{} + err := json.Unmarshal([]byte(v), &result) + if err != nil { + // 返回失败处理 + log.Println("rpc result parse error:", err) + return + } + + rpc := rpcMap[result.Ruk] + if rpc == nil { + return // 本地已无 rpc 请求等待,如:超时结束 + } + + rpc.RpcResult = result + close(rpc.Ch) // 发送事件 + delete(rpcMap, result.Ruk) + }) +} + +func (c *ZHubClient) Rpc(topic string, message interface{}, back func(res RpcResult)) { + c.RpcWithTimeout(topic, message, time.Second*15, back) +} + +func (c *ZHubClient) RpcWithTimeout(topic string, message interface{}, timeout time.Duration, back func(res RpcResult)) { + _message, err := ToJSON(message) + if err != nil { + log.Println("rpc message error:", err) + back(RpcResult{ + Retcode: 505, + Retinfo: err.Error(), + }) + return + } + + rpc := Rpc{ + Ruk: c.Appname + "::" + uuid.New(), + Topic: topic, + Value: _message, + Ch: make(chan int, 0), + } + content, _ := ToJSON(&rpc) + log.Println("rpc call:", content) + c.Publish(topic, content) + + rpcLock.Lock() + rpcMap[rpc.Ruk] = &rpc + rpcLock.Unlock() + + select { + case <-rpc.Ch: + // ch 事件(rpc 返回) + case <-time.After(timeout): + // rpc 超时 + x, _ := json.Marshal(rpc) + log.Println("rpc timeout:", x) + rpc.RpcResult = RpcResult{ + Retcode: 505, + Retinfo: "请求超时", + } + } + back(rpc.RpcResult) +} + +// RpcSubscribe rpc subscribe +func (c *ZHubClient) RpcSubscribe(topic string, fun func(Rpc) RpcResult) { + c.Subscribe(topic, func(v string) { + rpc := Rpc{} + err := json.Unmarshal([]byte(v), &rpc) + if err != nil { + return + } + + result := fun(rpc) + result.Ruk = rpc.Ruk + + // 判断数据类型,如果不是 string, 将数据转为sting + switch result.Result.(type) { + case string: + case int: + case int64: + case float64: + case bool: + default: + marshal, err := json.Marshal(result.Result) + if err != nil { + log.Println("rpc result marshal error:", err) + return + } + result.Result = string(marshal) + } + log.Println("Result: ", result.Result) + + res, _ := json.Marshal(result) + c.Publish(rpc.backTopic(), string(res)) + }) +} + +/* +send socket message : +if len(vs) equal 1 will send message `vs[0] + "\r\n"` +else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...` +*/ +func (c *ZHubClient) send(vs ...string) (err error) { + c.wlock.Lock() + defer c.wlock.Unlock() +a: + if len(vs) == 1 { + _, err = c.conn.Write([]byte(vs[0] + "\r\n")) + } else if len(vs) > 1 { + data := "*" + strconv.Itoa(len(vs)) + "\r\n" + for _, v := range vs { + data += "$" + strconv.Itoa(utf8.RuneCountInString(v)) + "\r\n" + data += v + "\r\n" + } + _, err = c.conn.Write([]byte(data)) + } + if err != nil { + log.Println(err) + time.Sleep(time.Second * 3) + // check conn reconnect + { + c.wlock.Unlock() + c.reconn() + c.wlock.Lock() + } + goto a + } + + return err +} + +func (c *ZHubClient) receive() { + r := bufio.NewReader(c.conn) + for { + v, _, err := r.ReadLine() + if err != nil { + log.Println(err) + c.reconn() + return + } + if len(v) == 0 { + continue + } + switch v[0] { + case '+': + if string(v) == "+ping" { + c.send("+pong") + continue + } + log.Println("receive:", string(v)) + case '-': + log.Println("error:", string(v)) + case '*': + if len(v) < 2 { + continue + } + n, err := strconv.Atoi(string(v[1:])) // 指令长度 + if err != nil || n <= 0 { + continue + } + var vs []string + //for i := 0; i < n; i++ { + for len(vs) < n { + line, _, _ := r.ReadLine() // $7 len() + clen, _ := strconv.Atoi(string(line[1:])) // 指令长度 + line, prefix, _ := r.ReadLine() // message|lock|timer + _v := string(line) + for prefix && len(_v) < clen { + line, prefix, _ = r.ReadLine() + _v += string(line) + } + + vs = append(vs, _v) + } + if len(vs) == 3 && vs[0] == "message" { + if vs[1] == "lock" { + go func() { + log.Println("lock:" + vs[2]) + c.wlock.Lock() + defer c.wlock.Unlock() + if c.lockFlag[vs[2]] == nil { + return + } + c.lockFlag[vs[2]].flagChan <- 0 + }() + } else { + log.Printf("zhub accept topic: [%s %s]\n", vs[1], vs[2]) + c.chReceive <- vs + } + continue + } + if len(vs) == 2 && vs[0] == "timer" { + log.Printf("zhub accept timer: [%s]\n", vs[1]) + c.timerReceive <- vs + continue + } + } + } +} + +// ClientRun client 命令行程序 +func ClientRun(addr string) { + conn, err := net.Dial("tcp", fmt.Sprintf("%s", addr)) + + for { + if err != nil { + log.Println(err) + time.Sleep(time.Second * 3) + conn, err = net.Dial("tcp", fmt.Sprintf("%s", addr)) + continue + } + + fmt.Println(fmt.Sprintf("had connected server: %s", addr)) + break + } + + defer func() { + conn.Close() + }() + + go clientRead(conn) + + for { + inReader := bufio.NewReader(os.Stdin) + line, err := inReader.ReadString('\n') + if err != nil { + fmt.Println(err) + return + } + + line = strings.Trim(line, "\r\n") + line = strings.Trim(line, "\n") + line = strings.Trim(line, " ") + + if strings.EqualFold(line, "") { + continue + } else if strings.EqualFold(line, ":exit") { + fmt.Println("exit!") + return + } + + line = strings.ReplaceAll(line, " ", "") + parr := strings.Split(line, " ") + conn.Write([]byte("*" + strconv.Itoa(len(parr)) + "\r\n")) + for i := range parr { + conn.Write([]byte("$" + strconv.Itoa(len(parr[i])) + "\r\n")) + conn.Write([]byte(parr[i] + "\r\n")) + } + } +} + +func clientRead(conn net.Conn) { + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovered:", r) + } + }() + + reader := bufio.NewReader(conn) + for { + rcmd := make([]string, 0) + line, _, err := reader.ReadLine() + if err != nil { + log.Println("connection error: ", err) + return + } else if len(line) == 0 { + continue + } + + switch string(line[:1]) { + case "*": + n, _ := strconv.Atoi(string(line[1:])) + for i := 0; i < n; i++ { + reader.ReadLine() + v, _, _ := reader.ReadLine() + rcmd = append(rcmd, string(v)) + } + case "+": + rcmd = append(rcmd, string(line)) + case "-": + rcmd = append(rcmd, string(line)) + case ":": + rcmd = append(rcmd, string(line)) + case "h": + if strings.EqualFold(string(line), "help-start") { + for { + v, _, _ := reader.ReadLine() + if strings.EqualFold(string(v), "help-end") { + break + } + rcmd = append(rcmd, string(v)+"\r\n") + } + } + default: + rcmd = append(rcmd, string(line)) + } + + fmt.Println(">", strings.Join(rcmd, " ")) + } +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..e640f49 --- /dev/null +++ b/client_test.go @@ -0,0 +1,93 @@ +package zhub + +import ( + "testing" + "time" +) + +func TestConfigLoading(t *testing.T) { + // 测试配置加载 + opts := &ConfigOptions{ + ConfigPath: "example-config.yml", + ConfigKey: "zhub", + } + + config, err := LoadConfigWithOptions(opts) + if err != nil { + t.Skipf("Config file not found, skipping test: %v", err) + } + + if config.Appname == "" { + t.Error("Appname should not be empty") + } + + if config.Addr == "" { + t.Error("Addr should not be empty") + } +} + +func TestJSONConversion(t *testing.T) { + // 测试 JSON 转换 + testCases := []struct { + input interface{} + expected string + }{ + {"hello", "hello"}, + {123, "123"}, + {true, "true"}, + {map[string]interface{}{"key": "value"}, `{"key":"value"}`}, + } + + for _, tc := range testCases { + result, err := ToJSON(tc.input) + if err != nil { + t.Errorf("ToJSON failed for %v: %v", tc.input, err) + continue + } + + if result != tc.expected { + t.Errorf("ToJSON(%v) = %s, expected %s", tc.input, result, tc.expected) + } + } +} + +func TestClientCreation(t *testing.T) { + // 注意:这个测试需要实际的 zhub 服务器运行 + t.Skip("Skipping integration test - requires zhub server") + + config := &Config{ + Appname: "test-app", + Addr: "127.0.0.1:1216", + Groupid: "test-group", + Auth: "test-token", + } + + client, err := NewClient(config) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + defer client.Close() + + // 测试发布消息 + err = client.Publish("test-topic", "test message") + if err != nil { + t.Errorf("Failed to publish message: %v", err) + } + + // 测试订阅 + messageReceived := make(chan string, 1) + client.Subscribe("test-topic", func(message string) { + messageReceived <- message + }) + + // 等待消息 + select { + case msg := <-messageReceived: + if msg != "test message" { + t.Errorf("Expected 'test message', got '%s'", msg) + } + case <-time.After(time.Second * 5): + t.Error("Timeout waiting for message") + } +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..4b1957b --- /dev/null +++ b/config.go @@ -0,0 +1,97 @@ +package zhub + +import ( + "fmt" + "os" + + "github.com/spf13/viper" +) + +// Config zhub客户端配置 +type Config struct { + Appname string `mapstructure:"appname"` + Addr string `mapstructure:"addr"` + Groupid string `mapstructure:"groupid"` + Auth string `mapstructure:"auth"` +} + +// ConfigOptions 配置加载选项 +type ConfigOptions struct { + ConfigPath string // 配置文件路径 + ConfigName string // 配置文件名 (默认: "app") + ConfigType string // 配置文件类型 (默认: "yml") + ConfigKey string // 配置节点名 (默认: "zhub") + EnvPrefix string // 环境变量前缀 (可选) +} + +// LoadConfigWithOptions 使用选项加载配置 +func LoadConfigWithOptions(opts *ConfigOptions) (*Config, error) { + viper.Reset() + + // 设置默认值 + if opts.ConfigName == "" { + opts.ConfigName = "app" + } + if opts.ConfigType == "" { + opts.ConfigType = "yml" + } + if opts.ConfigKey == "" { + opts.ConfigKey = "zhub" + } + + // 如果指定了配置文件路径,使用指定路径 + if opts.ConfigPath != "" { + viper.SetConfigFile(opts.ConfigPath) + } else { + // 否则使用默认搜索路径 + viper.SetConfigName(opts.ConfigName) + viper.SetConfigType(opts.ConfigType) + + // 添加多个搜索路径 + viper.AddConfigPath(".") + viper.AddConfigPath("./conf") + viper.AddConfigPath("./config") + viper.AddConfigPath("/etc") + } + + // 设置环境变量前缀 + if opts.EnvPrefix != "" { + viper.SetEnvPrefix(opts.EnvPrefix) + viper.AutomaticEnv() + } + + // 读取配置文件 + if err := viper.ReadInConfig(); err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + var config Config + if err := viper.UnmarshalKey(opts.ConfigKey, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal config: %w", err) + } + + return &config, nil +} + +// LoadConfig 默认配置加载(向后兼容) +func LoadConfig(configPath string) (*Config, error) { + opts := &ConfigOptions{ + ConfigPath: configPath, + ConfigKey: "zhub", + } + return LoadConfigWithOptions(opts) +} + +// LoadConfigFromProject 从项目配置文件加载(推荐方式) +func LoadConfigFromProject(projectConfigPath string) (*Config, error) { + // 检查文件是否存在 + if _, err := os.Stat(projectConfigPath); os.IsNotExist(err) { + return nil, fmt.Errorf("config file not found: %s", projectConfigPath) + } + + opts := &ConfigOptions{ + ConfigPath: projectConfigPath, + ConfigKey: "zhub", + } + return LoadConfigWithOptions(opts) +} diff --git a/example-config.yml b/example-config.yml new file mode 100644 index 0000000..3a7f847 --- /dev/null +++ b/example-config.yml @@ -0,0 +1,13 @@ +# zhub 配置示例 +zhub: + appname: "my-app" + addr: "127.0.0.1:1216" + groupid: "my-group" + auth: "my-token" + +# 其他项目配置... +web: + addr: "0.0.0.0:8080" + +mysql: + dsn: "root:password@tcp(localhost:3306)/mydb" diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..086cb7a --- /dev/null +++ b/example/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "log" + "time" + + "gitea.1216.top/lxy/zhub-go-client" +) + +func main() { + // 方式1: 使用项目配置文件初始化 + err := zhub.InitWithProjectConfig("../../app.yml") + if err != nil { + log.Fatal("Failed to initialize zhub:", err) + } + + // 方式2: 自动发现配置文件 + // err := zhub.InitFromCurrentDir() + // if err != nil { + // log.Fatal("Failed to initialize zhub:", err) + // } + + // 方式3: 使用环境变量 + // err := zhub.InitFromEnv() + // if err != nil { + // log.Fatal("Failed to initialize zhub:", err) + // } + + log.Println("ZHub client initialized successfully") + + // 发布消息 + err = zhub.Publish("test-topic", "Hello World!") + if err != nil { + log.Printf("Failed to publish: %v", err) + } + + // 订阅消息 + zhub.Subscribe("test-topic", func(message string) { + log.Printf("Received message: %s", message) + }) + + // RPC 调用 + zhub.CallRpc("rpc-test", map[string]interface{}{ + "action": "hello", + "data": "test data", + }, func(result zhub.RpcResult) { + log.Printf("RPC Result: %+v", result) + }) + + // RPC 服务端 + zhub.RpcSubscribe("rpc-test", func(rpc zhub.Rpc) zhub.RpcResult { + log.Printf("RPC Request: %+v", rpc) + return rpc.Render(map[string]interface{}{ + "status": "success", + "data": "response data", + }) + }) + + // 分布式锁 + lock := zhub.AcquireLock("test-lock", 30) + log.Printf("Lock acquired: %+v", lock) + + // 模拟业务处理 + time.Sleep(time.Second * 2) + + zhub.ReleaseLock(lock) + log.Println("Lock released") + + // 保持程序运行 + time.Sleep(time.Hour) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6ac6cc1 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module gitea.1216.top/lxy/zhub-go-client + +go 1.24.0 + +require ( + github.com/go-basic/uuid v1.0.0 + github.com/spf13/viper v1.21.0 +) + +require ( + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/pflag v1.0.10 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.28.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d51919f --- /dev/null +++ b/go.sum @@ -0,0 +1,49 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-basic/uuid v1.0.0 h1:Faqtetcr8uwOzR2qp8RSpkahQiv4+BnJhrpuXPOo63M= +github.com/go-basic/uuid v1.0.0/go.mod h1:yVtVnsXcmaLc9F4Zw7hTV7R0+vtuQw00mdXi+F6tqco= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= +github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U= +github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= +github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= +github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= +github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= +github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/init.go b/init.go new file mode 100644 index 0000000..5fdec77 --- /dev/null +++ b/init.go @@ -0,0 +1,53 @@ +package zhub + +import ( + "fmt" + "os" + "path/filepath" +) + +// InitFromCurrentDir 从当前目录的配置文件初始化 +func InitFromCurrentDir() error { + // 尝试多个可能的配置文件 + possibleConfigs := []string{ + "app.yml", + "config.yml", + "config.yaml", + "app.yaml", + "./conf/app.yml", + "./config/app.yml", + } + + for _, configPath := range possibleConfigs { + if _, err := os.Stat(configPath); err == nil { + return InitWithProjectConfig(configPath) + } + } + + return fmt.Errorf("no config file found in current directory") +} + +// InitFromEnv 从环境变量指定的配置文件初始化 +func InitFromEnv() error { + configPath := os.Getenv("ZHUB_CONFIG_PATH") + if configPath == "" { + return fmt.Errorf("ZHUB_CONFIG_PATH environment variable not set") + } + + return InitWithProjectConfig(configPath) +} + +// InitWithWorkingDir 使用工作目录下的配置文件 +func InitWithWorkingDir(configName string) error { + if configName == "" { + configName = "app.yml" + } + + workDir, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get working directory: %w", err) + } + + configPath := filepath.Join(workDir, configName) + return InitWithProjectConfig(configPath) +} diff --git a/manager.go b/manager.go new file mode 100644 index 0000000..2d09103 --- /dev/null +++ b/manager.go @@ -0,0 +1,92 @@ +package zhub + +import ( + "fmt" + "log" + "sync" +) + +// Client 客户端包装器 +type Client struct { + *ZHubClient + config *Config +} + +var ( + defaultClient *Client + once sync.Once +) + +// InitWithOptions 使用选项初始化默认客户端 +func InitWithOptions(opts *ConfigOptions) error { + var err error + once.Do(func() { + config, configErr := LoadConfigWithOptions(opts) + if configErr != nil { + err = fmt.Errorf("failed to load config: %w", configErr) + return + } + + client, clientErr := NewClient(config) + if clientErr != nil { + err = fmt.Errorf("failed to create client: %w", clientErr) + return + } + + defaultClient = client + }) + return err +} + +// InitWithProjectConfig 使用项目配置文件初始化 +func InitWithProjectConfig(projectConfigPath string) error { + opts := &ConfigOptions{ + ConfigPath: projectConfigPath, + ConfigKey: "zhub", + } + return InitWithOptions(opts) +} + +// NewClient 创建新的客户端实例 +func NewClient(config *Config) (*Client, error) { + client := &Client{ + ZHubClient: &ZHubClient{ + Appname: config.Appname, + Addr: config.Addr, + Groupid: config.Groupid, + Auth: config.Auth, + }, + config: config, + } + + err := client.Start() + if err != nil { + return nil, err + } + + return client, nil +} + +// DefaultClient 获取默认客户端实例 +func DefaultClient() *Client { + if defaultClient == nil { + log.Panic("zhub client not initialized, call InitWithProjectConfig() first") + } + return defaultClient +} + +// GetClient 获取客户端实例(如果未初始化则使用默认配置) +func GetClient() *Client { + if defaultClient == nil { + // 尝试从当前目录加载默认配置 + opts := &ConfigOptions{ + ConfigName: "app", + ConfigType: "yml", + ConfigKey: "zhub", + } + if err := InitWithOptions(opts); err != nil { + log.Panic("Failed to initialize zhub client:", err) + } + } + return defaultClient +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..db452c8 --- /dev/null +++ b/types.go @@ -0,0 +1,76 @@ +package zhub + +import "strings" + +// Lock 分布式锁 +type Lock struct { + Key string // lock Key + Value string // lock Value + flagChan chan int // +} + +// Rpc RPC 请求结构 +type Rpc struct { + Ruk string `json:"ruk"` + Topic string `json:"topic"` + Value string `json:"value"` + + Ch chan int `json:"-"` //请求返回标记 + RpcResult RpcResult `json:"-"` +} + +// RpcRet RPC 返回值接口 +type RpcRet interface { + GetRuk() string + GetRetcode() int + GetRetinfo() string + GetResult() any +} + +// RpcResult RPC 返回结果 +type RpcResult struct { + Ruk string `json:"ruk"` + Retcode int `json:"retcode"` + Retinfo string `json:"retinfo"` + Result any `json:"result"` +} + +func (r *RpcResult) GetRuk() string { + return r.Ruk +} + +func (r *RpcResult) GetRetcode() int { + return r.Retcode +} + +func (r *RpcResult) GetRetinfo() string { + return r.Retinfo +} + +func (r *RpcResult) GetResult() any { + return r.Result +} + +func (r *RpcResult) Err(err error) RpcResult { + r.Retcode = 100 + r.Retinfo = err.Error() + return *r +} + +func (r Rpc) backTopic() string { + return strings.Split(r.Ruk, "::")[0] +} + +func (r Rpc) Err(err error) RpcResult { + return RpcResult{ + Retcode: 100, + Retinfo: err.Error(), + } +} + +func (r Rpc) Render(result any) RpcResult { + return RpcResult{ + Retcode: 0, + Result: result, + } +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..ac1392e --- /dev/null +++ b/utils.go @@ -0,0 +1,28 @@ +package zhub + +import ( + "encoding/json" + "strconv" +) + +// ToJSON 将任意类型转换为JSON字符串 +func ToJSON(message interface{}) (string, error) { + switch v := message.(type) { + case string: + return v, nil + case int: + return strconv.Itoa(v), nil + case int64: + return strconv.FormatInt(v, 10), nil + case float64: + return strconv.FormatFloat(v, 'f', -1, 64), nil + case bool: + return strconv.FormatBool(v), nil + default: + bytes, err := json.Marshal(v) + if err != nil { + return "", err + } + return string(bytes), nil + } +}