From ad3f7686ae9c1af22c1bed91a40ea1ec511eafac Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Wed, 3 Feb 2021 08:01:57 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E5=8F=96=E6=B6=88=E8=AE=A2=E9=98=85=E6=97=A0=E6=95=88=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@98 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cli/client.go | 5 +++++ pkg.bat | 7 +++++-- zsub/msg-consumer.go | 4 ++-- zsub/zgroup.go | 2 ++ zsub/zsub.go | 27 ++++++++++++++++++--------- 5 files changed, 32 insertions(+), 13 deletions(-) diff --git a/cli/client.go b/cli/client.go index e245e69..38f0302 100644 --- a/cli/client.go +++ b/cli/client.go @@ -119,6 +119,11 @@ func (c *Client) Subscribe(topic string, fun func(v string)) { } } +func (c *Client) Unsubscribe(topic string) { + c.send("unsubscribe " + topic) + delete(c.subFun, topic) +} + /* --- ping diff --git a/pkg.bat b/pkg.bat index f3d8cc3..6d4882c 100644 --- a/pkg.bat +++ b/pkg.bat @@ -1,7 +1,10 @@ SET GOOS=linux SET GOARCH=amd64 -go build -o zhub.sh -ldflags "-s -w" ./main.go +go build -o zhub.sh -ldflags "-s -w" ./app.go upx -9 zhub.sh -scp zhub.sh zhost: +scp zhub.sh xhost:/opt/zhub +scp zhub.sh zhost:/opt/zhub +scp zhub.sh qhost:/opt/zhub +scp zhub.sh nhost:/opt/zhub del zhub.sh diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index e82384e..89d5331 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -106,7 +106,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { if timer != nil { if t == -1 { timer.Stop() - s.delays[rcmd[1]+"-"+rcmd[2]] = nil + delete(s.delays, rcmd[1]+"-"+rcmd[2]) return } timer.Reset(time.Duration(t) * time.Millisecond) @@ -117,7 +117,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { select { case <-timer.C: zsub.publish(rcmd[1], rcmd[2]) - s.delays[rcmd[1]+"-"+rcmd[2]] = nil + delete(s.delays, rcmd[1]+"-"+rcmd[2]) } }() } diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 1a348f9..7e6e701 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -31,6 +31,8 @@ func (g *ZGroup) appendTo(c *ZConn) { atomic.AddInt32(&g.offset, 1) case <-c.stoped: return + case <-c.substoped[g.ztopic.topic]: + return } } }() diff --git a/zsub/zsub.go b/zsub/zsub.go index c34833c..eb2e362 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -27,19 +27,21 @@ type ZSub struct { type ZConn struct { //ZConn sync.Mutex - conn *net.Conn - groupid string - topics []string - timers []string // 订阅、定时调度分别创建各自连接 - stoped chan int // 关闭信号量 + conn *net.Conn + groupid string + topics []string + timers []string // 订阅、定时调度分别创建各自连接 + stoped chan int // 关闭信号量 + substoped map[string]chan int // 关闭信号量 } func NewZConn(conn *net.Conn) *ZConn { return &ZConn{ - conn: conn, - topics: []string{}, - timers: []string{}, - stoped: make(chan int, 0), + conn: conn, + topics: []string{}, + timers: []string{}, + stoped: make(chan int, 0), + substoped: make(map[string]chan int), } } @@ -74,6 +76,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} } //zgroup.conns = c.appendTo(zgroup.conns) + c.substoped[topic] = make(chan int, 0) zgroup.appendTo(c) for i, item := range c.topics { @@ -90,6 +93,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} s.Lock() defer s.Unlock() + close(c.substoped[topic]) ztopic := s.topics[topic] //ZTopic if ztopic == nil { return @@ -226,6 +230,11 @@ func ServerStart(addr string) { // 连接处理 func (s *ZSub) acceptHandler(c *ZConn) { + defer func() { + if r := recover(); r != nil { + log.Println("acceptHandler Recovered:", r) + } + }() defer func() { s.close(c) // close ZConn }()