From fb93177de7ecac691f2d666dd6ea221e635683bf Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Fri, 5 Feb 2021 03:48:58 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E4=B8=BB=E9=A2=98=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@100 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- zsub/zgroup.go | 15 ++++++++++++--- zsub/zsub.go | 2 -- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 7e6e701..a6307b3 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -14,8 +14,16 @@ type ZGroup struct { // ZGroup } func (g *ZGroup) appendTo(c *ZConn) { + topic := g.ztopic.topic + + // report subscribe topic check + if c.substoped[topic] != nil { + return + } + + c.substoped[topic] = make(chan int, 0) c.appendTo(g.conns) - go func() { // 每个连接开启一个携程发送数据 + go func() { // create new goroutine consumer message for { select { case msg, ok := <-g.chMsg: @@ -23,7 +31,7 @@ func (g *ZGroup) appendTo(c *ZConn) { return } - err := c.send("message", g.ztopic.topic, msg) + err := c.send("message", topic, msg) if err != nil { // 失败处理 g.chMsg <- msg return @@ -31,7 +39,8 @@ func (g *ZGroup) appendTo(c *ZConn) { atomic.AddInt32(&g.offset, 1) case <-c.stoped: return - case <-c.substoped[g.ztopic.topic]: + case <-c.substoped[topic]: + delete(c.substoped, topic) return } } diff --git a/zsub/zsub.go b/zsub/zsub.go index eb2e362..d65c665 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -75,8 +75,6 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} ztopic.groups[c.groupid] = zgroup } - //zgroup.conns = c.appendTo(zgroup.conns) - c.substoped[topic] = make(chan int, 0) zgroup.appendTo(c) for i, item := range c.topics {