From d5ffd34b8b18dfebfc6f2a2dff02c612c22f9ca5 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Mon, 22 Feb 2021 11:14:03 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E6=93=8D=E4=BD=9C=20map=20=E5=B4=A9=E6=BA=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@108 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- zsub/zgroup.go | 9 +++++++-- zsub/zsub.go | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/zsub/zgroup.go b/zsub/zgroup.go index d6f9b8a..d992161 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -14,6 +14,8 @@ type ZGroup struct { // ZGroup } func (g *ZGroup) appendTo(c *ZConn) { + c.Lock() + defer c.Unlock() topic := g.ztopic.topic // report subscribe topic check @@ -22,7 +24,8 @@ func (g *ZGroup) appendTo(c *ZConn) { } // create new goroutine consumer message - c.substoped[topic] = make(chan int, 0) + unsubChan := make(chan int, 0) + c.substoped[topic] = unsubChan c.appendTo(g.conns) go func() { for { @@ -40,7 +43,9 @@ func (g *ZGroup) appendTo(c *ZConn) { atomic.AddInt32(&g.offset, 1) case <-c.stoped: return - case <-c.substoped[topic]: + case <-unsubChan: + c.Lock() + defer c.Unlock() delete(c.substoped, topic) return } diff --git a/zsub/zsub.go b/zsub/zsub.go index 606828c..1387741 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -90,8 +90,8 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} 取消订阅: */ func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} - s.Lock() - defer s.Unlock() + c.Lock() + defer c.Unlock() close(c.substoped[topic]) ztopic := s.topics[topic] //ZTopic if ztopic == nil {