From 40caa50fb7c63b0c707949286defe43b7358b6a6 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Wed, 24 Feb 2021 10:20:43 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@110 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- app.go | 2 +- cli_test.go | 2 +- zsub/msg-consumer.go | 88 ++++------------------------------ zsub/zsub.go | 111 +++++++++++++++++++++++++------------------ zsub/zsub_test.go | 37 --------------- zsub/ztimer.go | 55 ++++++++++++++++++++- 6 files changed, 129 insertions(+), 166 deletions(-) delete mode 100644 zsub/zsub_test.go diff --git a/app.go b/app.go index a82b1cb..d9dbeda 100644 --- a/app.go +++ b/app.go @@ -35,7 +35,7 @@ func main() { } else { switch os.Args[2] { case "timer": - cli.Cmd("reload-timer-config") + cli.Cmd("reload-timer") case "shutdown": cli.Cmd("shutdown") } diff --git a/cli_test.go b/cli_test.go index d33a179..f808d37 100644 --- a/cli_test.go +++ b/cli_test.go @@ -113,7 +113,7 @@ func TestSendCmd(t *testing.T) { log.Println(err) } - client.Cmd("reload-timer-config") + client.Cmd("reload-timer") } func TestPublish(t *testing.T) { diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 515bb76..5f9fa16 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -2,21 +2,12 @@ package zsub import ( "log" - "strconv" "strings" - "time" "zhub/conf" ) var funChan = make(chan func(), 1000) -type ZDelay struct { - topic string - value string - exectime time.Time - timer *time.Timer -} - func msgAccept(v Message) { defer func() { if r := recover(); r != nil { @@ -58,16 +49,18 @@ func msgAccept(v Message) { } cmd := rcmd[0] - if strings.EqualFold(cmd, "groupid") { + switch cmd { + case "groupid": c.groupid = rcmd[1] return - } else if strings.EqualFold(cmd, "publish") { + case "publish": if len(rcmd) != 3 { c.send("-Error: publish para number!") } else { zsub.publish(rcmd[1], rcmd[2]) } return + default: } // 内部执行指令 加入执行队列 @@ -76,11 +69,11 @@ func msgAccept(v Message) { case "subscribe": // subscribe x y z for _, topic := range rcmd[1:] { - zsub.subscribe(c, topic) // todo: 批量一次订阅 + c.subscribe(topic) // todo: 批量一次订阅 } case "unsubscribe": for _, topic := range rcmd[1:] { - zsub.unsubscribe(c, topic) + c.unsubscribe(topic) } case "broadcast": zsub.broadcast(rcmd[1], rcmd[2]) @@ -93,8 +86,8 @@ func msgAccept(v Message) { return } switch rcmd[1] { - case "reload-timer-config": - zsub.reloadTimerConfig() + case "reload-timer": + zsub.reloadTimer() case "shutdown": if !strings.EqualFold(c.groupid, "group-admin") { return @@ -107,68 +100,3 @@ func msgAccept(v Message) { } } } - -// delay topic value 100 -> publish topic value -func (s *ZSub) delay(rcmd []string, c *ZConn) { - s.Lock() - defer func() { - s.Unlock() - s.saveDelay() - }() - if len(rcmd) != 4 { - c.send("-Error: subscribe para number!") - return - } - - t, err := strconv.ParseInt(rcmd[3], 10, 64) - if err != nil { - c.send("-Error: " + strings.Join(rcmd, " ")) - return - } - - delay := s.delays[rcmd[1]+"-"+rcmd[2]] - if delay != nil { - if t == -1 { - delay.timer.Stop() - delete(s.delays, rcmd[1]+"-"+rcmd[2]) - return - } - delay.timer.Reset(time.Duration(t) * time.Millisecond) - } else { - delay := &ZDelay{ - topic: rcmd[1], - value: rcmd[2], - exectime: time.Now().Add(time.Duration(t) * time.Millisecond), - timer: time.NewTimer(time.Duration(t) * time.Millisecond), - } - s.delays[rcmd[1]+"-"+rcmd[2]] = delay - go func() { - select { - case <-delay.timer.C: - zsub.publish(rcmd[1], rcmd[2]) - delete(s.delays, rcmd[1]+"-"+rcmd[2]) - } - }() - } -} - -// send message -func (c *ZConn) send(vs ...string) error { - c.Lock() - defer c.Unlock() - - var bytes []byte - - if len(vs) == 1 { - bytes = []byte(vs[0] + "\r\n") - } else if len(vs) > 1 { - data := "*" + strconv.Itoa(len(vs)) + "\r\n" - for _, v := range vs { - data += "$" + strconv.Itoa(len(v)) + "\r\n" - data += v + "\r\n" - } - bytes = []byte(data) - } - _, err := (*c.conn).Write(bytes) - return err -} diff --git a/zsub/zsub.go b/zsub/zsub.go index 90674ea..4a53cbb 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -12,7 +12,7 @@ import ( ) var ( - zsub = ZSub{ + zsub = &ZSub{ topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), delays: make(map[string]*ZDelay), @@ -52,10 +52,10 @@ func NewZConn(conn *net.Conn) *ZConn { 2、加入到对应组别;如果是第一次的消费组 offset从当前 mcount 开始 3、若有待消费消息启动消费 */ -func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} - s.Lock() - defer s.Unlock() - ztopic := s.topics[topic] //ZTopic +func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} + zsub.Lock() + defer zsub.Unlock() + ztopic := zsub.topics[topic] //ZTopic if ztopic == nil { ztopic = &ZTopic{ groups: map[string]*ZGroup{}, @@ -63,7 +63,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} chMsg: make(chan string, 10000), } ztopic.init() - s.topics[topic] = ztopic + zsub.topics[topic] = ztopic } zgroup := ztopic.groups[c.groupid] //ZGroup @@ -89,11 +89,11 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} /* 取消订阅: */ -func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} +func (c *ZConn) unsubscribe(topic string) { // 取消订阅 zconn{} c.Lock() defer c.Unlock() close(c.substoped[topic]) - ztopic := s.topics[topic] //ZTopic + ztopic := zsub.topics[topic] //ZTopic if ztopic == nil { return } @@ -110,55 +110,39 @@ func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} } } -/* -accept topic message -1、send message to topic's chan -2、feedback send success to sender, and sending message to topic's subscripts -*/ -func (s *ZSub) publish(topic, msg string) { - s.RLock() - defer s.RUnlock() - ztopic := s.topics[topic] //ZTopic - if ztopic == nil { - return - } - ztopic.chMsg <- msg - ztopic.mcount++ -} +// send message +func (c *ZConn) send(vs ...string) error { + c.Lock() + defer c.Unlock() -/* -send broadcast message -*/ -func (s *ZSub) broadcast(topic, msg string) { - s.RLock() - defer s.RUnlock() + var bytes []byte - ztopic := s.topics[topic] //ZTopic - if ztopic == nil { - return - } - - for _, group := range ztopic.groups { - for _, conn := range group.conns { - conn.send("message", topic, msg) + if len(vs) == 1 { + bytes = []byte(vs[0] + "\r\n") + } else if len(vs) > 1 { + data := "*" + strconv.Itoa(len(vs)) + "\r\n" + for _, v := range vs { + data += "$" + strconv.Itoa(len(v)) + "\r\n" + data += v + "\r\n" } + bytes = []byte(data) } + _, err := (*c.conn).Write(bytes) + return err } -func (s *ZSub) close(c *ZConn) { +func (c *ZConn) close() { close(c.stoped) // sub for _, topic := range c.topics { - s.unsubscribe(c, topic) + c.unsubscribe(topic) } - // delay - // timer conn close - s.Lock() - defer s.Unlock() + zsub.Lock() + defer zsub.Unlock() for _, topic := range c.timers { // fixme: 数据逻辑交叉循环 - timer := s.timers[topic] + timer := zsub.timers[topic] if timer == nil { continue } @@ -203,7 +187,7 @@ func ServerStart(addr string) { }() // 重新加载[定时、延时] - go zsub.reloadTimerConfig() + go zsub.reloadTimer() go zsub.reloadDelay() // 启动服务监听 @@ -234,7 +218,7 @@ func (s *ZSub) acceptHandler(c *ZConn) { } }() defer func() { - s.close(c) // close ZConn + c.close() // close ZConn }() reader := bufio.NewReader(*c.conn) @@ -268,6 +252,41 @@ func (s *ZSub) acceptHandler(c *ZConn) { } } +/* +accept topic message +1、send message to topic's chan +2、feedback send success to sender, and sending message to topic's subscripts +*/ +func (s *ZSub) publish(topic, msg string) { + s.RLock() + defer s.RUnlock() + ztopic := s.topics[topic] //ZTopic + if ztopic == nil { + return + } + ztopic.chMsg <- msg + ztopic.mcount++ +} + +/* +send broadcast message +*/ +func (s *ZSub) broadcast(topic, msg string) { + s.RLock() + defer s.RUnlock() + + ztopic := s.topics[topic] //ZTopic + if ztopic == nil { + return + } + + for _, group := range ztopic.groups { + for _, conn := range group.conns { + conn.send("message", topic, msg) + } + } +} + func (s *ZSub) shutdown() { s.saveDelay() s.Lock() diff --git a/zsub/zsub_test.go b/zsub/zsub_test.go deleted file mode 100644 index bb040ff..0000000 --- a/zsub/zsub_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package zsub - -import ( - "fmt" - "testing" -) - -func TestName(t *testing.T) { - sub := ZSub{ - topics: map[string]*ZTopic{}, - timers: map[string]*ZTimer{}, - } - - sub.subscribe(&ZConn{ - groupid: "a", - }, "ab") - - sub.subscribe(&ZConn{ - groupid: "b", - }, "ab") - - // ----------------- - - sub.subscribe(&ZConn{ - groupid: "b", - }, "abx") - - conn := ZConn{ - groupid: "a", - } - - sub.subscribe(&conn, "abx") - - sub.unsubscribe(&conn, "abx") - - fmt.Println(1) -} diff --git a/zsub/ztimer.go b/zsub/ztimer.go index 0c771ac..d482dd4 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -165,7 +165,7 @@ func executeShell(command string) (string, error, string) { return stdout.String(), err, stderr.String() } -func (s *ZSub) reloadTimerConfig() { +func (s *ZSub) reloadTimer() { db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", conf.GetStr("ztimer.db.user", "root"), conf.GetStr("ztimer.db.pwd", "123456"), @@ -192,3 +192,56 @@ func (s *ZSub) reloadTimerConfig() { s.timer([]string{"timer", name, expr, single}, nil) //["timer", topic, expr, a|x] } } + +// ================== delay ===================================== + +type ZDelay struct { + topic string + value string + exectime time.Time + timer *time.Timer +} + +// delay topic value 100 -> publish topic value +func (s *ZSub) delay(rcmd []string, c *ZConn) { + s.Lock() + defer func() { + s.Unlock() + s.saveDelay() + }() + if len(rcmd) != 4 { + c.send("-Error: subscribe para number!") + return + } + + t, err := strconv.ParseInt(rcmd[3], 10, 64) + if err != nil { + c.send("-Error: " + strings.Join(rcmd, " ")) + return + } + + delay := s.delays[rcmd[1]+"-"+rcmd[2]] + if delay != nil { + if t == -1 { + delay.timer.Stop() + delete(s.delays, rcmd[1]+"-"+rcmd[2]) + return + } + delay.timer.Reset(time.Duration(t) * time.Millisecond) + } else { + delay := &ZDelay{ + topic: rcmd[1], + value: rcmd[2], + exectime: time.Now().Add(time.Duration(t) * time.Millisecond), + timer: time.NewTimer(time.Duration(t) * time.Millisecond), + } + s.delays[rcmd[1]+"-"+rcmd[2]] = delay + go func() { + select { + case <-delay.timer.C: + zsub.publish(rcmd[1], rcmd[2]) + delete(s.delays, rcmd[1]+"-"+rcmd[2]) + } + }() + } +}