diff --git a/cmd/client.go b/cmd/client.go index 2382d72..f2456f4 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -18,7 +18,7 @@ import ( "time" ) -type Client struct { +type ZHubClient struct { wlock sync.Mutex // write lock rlock sync.Mutex // read lock @@ -46,37 +46,40 @@ type Lock struct { // duration int // lock duration } -func Create(appname, addr, groupid, auth string) (*Client, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return &Client{}, err - } - - client := Client{ - wlock: sync.Mutex{}, - rlock: sync.Mutex{}, - appname: appname, - addr: addr, - conn: conn, - groupid: groupid, - createTime: time.Now(), - - subFun: make(map[string]func(v string)), - timerFun: make(map[string]func()), - chSend: make(chan []string, 100), - chReceive: make(chan []string, 100), - timerReceive: make(chan []string, 100), - lockFlag: make(map[string]*Lock), - auth: auth, - } - - client.send("auth", auth) - client.send("groupid " + groupid) - client.init() - return &client, err +func (c *ZHubClient) Initx(appname, addr, groupid, auth string) error { + c.appname = appname + c.addr = addr + c.groupid = groupid + c.auth = auth + return c.Init() } -func (c *Client) reconn() (err error) { +// Init 创建一个客户端 +func (c *ZHubClient) Init( /*appname, addr, groupid, auth string*/ ) error { + conn, err := net.Dial("tcp", c.addr) + if err != nil { + return err + } + + c.conn = conn + c.wlock = sync.Mutex{} + c.rlock = sync.Mutex{} + c.createTime = time.Now() + + c.subFun = make(map[string]func(v string)) + c.timerFun = make(map[string]func()) + c.chSend = make(chan []string, 100) + c.chReceive = make(chan []string, 100) + c.timerReceive = make(chan []string, 100) + c.lockFlag = make(map[string]*Lock) + + c.send("auth", c.auth) + c.send("groupid " + c.groupid) + c.init() + return err +} + +func (c *ZHubClient) reconn() (err error) { for n := 1; n < 10; n++ { conn, err := net.Dial("tcp", c.addr) if err != nil { @@ -102,7 +105,7 @@ func (c *Client) reconn() (err error) { return } -func (c *Client) init() { +func (c *ZHubClient) init() { // 消费 topic 消息 go func() { for { @@ -130,7 +133,7 @@ func (c *Client) init() { } // Subscribe subscribe topic -func (c *Client) Subscribe(topic string, fun func(v string)) { +func (c *ZHubClient) Subscribe(topic string, fun func(v string)) { c.send("subscribe " + topic) if fun != nil { c.wlock.Lock() @@ -139,7 +142,7 @@ func (c *Client) Subscribe(topic string, fun func(v string)) { } } -func (c *Client) Unsubscribe(topic string) { +func (c *ZHubClient) Unsubscribe(topic string) { c.send("unsubscribe " + topic) delete(c.subFun, topic) } @@ -149,32 +152,32 @@ func (c *Client) Unsubscribe(topic string) { ping --- */ -func (c *Client) ping() { +func (c *ZHubClient) ping() { c.send("ping") } // Publish -------------------------------------- pub-sub -------------------------------------- -func (c *Client) Publish(topic string, message string) error { +func (c *ZHubClient) Publish(topic string, message string) error { return c.send("publish", topic, message) } -func (c *Client) Broadcast(topic string, message string) error { +func (c *ZHubClient) Broadcast(topic string, message string) error { return c.send("broadcast", topic, message) } -func (c *Client) Delay(topic string, message string, delay int) error { +func (c *ZHubClient) Delay(topic string, message string, delay int) error { return c.send("delay", topic, message, strconv.Itoa(delay)) } /* Timer - func (c *Client) Timer(topic string, expr string, fun func()) { + func (c *ZHubClient) Timer(topic string, expr string, fun func()) { c.timerFun[topic] = fun c.send("timer", topic, expr, "x") } */ -func (c *Client) Timer(topic string, fun func()) { +func (c *ZHubClient) Timer(topic string, fun func()) { if fun != nil { c.timerFun[topic] = fun } @@ -182,7 +185,7 @@ func (c *Client) Timer(topic string, fun func()) { } // Cmd send cmd -func (c *Client) Cmd(cmd ...string) { +func (c *ZHubClient) Cmd(cmd ...string) { if len(cmd) == 1 { c.send("cmd", cmd[0]) } else if len(cmd) > 1 { @@ -193,7 +196,7 @@ func (c *Client) Cmd(cmd ...string) { } } -func (c *Client) Close() { +func (c *ZHubClient) Close() { c.conn.Close() } @@ -207,7 +210,7 @@ func TryLock(key string, duration int) { } // Lock Key -func (c *Client) Lock(key string, duration int) Lock { +func (c *ZHubClient) Lock(key string, duration int) Lock { uuid := uuid.New() c.send("uuid", key, uuid, strconv.Itoa(duration)) @@ -230,7 +233,7 @@ func (c *Client) Lock(key string, duration int) Lock { return Lock{Key: key, Value: uuid} } -func (c *Client) Unlock(l Lock) { +func (c *ZHubClient) Unlock(l Lock) { c.send("unlock", l.Key, l.Value) delete(c.lockFlag, l.Value) } @@ -239,7 +242,7 @@ func (c *Client) Unlock(l Lock) { var rpcMap = make(map[string]*Rpc) var rpcLock = sync.RWMutex{} -func (c *Client) rpcInit() { +func (c *ZHubClient) rpcInit() { // 添加 appname 主题订阅处理 c.Subscribe(c.appname, func(v string) { @@ -286,7 +289,7 @@ func (r Rpc) backTopic() string { return strings.Split(r.Ruk, "::")[0] } -func (c Client) Rpc(topic string, message string, back func(res RpcResult)) { +func (c ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) { rpc := Rpc{ Ruk: c.appname + "::" + uuid.New(), Topic: topic, @@ -320,7 +323,7 @@ func (c Client) Rpc(topic string, message string, back func(res RpcResult)) { } // RpcSubscribe rpc subscribe -func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) { +func (c ZHubClient) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) { c.Subscribe(topic, func(v string) { rpc := Rpc{} err := json.Unmarshal([]byte(v), &rpc) @@ -333,7 +336,7 @@ func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) { result.Ruk = rpc.Ruk res, _ := json.Marshal(result) - c.Publish(rpc.backTopic(), string(res[:])) + c.Publish(rpc.backTopic(), string(res)) }) } @@ -344,7 +347,7 @@ send socket message : if len(vs) equal 1 will send message `vs[0] + "\r\n"` else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...` */ -func (c *Client) send(vs ...string) (err error) { +func (c *ZHubClient) send(vs ...string) (err error) { //chSend <- vs c.wlock.Lock() defer c.wlock.Unlock() @@ -362,13 +365,19 @@ a: if err != nil { log.Println(err) time.Sleep(time.Second * 3) + // check conn reconnect + { + c.wlock.Unlock() + c.reconn() + c.wlock.Lock() + } goto a } return err } -func (c *Client) receive() { +func (c *ZHubClient) receive() { r := bufio.NewReader(c.conn) for { v, _, err := r.ReadLine() @@ -384,6 +393,7 @@ func (c *Client) receive() { if string(v) == "+ping" { c.send("+pong") } + log.Println("receive:", string(v)) case '-': log.Println("error:", string(v)) case '*': @@ -395,7 +405,8 @@ func (c *Client) receive() { continue } var vs []string - for i := 0; i < n; i++ { + //for i := 0; i < n; i++ { + for len(vs) < n { line, _, err := r.ReadLine() if err != nil || line == nil { continue @@ -415,16 +426,20 @@ func (c *Client) receive() { } vs = append(vs, string(buf)) } - if len(vs) == 3 && vs[0] == "message" && vs[1] == "lock" { - go func() { - log.Println("lock:" + vs[2]) - c.wlock.Lock() - defer c.wlock.Unlock() - if c.lockFlag[vs[2]] == nil { - return - } - c.lockFlag[vs[2]].flagChan <- 0 - }() + if len(vs) == 3 && vs[0] == "message" { + if vs[1] == "lock" { + go func() { + log.Println("lock:" + vs[2]) + c.wlock.Lock() + defer c.wlock.Unlock() + if c.lockFlag[vs[2]] == nil { + return + } + c.lockFlag[vs[2]].flagChan <- 0 + }() + } else { + c.chReceive <- vs + } continue } if len(vs) == 2 && vs[0] == "timer" { diff --git a/cmd/client_test.go b/cmd/client_test.go index d61427b..2c95958 100644 --- a/cmd/client_test.go +++ b/cmd/client_test.go @@ -1,110 +1,45 @@ package cmd import ( - "encoding/base64" "encoding/json" "fmt" "log" "strconv" "strings" - "sync/atomic" + "sync" "testing" "time" ) -var zhub *Client - -var ( - addr = "127.0.0.1:1216" -) +var hub *ZHubClient +var once = sync.Once{} func init() { - client, err := Create("zhub-cli", addr, "C-0", "admin@123456") - if err != nil { - log.Fatal(err) - } - zhub = client -} + once.Do(func() { + hub := &ZHubClient{ + appname: "hub-cli", + addr: "127.0.0.1:1216", + groupid: "C-0", + auth: "admin@123456", + } + err := hub.Init() -func newClient(appname, groupid string) *Client { - client, err := Create(appname, addr, groupid, "admin@123456") - if err != nil { - log.Fatal(err) - } - return client -} - -func TestTimer(t *testing.T) { - go func() { - client := newClient("zhub-cli", "g-1") - - client.Subscribe("ax1", func(v string) { - log.Println("topic-1-ax: " + v) - }) - }() - go func() { - client := newClient("zhub-cli", "g-1") - - client.Subscribe("ax1", func(v string) { - log.Println("topic-2-ax: " + v) - }) - }() - - go func() { - client := newClient("zhub-cli", "g-1") - - client.Subscribe("ax1", func(v string) { - log.Println("topic-3-ax: " + v) - }) - }() - - time.Sleep(time.Hour * 3) -} - -func TestSendCmd(t *testing.T) { - client := newClient("zhub-cli", "group-admin") - - //client.Cmd("reload-timer") - client.Cmd("shutdown") -} - -func TestPublish(t *testing.T) { - - /*zhub.Publish("abx", "asd\r\nxxx1") - zhub.Publish("abx", "asd\r\nxxx2") - zhub.Publish("abx", "asd\r\nxxx3") - zhub.Publish("abx", "asd\r\nxxx4") - zhub.Publish("abx", "asd\r\nxxx5")*/ - - /*for i := 0; i < 10000; i++ { - zhub.Publish("ax1", strconv.Itoa(i)) - }*/ - - /*for i := 0; i < 20_0000; i++ { - time.Sleep(1 * time.Millisecond) - zhub.Publish("b", strconv.Itoa(i)) - }*/ - - /*zhub.Subscribe("wx:user-follow", func(v string) { - fmt.Println(v) - })*/ - - //hub.Publish("ax", "1") - - time.Sleep(time.Second) + if err != nil { + log.Fatal(err) + } + hub = hub + }) } func TestLock(t *testing.T) { - client, _ := Create("zhub-cli", addr, "xx", "admin@123456") - - client.Subscribe("lock", func(v string) { + hub.Subscribe("lock", func(v string) { }) var fun = func(x string) { log.Println("lock", time.Now().UnixNano()/1e6) - lock := client.Lock("a", 30) - defer client.Unlock(lock) + lock := hub.Lock("a", 30) + defer hub.Unlock(lock) //client.Lock("a", 5) for i := 0; i < 5; i++ { @@ -124,6 +59,8 @@ func rotate(nums []int, k int) { k = k % len(nums) nums = append(nums[len(nums)-k:], nums[0:len(nums)-k]...) } + +// 特殊符号测试 func TestName(t *testing.T) { //str := ", response = {\"success\":true,\"retcode\":0,\"result\":{\"age\":0,\"explevel\":1,\"face\":\"https://aimg.woaihaoyouxi.com/haogame/202106/pic/20210629095545FmGt-v9NYqyNZ_Q6_y3zM_RMrDgd.jpg\",\"followed\":0,\"gender\":0,\"idenstatus\":0,\"matchcatelist\":[{\"catename\":\"足球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103556FoG5ICf_7BFx6Idyo3TYpJQ7tmfG.png\",\"matchcateid\":1},{\"catename\":\"篮球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103636FklsXTn1f6Jlsam8Jk-yFB7Upo3C.png\",\"matchcateid\":2}],\"matchcates\":\"2,1\",\"mobile\":\"18515190967\",\"regtime\":1624931714781,\"sessionid\":\"d1fc447753bd4700ad29674a753030fa\",\"status\":10,\"userid\":100463,\"username\":\"绝尘\",\"userno\":100463}}" /*str := "别人家的女娃子🤞🏻" @@ -150,83 +87,7 @@ func TestName(t *testing.T) { func toStr(d interface{}) string { bs, _ := json.Marshal(d) - return string(bs[:]) -} - -// 接收数据 A -func TestC_a(t *testing.T) { - zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456") - - if err != nil { - log.Fatal(err) - } - - zhub.Subscribe("cmt:user-msg", func(v string) { - fmt.Println(v) - }) - - time.Sleep(10 * time.Hour) -} - -// 接收数据 -func TestC_ab(t *testing.T) { - zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456") - - if err != nil { - log.Fatal(err) - } - - zhub.Subscribe("a", func(v string) { - fmt.Println("a:", v) - }) - zhub.Subscribe("b", func(v string) { - fmt.Println("b:", v) - }) - zhub.Subscribe("im:friend:186", func(v string) { - fmt.Println("im:friend:186:", v) - }) - - time.Sleep(1 * time.Hour) -} - -func TestDelay2(t *testing.T) { - zhub, err := Create("zhub-cli", addr, "C-1", "admin@123456") - if err != nil { - log.Fatal(err) - } - - var x int64 = 0 - go func() { - zhub.Subscribe("a", func(v string) { - fmt.Println(v, "-", atomic.AddInt64(&x, 1)) - }) - }() - - zhub2, err := Create("zhub-cli", addr, "C-1", "admin@123456") - if err != nil { - log.Fatal(err) - } - - go func() { - zhub2.Subscribe("a", func(v string) { - fmt.Println(v, "-", atomic.AddInt64(&x, 1)) - }) - }() - - time.Sleep(time.Second * 20000) -} - -func TestDelay(t *testing.T) { - //zhub.Delay("abx", "1", -1) - - //zhub.Publish("yk-topic", "hello yk.") - - for i := 0; i < 1000; i++ { - zhub.Publish("a", "x-"+strconv.Itoa(i)) - - } - - time.Sleep(time.Second * 5) + return string(bs) } // 测试发送微信 模板消息 @@ -267,12 +128,12 @@ func TestWxSendMessage(t *testing.T) { log.Println(toStr(tplData["templateData"])) - zhub.Publish("wx:send-template-message", toStr(tplData)) + hub.Publish("wx:send-template-message", toStr(tplData)) } // 监听各项目所有接口请求失败信息《接口请求失败,发送失败信息到 pro.app-error 主题》 func TestAppErrorConsole(t *testing.T) { - zhub.Subscribe("app-error", func(v string) { + hub.Subscribe("app-error", func(v string) { strs := []string{ "未登陆", "账号已在其他设备登录", "今日已签到", "您的登录状态已过期", "用户不存在", "暂无可预定场馆", } @@ -291,8 +152,7 @@ func TestAppErrorConsole(t *testing.T) { // ----------- rpc test ----------- func TestRpcCall_S(t *testing.T) { - zhub := newClient("zhub-cli-a", "x") - zhub.RpcSubscribe("ai-test", func(r Rpc) RpcResult { + hub.RpcSubscribe("ai-test", func(r Rpc) RpcResult { upper := strings.ToUpper(r.Value) return RpcResult{Retcode: 0, Retinfo: "操作成功", Result: upper} }) @@ -315,7 +175,7 @@ func TestRpcCall_C(t *testing.T) { print(res.Result) })*/ - zhub.Subscribe("zcore:monitor-error", func(v string) { + hub.Subscribe("zcore:monitor-error", func(v string) { fmt.Println(v) }) @@ -325,13 +185,3 @@ func TestRpcCall_C(t *testing.T) { fmt.Println(res) })*/ } - -func TestBannedTalk(t *testing.T) { - /*zhub.Rpc("im:banned-talk", "{'imtoken':'74074f9e599947ca940e71a9788e768f'}", func(res RpcResult) { - fmt.Print(res) - })*/ - - encoding := base64.Encoding{} - toString := encoding.EncodeToString([]byte("420101190001011234")) - fmt.Println(toString) -} diff --git a/internal/zbus/zbus-message-handler.go b/internal/zbus/zbus-message-handler.go index af2e17c..18fb1c6 100644 --- a/internal/zbus/zbus-message-handler.go +++ b/internal/zbus/zbus-message-handler.go @@ -98,7 +98,7 @@ func messageHandler(v Message) { switch cmd { case "auth": userid, err := AuthManager.GetUserIdByToken(rcmd[1]) - if err != nil { + if err != nil && Conf.Service.Auth { c.send("-Error: " + err.Error()) return } diff --git a/main.go b/main.go index 028064c..cd4763e 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,10 @@ func main() { return } - cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接 + cli := cmd.ZHubClient{} + err = cli.Initx("server-local", addr, "server-admin", adminToken) + + // cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接 if err != nil { log.Println(err) // 如果连接失败则打印错误信息 return