From 7746b768bdc48aeddcf22ed47e0107b864de758a Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Thu, 28 Jan 2021 12:11:35 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Azgroup=20=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E7=BB=84=20=E5=85=B1=E5=90=8C=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E7=BB=84=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@92 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cli_test.go | 33 +++++++++++++++++++++++++++++---- zsub/zgroup.go | 21 +++++++++++++-------- zsub/zsub.go | 9 ++++++--- 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/cli_test.go b/cli_test.go index d28820f..5ba7e71 100644 --- a/cli_test.go +++ b/cli_test.go @@ -10,8 +10,9 @@ import ( var ( //addr = "47.111.150.118:6066" - addr = "127.0.0.1:1216" - //addr = "39.108.56.246:1216" + //addr = "127.0.0.1:1216" + //addr = "122.112.180.156:6066" + addr = "39.108.56.246:1216" ) func TestCli(t *testing.T) { @@ -42,7 +43,7 @@ func TestCli(t *testing.T) { func TestTimer(t *testing.T) { go func() { - client, _ := cli.Create(addr, "topic-2") + client, _ := cli.Create(addr, "topic-1") client.Subscribe("ax", func(v string) { log.Println("topic-1-ax: " + v) @@ -56,6 +57,29 @@ func TestTimer(t *testing.T) { }) }() + go func() { + client, _ := cli.Create(addr, "topic-1") + + client.Subscribe("ax", func(v string) { + log.Println("topic-3-ax: " + v) + }) + }() + + go func() { + client, _ := cli.Create(addr, "topic-1") + + client.Subscribe("ax", func(v string) { + log.Println("topic-4-ax: " + v) + }) + }() + go func() { + client, _ := cli.Create(addr, "topic-1") + + client.Subscribe("ax", func(v string) { + log.Println("topic-5-ax: " + v) + }) + }() + /*go func() { client, _ := cli.Create(addr, "topic-2") client.Timer("a", func() { @@ -97,8 +121,9 @@ func TestPublish(t *testing.T) { if err != nil { log.Println(err) } - for i := 0; i < 30_0000; i++ { + for i := 0; i < 100_0000; i++ { client.Publish("ax", strconv.Itoa(i)) + //time.Sleep(time.Second) } time.Sleep(time.Second) diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 289cc8f..0b3ea2d 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -1,28 +1,33 @@ package zsub -import "sync" +import ( + "sync" + "sync/atomic" +) type ZGroup struct { // ZGroup sync.Mutex conns []*ZConn - offset int + offset int32 chMsg chan string // 组消息即时投递 ztopic *ZTopic // 所属topic } -func (g *ZGroup) init() { - go func() { +func (g *ZGroup) appendTo(c *ZConn) { + c.appendTo(g.conns) + go func() { // 每个连接开启一个携程发送数据 for { msg, ok := <-g.chMsg if !ok { break } - if len(g.conns) == 0 { - continue + err := c.send("message", g.ztopic.topic, msg) + if err != nil { // 失败处理 + g.chMsg <- msg + break } - g.conns[0].send("message", g.ztopic.topic, msg) - g.offset++ + atomic.AddInt32(&g.offset, 1) } }() } diff --git a/zsub/zsub.go b/zsub/zsub.go index 256ea28..e1bea5c 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -53,15 +53,15 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} zgroup := ztopic.groups[c.groupid] //ZGroup if zgroup == nil { zgroup = &ZGroup{ - conns: []*ZConn{}, + //conns: []*ZConn{}, ztopic: ztopic, chMsg: make(chan string, 1000), } - zgroup.init() ztopic.groups[c.groupid] = zgroup } - zgroup.conns = c.appendTo(zgroup.conns) + //zgroup.conns = c.appendTo(zgroup.conns) + zgroup.appendTo(c) for i, item := range c.topics { if strings.EqualFold(item, topic) { @@ -156,6 +156,9 @@ func (s *ZSub) close(c *ZConn) { } func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { + if arr == nil { + arr = make([]*ZConn, 0) + } for i, item := range arr { if item == c { arr = append(arr[:i], arr[i+1:]...)