diff --git a/cli/client.go b/cli/client.go index d69d867..af9757d 100644 --- a/cli/client.go +++ b/cli/client.go @@ -50,7 +50,7 @@ func Create(addr string, groupid string) (*Client, error) { timerReceive: make(chan []string, 100), } - conn.Write([]byte("groupid " + groupid + "\r\n")) + client.send("groupid " + groupid) client.init() return &client, err } @@ -64,15 +64,15 @@ func (c *Client) reconn() (err error) { continue } else if err == nil { c.conn = conn - conn.Write([]byte("groupid " + c.groupid + "\r\n")) + c.send("groupid " + c.groupid) go c.receive() // 重新订阅 for topic, _ := range c.subFun { - c.subscribes(topic) + c.Subscribe(topic, nil) } for topic, _ := range c.timerFun { - c.timer(topic) + c.Timer(topic, nil) } break } @@ -100,16 +100,23 @@ func (c *Client) init() { } fun() } - } }() go c.receive() } +/* +// subscribe topic +--- +subscribe x y z +--- +*/ func (c *Client) Subscribe(topic string, fun func(v string)) { - c.subFun[topic] = fun - c.subscribes(topic) + c.send("subscribe " + topic) + if fun != nil { + c.subFun[topic] = fun + } } /* @@ -135,13 +142,15 @@ $24 --- */ func (c *Client) Publish(topic string, message string) error { - c.send("publish", topic, message) - return nil + return c.send("publish", topic, message) +} + +func (c *Client) Broadcast(topic string, message string) error { + return c.send("broadcast", topic, message) } func (c *Client) Daly(topic string, message string, daly int) error { - c.send("daly", topic, message, strconv.Itoa(daly)) - return nil + return c.send("daly", topic, message, strconv.Itoa(daly)) } /*func (c *Client) Timer(topic string, expr string, fun func()) { @@ -149,16 +158,9 @@ func (c *Client) Daly(topic string, message string, daly int) error { c.send("timer", topic, expr, "x") }*/ func (c *Client) Timer(topic string, fun func()) { - c.timerFun[topic] = fun - c.send("timer", topic) -} -func (c *Client) TimerSingle(topic string, expr string, fun func()) { - c.timerFun[topic] = fun - c.send("timer", topic, expr, "a") -} - -// todo: save client timer‘s info -func (c *Client) timer(topic string) { + if fun != nil { + c.timerFun[topic] = fun + } c.send("timer", topic) } @@ -171,13 +173,7 @@ func (c *Client) Close() { c.conn.Close() } -/* -// subscribe topic ---- -subscribe x y z ---- -*/ -func (c *Client) subscribes(topics ...string) error { +/*func (c *Client) subscribes(topics ...string) error { if len(topics) == 0 { return nil } @@ -188,7 +184,7 @@ func (c *Client) subscribes(topics ...string) error { } c.send(messages) return nil -} +}*/ /* send socket message : diff --git a/cli_test.go b/cli_test.go index 08d4ffe..d28820f 100644 --- a/cli_test.go +++ b/cli_test.go @@ -2,6 +2,7 @@ package main import ( "log" + "strconv" "testing" "time" "zhub/cli" @@ -40,14 +41,22 @@ func TestCli(t *testing.T) { } func TestTimer(t *testing.T) { + go func() { + client, _ := cli.Create(addr, "topic-2") + + client.Subscribe("ax", func(v string) { + log.Println("topic-1-ax: " + v) + }) + }() go func() { client, _ := cli.Create(addr, "topic-1") - client.Timer("a", func() { - log.Println("client-1 收到 a 的定时消息") + + client.Subscribe("ax", func(v string) { + log.Println("topic-2-ax: " + v) }) }() - go func() { + /*go func() { client, _ := cli.Create(addr, "topic-2") client.Timer("a", func() { log.Println("client-2 收到 a 的定时消息") @@ -69,7 +78,7 @@ func TestTimer(t *testing.T) { client.Timer("VIP-EXP-EXPIRE", func() { log.Println("client-2 收到 VIP-EXP-EXPIRE 的定时消息") }) - }() + }()*/ time.Sleep(time.Hour * 3) } @@ -88,8 +97,9 @@ func TestPublish(t *testing.T) { if err != nil { log.Println(err) } - - client.Publish("ax", "a") + for i := 0; i < 30_0000; i++ { + client.Publish("ax", strconv.Itoa(i)) + } time.Sleep(time.Second) } diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 402b1d5..50a67f8 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -67,6 +67,8 @@ func msgAccept(v Message) { } else { zsub.publish(rcmd[1], rcmd[2]) } + case "broadcast": + zsub.broadcast(rcmd[1], rcmd[2]) case "daly": daly(rcmd, c) case "timer": diff --git a/zsub/zsub.go b/zsub/zsub.go index 8edfb11..256ea28 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -17,7 +17,7 @@ var ( ) type ZSub struct { - sync.Mutex + sync.RWMutex topics map[string]*ZTopic timers map[string]*ZTimer } @@ -99,9 +99,9 @@ accept topic message 1、send message to topic's chan 2、feedback send success to sender, and sending message to topic's subscripts */ -func (s *ZSub) publish(topic string, msg string) { - s.Lock() - defer s.Unlock() +func (s *ZSub) publish(topic, msg string) { + s.RLock() + defer s.RUnlock() ztopic := s.topics[topic] //ZTopic if ztopic == nil { return @@ -110,6 +110,25 @@ func (s *ZSub) publish(topic string, msg string) { ztopic.mcount++ } +/* +send broadcast message +*/ +func (s *ZSub) broadcast(topic, msg string) { + s.RLock() + defer s.RUnlock() + + ztopic := s.topics[topic] //ZTopic + if ztopic == nil { + return + } + + for _, group := range ztopic.groups { + for _, conn := range group.conns { + conn.send("message", topic, msg) + } + } +} + func (s *ZSub) close(c *ZConn) { // sub for _, topic := range c.topics {