From 3c66a41de2b0dd1d6d09ba5ed5f2de19591f9d3e Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Thu, 21 Jan 2021 11:05:49 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E5=B9=BF=E6=92=AD?= =?UTF-8?q?=E6=B6=88=E6=81=AF=EF=BC=9B=20=E4=BC=98=E5=8C=96=EF=BC=9A?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=9C=8D=E5=8A=A1=E6=B6=88=E6=81=AF=E5=8F=91?= =?UTF-8?q?=E9=80=81=E4=BD=BF=E7=94=A8=E8=AF=BB=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@76 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cli/client.go | 54 ++++++++++++++++++++------------------------ cli_test.go | 22 +++++++++++++----- zsub/msg-consumer.go | 2 ++ zsub/zsub.go | 27 ++++++++++++++++++---- 4 files changed, 66 insertions(+), 39 deletions(-) diff --git a/cli/client.go b/cli/client.go index d69d867..af9757d 100644 --- a/cli/client.go +++ b/cli/client.go @@ -50,7 +50,7 @@ func Create(addr string, groupid string) (*Client, error) { timerReceive: make(chan []string, 100), } - conn.Write([]byte("groupid " + groupid + "\r\n")) + client.send("groupid " + groupid) client.init() return &client, err } @@ -64,15 +64,15 @@ func (c *Client) reconn() (err error) { continue } else if err == nil { c.conn = conn - conn.Write([]byte("groupid " + c.groupid + "\r\n")) + c.send("groupid " + c.groupid) go c.receive() // 重新订阅 for topic, _ := range c.subFun { - c.subscribes(topic) + c.Subscribe(topic, nil) } for topic, _ := range c.timerFun { - c.timer(topic) + c.Timer(topic, nil) } break } @@ -100,16 +100,23 @@ func (c *Client) init() { } fun() } - } }() go c.receive() } +/* +// subscribe topic +--- +subscribe x y z +--- +*/ func (c *Client) Subscribe(topic string, fun func(v string)) { - c.subFun[topic] = fun - c.subscribes(topic) + c.send("subscribe " + topic) + if fun != nil { + c.subFun[topic] = fun + } } /* @@ -135,13 +142,15 @@ $24 --- */ func (c *Client) Publish(topic string, message string) error { - c.send("publish", topic, message) - return nil + return c.send("publish", topic, message) +} + +func (c *Client) Broadcast(topic string, message string) error { + return c.send("broadcast", topic, message) } func (c *Client) Daly(topic string, message string, daly int) error { - c.send("daly", topic, message, strconv.Itoa(daly)) - return nil + return c.send("daly", topic, message, strconv.Itoa(daly)) } /*func (c *Client) Timer(topic string, expr string, fun func()) { @@ -149,16 +158,9 @@ func (c *Client) Daly(topic string, message string, daly int) error { c.send("timer", topic, expr, "x") }*/ func (c *Client) Timer(topic string, fun func()) { - c.timerFun[topic] = fun - c.send("timer", topic) -} -func (c *Client) TimerSingle(topic string, expr string, fun func()) { - c.timerFun[topic] = fun - c.send("timer", topic, expr, "a") -} - -// todo: save client timer‘s info -func (c *Client) timer(topic string) { + if fun != nil { + c.timerFun[topic] = fun + } c.send("timer", topic) } @@ -171,13 +173,7 @@ func (c *Client) Close() { c.conn.Close() } -/* -// subscribe topic ---- -subscribe x y z ---- -*/ -func (c *Client) subscribes(topics ...string) error { +/*func (c *Client) subscribes(topics ...string) error { if len(topics) == 0 { return nil } @@ -188,7 +184,7 @@ func (c *Client) subscribes(topics ...string) error { } c.send(messages) return nil -} +}*/ /* send socket message : diff --git a/cli_test.go b/cli_test.go index 08d4ffe..d28820f 100644 --- a/cli_test.go +++ b/cli_test.go @@ -2,6 +2,7 @@ package main import ( "log" + "strconv" "testing" "time" "zhub/cli" @@ -40,14 +41,22 @@ func TestCli(t *testing.T) { } func TestTimer(t *testing.T) { + go func() { + client, _ := cli.Create(addr, "topic-2") + + client.Subscribe("ax", func(v string) { + log.Println("topic-1-ax: " + v) + }) + }() go func() { client, _ := cli.Create(addr, "topic-1") - client.Timer("a", func() { - log.Println("client-1 收到 a 的定时消息") + + client.Subscribe("ax", func(v string) { + log.Println("topic-2-ax: " + v) }) }() - go func() { + /*go func() { client, _ := cli.Create(addr, "topic-2") client.Timer("a", func() { log.Println("client-2 收到 a 的定时消息") @@ -69,7 +78,7 @@ func TestTimer(t *testing.T) { client.Timer("VIP-EXP-EXPIRE", func() { log.Println("client-2 收到 VIP-EXP-EXPIRE 的定时消息") }) - }() + }()*/ time.Sleep(time.Hour * 3) } @@ -88,8 +97,9 @@ func TestPublish(t *testing.T) { if err != nil { log.Println(err) } - - client.Publish("ax", "a") + for i := 0; i < 30_0000; i++ { + client.Publish("ax", strconv.Itoa(i)) + } time.Sleep(time.Second) } diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 402b1d5..50a67f8 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -67,6 +67,8 @@ func msgAccept(v Message) { } else { zsub.publish(rcmd[1], rcmd[2]) } + case "broadcast": + zsub.broadcast(rcmd[1], rcmd[2]) case "daly": daly(rcmd, c) case "timer": diff --git a/zsub/zsub.go b/zsub/zsub.go index 8edfb11..256ea28 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -17,7 +17,7 @@ var ( ) type ZSub struct { - sync.Mutex + sync.RWMutex topics map[string]*ZTopic timers map[string]*ZTimer } @@ -99,9 +99,9 @@ accept topic message 1、send message to topic's chan 2、feedback send success to sender, and sending message to topic's subscripts */ -func (s *ZSub) publish(topic string, msg string) { - s.Lock() - defer s.Unlock() +func (s *ZSub) publish(topic, msg string) { + s.RLock() + defer s.RUnlock() ztopic := s.topics[topic] //ZTopic if ztopic == nil { return @@ -110,6 +110,25 @@ func (s *ZSub) publish(topic string, msg string) { ztopic.mcount++ } +/* +send broadcast message +*/ +func (s *ZSub) broadcast(topic, msg string) { + s.RLock() + defer s.RUnlock() + + ztopic := s.topics[topic] //ZTopic + if ztopic == nil { + return + } + + for _, group := range ztopic.groups { + for _, conn := range group.conns { + conn.send("message", topic, msg) + } + } +} + func (s *ZSub) close(c *ZConn) { // sub for _, topic := range c.topics {