From d6dda9e968881a3f06d91082eb37c9ed80ee3886 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Fri, 29 Jan 2021 06:14:17 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=96=AD=E5=BC=80=E5=90=8E=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E6=90=BA=E7=A8=8B=E6=9C=AA=E5=9B=9E=E6=94=B6bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@94 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cli_test.go | 19 +++++++++---------- zsub/zgroup.go | 22 +++++++++++++--------- zsub/zsub.go | 18 +++++++++++++----- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/cli_test.go b/cli_test.go index 5ba7e71..f2da3e7 100644 --- a/cli_test.go +++ b/cli_test.go @@ -10,9 +10,9 @@ import ( var ( //addr = "47.111.150.118:6066" - //addr = "127.0.0.1:1216" + addr = "127.0.0.1:1216" //addr = "122.112.180.156:6066" - addr = "39.108.56.246:1216" + //addr = "39.108.56.246:1216" ) func TestCli(t *testing.T) { @@ -45,14 +45,14 @@ func TestTimer(t *testing.T) { go func() { client, _ := cli.Create(addr, "topic-1") - client.Subscribe("ax", func(v string) { + client.Subscribe("ax1", func(v string) { log.Println("topic-1-ax: " + v) }) }() go func() { client, _ := cli.Create(addr, "topic-1") - client.Subscribe("ax", func(v string) { + client.Subscribe("ax1", func(v string) { log.Println("topic-2-ax: " + v) }) }() @@ -60,12 +60,12 @@ func TestTimer(t *testing.T) { go func() { client, _ := cli.Create(addr, "topic-1") - client.Subscribe("ax", func(v string) { + client.Subscribe("ax1", func(v string) { log.Println("topic-3-ax: " + v) }) }() - go func() { + /*go func() { client, _ := cli.Create(addr, "topic-1") client.Subscribe("ax", func(v string) { @@ -78,7 +78,7 @@ func TestTimer(t *testing.T) { client.Subscribe("ax", func(v string) { log.Println("topic-5-ax: " + v) }) - }() + }()*/ /*go func() { client, _ := cli.Create(addr, "topic-2") @@ -121,9 +121,8 @@ func TestPublish(t *testing.T) { if err != nil { log.Println(err) } - for i := 0; i < 100_0000; i++ { - client.Publish("ax", strconv.Itoa(i)) - //time.Sleep(time.Second) + for i := 0; i < 10000; i++ { + client.Publish("ax1", strconv.Itoa(i)) } time.Sleep(time.Second) diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 0b3ea2d..1a348f9 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -17,17 +17,21 @@ func (g *ZGroup) appendTo(c *ZConn) { c.appendTo(g.conns) go func() { // 每个连接开启一个携程发送数据 for { - msg, ok := <-g.chMsg - if !ok { - break - } + select { + case msg, ok := <-g.chMsg: + if !ok { + return + } - err := c.send("message", g.ztopic.topic, msg) - if err != nil { // 失败处理 - g.chMsg <- msg - break + err := c.send("message", g.ztopic.topic, msg) + if err != nil { // 失败处理 + g.chMsg <- msg + return + } + atomic.AddInt32(&g.offset, 1) + case <-c.stoped: + return } - atomic.AddInt32(&g.offset, 1) } }() } diff --git a/zsub/zsub.go b/zsub/zsub.go index e1bea5c..eeb6da3 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -28,6 +28,16 @@ type ZConn struct { //ZConn groupid string topics []string timers []string // 订阅、定时调度分别创建各自连接 + stoped chan int // 关闭信号量 +} + +func NewZConn(conn *net.Conn) *ZConn { + return &ZConn{ + conn: conn, + topics: []string{}, + timers: []string{}, + stoped: make(chan int, 0), + } } /* @@ -130,6 +140,7 @@ func (s *ZSub) broadcast(topic, msg string) { } func (s *ZSub) close(c *ZConn) { + close(c.stoped) // sub for _, topic := range c.topics { s.unsubscribe(c, topic) @@ -205,11 +216,8 @@ func ServerStart(addr string) { } log.Println("conn start: ", conn.RemoteAddr()) - go zsub.acceptHandler(&ZConn{ - conn: &conn, - topics: []string{}, - timers: []string{}, - }) + zConn := NewZConn(&conn) + go zsub.acceptHandler(zConn) } }