新增:广播消息;

优化:订阅服务消息发送使用读锁

git-svn-id: svn://47.119.165.148/zhub@76 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-01-21 11:05:49 +00:00
parent 03a7118598
commit 3c66a41de2
4 changed files with 66 additions and 39 deletions

View File

@@ -50,7 +50,7 @@ func Create(addr string, groupid string) (*Client, error) {
timerReceive: make(chan []string, 100),
}
conn.Write([]byte("groupid " + groupid + "\r\n"))
client.send("groupid " + groupid)
client.init()
return &client, err
}
@@ -64,15 +64,15 @@ func (c *Client) reconn() (err error) {
continue
} else if err == nil {
c.conn = conn
conn.Write([]byte("groupid " + c.groupid + "\r\n"))
c.send("groupid " + c.groupid)
go c.receive()
// 重新订阅
for topic, _ := range c.subFun {
c.subscribes(topic)
c.Subscribe(topic, nil)
}
for topic, _ := range c.timerFun {
c.timer(topic)
c.Timer(topic, nil)
}
break
}
@@ -100,16 +100,23 @@ func (c *Client) init() {
}
fun()
}
}
}()
go c.receive()
}
/*
// subscribe topic
---
subscribe x y z
---
*/
func (c *Client) Subscribe(topic string, fun func(v string)) {
c.subFun[topic] = fun
c.subscribes(topic)
c.send("subscribe " + topic)
if fun != nil {
c.subFun[topic] = fun
}
}
/*
@@ -135,13 +142,15 @@ $24
---
*/
func (c *Client) Publish(topic string, message string) error {
c.send("publish", topic, message)
return nil
return c.send("publish", topic, message)
}
func (c *Client) Broadcast(topic string, message string) error {
return c.send("broadcast", topic, message)
}
func (c *Client) Daly(topic string, message string, daly int) error {
c.send("daly", topic, message, strconv.Itoa(daly))
return nil
return c.send("daly", topic, message, strconv.Itoa(daly))
}
/*func (c *Client) Timer(topic string, expr string, fun func()) {
@@ -149,16 +158,9 @@ func (c *Client) Daly(topic string, message string, daly int) error {
c.send("timer", topic, expr, "x")
}*/
func (c *Client) Timer(topic string, fun func()) {
c.timerFun[topic] = fun
c.send("timer", topic)
}
func (c *Client) TimerSingle(topic string, expr string, fun func()) {
c.timerFun[topic] = fun
c.send("timer", topic, expr, "a")
}
// todo: save client timers info
func (c *Client) timer(topic string) {
if fun != nil {
c.timerFun[topic] = fun
}
c.send("timer", topic)
}
@@ -171,13 +173,7 @@ func (c *Client) Close() {
c.conn.Close()
}
/*
// subscribe topic
---
subscribe x y z
---
*/
func (c *Client) subscribes(topics ...string) error {
/*func (c *Client) subscribes(topics ...string) error {
if len(topics) == 0 {
return nil
}
@@ -188,7 +184,7 @@ func (c *Client) subscribes(topics ...string) error {
}
c.send(messages)
return nil
}
}*/
/*
send socket message :

View File

@@ -2,6 +2,7 @@ package main
import (
"log"
"strconv"
"testing"
"time"
"zhub/cli"
@@ -40,14 +41,22 @@ func TestCli(t *testing.T) {
}
func TestTimer(t *testing.T) {
go func() {
client, _ := cli.Create(addr, "topic-2")
client.Subscribe("ax", func(v string) {
log.Println("topic-1-ax: " + v)
})
}()
go func() {
client, _ := cli.Create(addr, "topic-1")
client.Timer("a", func() {
log.Println("client-1 收到 a 的定时消息")
client.Subscribe("ax", func(v string) {
log.Println("topic-2-ax: " + v)
})
}()
go func() {
/*go func() {
client, _ := cli.Create(addr, "topic-2")
client.Timer("a", func() {
log.Println("client-2 收到 a 的定时消息")
@@ -69,7 +78,7 @@ func TestTimer(t *testing.T) {
client.Timer("VIP-EXP-EXPIRE", func() {
log.Println("client-2 收到 VIP-EXP-EXPIRE 的定时消息")
})
}()
}()*/
time.Sleep(time.Hour * 3)
}
@@ -88,8 +97,9 @@ func TestPublish(t *testing.T) {
if err != nil {
log.Println(err)
}
client.Publish("ax", "a")
for i := 0; i < 30_0000; i++ {
client.Publish("ax", strconv.Itoa(i))
}
time.Sleep(time.Second)
}

View File

@@ -67,6 +67,8 @@ func msgAccept(v Message) {
} else {
zsub.publish(rcmd[1], rcmd[2])
}
case "broadcast":
zsub.broadcast(rcmd[1], rcmd[2])
case "daly":
daly(rcmd, c)
case "timer":

View File

@@ -17,7 +17,7 @@ var (
)
type ZSub struct {
sync.Mutex
sync.RWMutex
topics map[string]*ZTopic
timers map[string]*ZTimer
}
@@ -99,9 +99,9 @@ accept topic message
1、send message to topic's chan
2、feedback send success to sender, and sending message to topic's subscripts
*/
func (s *ZSub) publish(topic string, msg string) {
s.Lock()
defer s.Unlock()
func (s *ZSub) publish(topic, msg string) {
s.RLock()
defer s.RUnlock()
ztopic := s.topics[topic] //ZTopic
if ztopic == nil {
return
@@ -110,6 +110,25 @@ func (s *ZSub) publish(topic string, msg string) {
ztopic.mcount++
}
/*
send broadcast message
*/
func (s *ZSub) broadcast(topic, msg string) {
s.RLock()
defer s.RUnlock()
ztopic := s.topics[topic] //ZTopic
if ztopic == nil {
return
}
for _, group := range ztopic.groups {
for _, conn := range group.conns {
conn.send("message", topic, msg)
}
}
}
func (s *ZSub) close(c *ZConn) {
// sub
for _, topic := range c.topics {