diff --git a/zsub/zgroup.go b/zsub/zgroup.go index d6f9b8a..d992161 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -14,6 +14,8 @@ type ZGroup struct { // ZGroup } func (g *ZGroup) appendTo(c *ZConn) { + c.Lock() + defer c.Unlock() topic := g.ztopic.topic // report subscribe topic check @@ -22,7 +24,8 @@ func (g *ZGroup) appendTo(c *ZConn) { } // create new goroutine consumer message - c.substoped[topic] = make(chan int, 0) + unsubChan := make(chan int, 0) + c.substoped[topic] = unsubChan c.appendTo(g.conns) go func() { for { @@ -40,7 +43,9 @@ func (g *ZGroup) appendTo(c *ZConn) { atomic.AddInt32(&g.offset, 1) case <-c.stoped: return - case <-c.substoped[topic]: + case <-unsubChan: + c.Lock() + defer c.Unlock() delete(c.substoped, topic) return } diff --git a/zsub/zsub.go b/zsub/zsub.go index 606828c..1387741 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -90,8 +90,8 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} 取消订阅: */ func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} - s.Lock() - defer s.Unlock() + c.Lock() + defer c.Unlock() close(c.substoped[topic]) ztopic := s.topics[topic] //ZTopic if ztopic == nil {