diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 7e6e701..a6307b3 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -14,8 +14,16 @@ type ZGroup struct { // ZGroup } func (g *ZGroup) appendTo(c *ZConn) { + topic := g.ztopic.topic + + // report subscribe topic check + if c.substoped[topic] != nil { + return + } + + c.substoped[topic] = make(chan int, 0) c.appendTo(g.conns) - go func() { // 每个连接开启一个携程发送数据 + go func() { // create new goroutine consumer message for { select { case msg, ok := <-g.chMsg: @@ -23,7 +31,7 @@ func (g *ZGroup) appendTo(c *ZConn) { return } - err := c.send("message", g.ztopic.topic, msg) + err := c.send("message", topic, msg) if err != nil { // 失败处理 g.chMsg <- msg return @@ -31,7 +39,8 @@ func (g *ZGroup) appendTo(c *ZConn) { atomic.AddInt32(&g.offset, 1) case <-c.stoped: return - case <-c.substoped[g.ztopic.topic]: + case <-c.substoped[topic]: + delete(c.substoped, topic) return } } diff --git a/zsub/zsub.go b/zsub/zsub.go index eb2e362..d65c665 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -75,8 +75,6 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} ztopic.groups[c.groupid] = zgroup } - //zgroup.conns = c.appendTo(zgroup.conns) - c.substoped[topic] = make(chan int, 0) zgroup.appendTo(c) for i, item := range c.topics {