From e9aa5cf615217343ea9abe1043d492431353424f Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Tue, 22 Mar 2022 01:57:13 +0000 Subject: [PATCH] . git-svn-id: svn://47.119.165.148/zhub@147 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cmd/client.go | 29 +++++++++++++++-------------- zsub/msg-consumer.go | 3 ++- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/cmd/client.go b/cmd/client.go index 97249bc..c9ff822 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -18,8 +18,8 @@ import ( ) type Client struct { - lock sync.Mutex // write lock - //rlock sync.Mutex // read lock + wlock sync.Mutex // write lock + rlock sync.Mutex // read lock appname string // local appname addr string // host:port @@ -51,7 +51,8 @@ func Create(appname string, addr string, groupid string) (*Client, error) { } client := Client{ - lock: sync.Mutex{}, + wlock: sync.Mutex{}, + rlock: sync.Mutex{}, appname: appname, addr: addr, conn: conn, @@ -132,8 +133,8 @@ subscribe x y z func (c *Client) Subscribe(topic string, fun func(v string)) { c.send("subscribe " + topic) if fun != nil { - c.lock.Lock() - defer c.lock.Unlock() + c.wlock.Lock() + defer c.wlock.Unlock() c.subFun[topic] = fun } } @@ -213,8 +214,8 @@ func (c *Client) Lock(key string, duration int) Lock { lockChan := make(chan int, 2) go func() { - c.lock.Lock() - defer c.lock.Unlock() + c.wlock.Lock() + defer c.wlock.Unlock() c.lockFlag[uuid] = &Lock{ Key: key, Uuid: uuid, @@ -359,8 +360,8 @@ else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ " */ func (c *Client) send(vs ...string) (err error) { //chSend <- vs - c.lock.Lock() - defer c.lock.Unlock() + c.wlock.Lock() + defer c.wlock.Unlock() a: if len(vs) == 1 { _, err = c.conn.Write([]byte(vs[0] + "\r\n")) @@ -382,8 +383,8 @@ a: } func (c *Client) receive() { - c.lock.Lock() - defer c.lock.Unlock() + c.rlock.Lock() + defer c.rlock.Unlock() r := bufio.NewReader(c.conn) for { @@ -426,8 +427,8 @@ func (c *Client) receive() { if strings.EqualFold(vs[1], "lock") { // message lock Uuid go func() { log.Println("lock:" + vs[2]) - c.lock.Lock() - defer c.lock.Unlock() + c.wlock.Lock() + defer c.wlock.Unlock() if c.lockFlag[vs[2]] == nil { return @@ -451,7 +452,7 @@ func (c *Client) receive() { continue case "+": // +pong, +xxx - if strings.EqualFold("+ping", string(v)) { + if strings.EqualFold("+ping", string(v)) { // 心跳消息回复 c.send("+pong") } case "-": diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index d5ee320..7312f6a 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -118,7 +118,8 @@ func msgAccept(v Message) { zsub.delay(rcmd, c) case "timer": for _, name := range rcmd[1:] { - zsub.timer([]string{"timer", name}, c) + zsub.timer([]string{"timer", name}, c) // append to timers + c.timers = append(c.timers, name) // append to conns } case "cmd": if len(rcmd) == 1 {