From 03a7118598beca7563331c0cafc1ace3efe58991 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Tue, 19 Jan 2021 01:41:52 +0000 Subject: [PATCH] . git-svn-id: svn://47.119.165.148/zhub@74 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- zsub/msg-consumer.go | 22 +++++++++------------- zsub/zgroup.go | 2 +- zsub/zsub.go | 1 + zsub/ztimer.go | 2 +- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index b053a69..402b1d5 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -2,10 +2,8 @@ package zsub import ( "log" - "net" "strconv" "strings" - "sync" "time" "zhub/conf" ) @@ -44,7 +42,7 @@ func msgAccept(v Message) { if startWithAny(rcmd[0], arr...) { rcmd = strings.Split(rcmd[0], " ") } else { - send(c.conn, "-Error: not supported:"+rcmd[0]) + c.send("-Error: not supported:" + rcmd[0]) return } } @@ -65,7 +63,7 @@ func msgAccept(v Message) { } case "publish": if len(rcmd) != 3 { - send(c.conn, "-Error: publish para number!") + c.send("-Error: publish para number!") } else { zsub.publish(rcmd[1], rcmd[2]) } @@ -82,7 +80,7 @@ func msgAccept(v Message) { zsub.reloadTimerConfig() } default: - send(c.conn, "-Error: default not supported:["+strings.Join(rcmd, " ")+"]") + c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") return } } @@ -90,13 +88,13 @@ func msgAccept(v Message) { // daly topic value 100 -> publish topic value func daly(rcmd []string, c *ZConn) { if len(rcmd) != 4 { - send(c.conn, "-Error: subscribe para number!") + c.send("-Error: subscribe para number!") return } t, err := strconv.ParseInt(rcmd[3], 10, 64) if err != nil { - send(c.conn, "-Error: "+strings.Join(rcmd, " ")) + c.send("-Error: " + strings.Join(rcmd, " ")) return } @@ -107,12 +105,10 @@ func daly(rcmd []string, c *ZConn) { } } -var wlock = sync.Mutex{} - // send message -func send(conn *net.Conn, vs ...string) error { - wlock.Lock() - defer wlock.Unlock() +func (c *ZConn) send(vs ...string) error { + c.Lock() + defer c.Unlock() var bytes []byte @@ -126,6 +122,6 @@ func send(conn *net.Conn, vs ...string) error { } bytes = []byte(data) } - _, err := (*conn).Write(bytes) + _, err := (*c.conn).Write(bytes) return err } diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 41deee3..289cc8f 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -21,7 +21,7 @@ func (g *ZGroup) init() { if len(g.conns) == 0 { continue } - send(g.conns[0].conn, "message", g.ztopic.topic, msg) + g.conns[0].send("message", g.ztopic.topic, msg) g.offset++ } }() diff --git a/zsub/zsub.go b/zsub/zsub.go index 17697ec..8edfb11 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -23,6 +23,7 @@ type ZSub struct { } type ZConn struct { //ZConn + sync.Mutex conn *net.Conn groupid string topics []string diff --git a/zsub/ztimer.go b/zsub/ztimer.go index bfcf7b7..f5b2798 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -54,7 +54,7 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { var timerFun = func() { for _, conn := range timer.conns { - err := send(conn.conn, "timer", timer.topic) + err := conn.send("timer", timer.topic) if timer.single && err == nil { break }