diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 75cfb38..515bb76 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -8,6 +8,8 @@ import ( "zhub/conf" ) +var funChan = make(chan func(), 1000) + type ZDelay struct { topic string value string @@ -56,46 +58,53 @@ func msgAccept(v Message) { } cmd := rcmd[0] - switch cmd { - case "groupid": + if strings.EqualFold(cmd, "groupid") { c.groupid = rcmd[1] - case "subscribe": - // subscribe x y z - for _, topic := range rcmd[1:] { - zsub.subscribe(c, topic) // todo: 批量一次订阅 - } - case "unsubscribe": - for _, topic := range rcmd[1:] { - zsub.unsubscribe(c, topic) - } - case "publish": + return + } else if strings.EqualFold(cmd, "publish") { if len(rcmd) != 3 { c.send("-Error: publish para number!") } else { zsub.publish(rcmd[1], rcmd[2]) } - case "broadcast": - zsub.broadcast(rcmd[1], rcmd[2]) - case "delay": - zsub.delay(rcmd, c) - case "timer": - zsub.timer(rcmd, c) - case "cmd": - if len(rcmd) == 1 { - return - } - switch rcmd[1] { - case "reload-timer-config": - zsub.reloadTimerConfig() - case "shutdown": - if !strings.EqualFold(c.groupid, "group-admin") { + return + } + + // 内部执行指令 加入执行队列 + funChan <- func() { + switch cmd { + case "subscribe": + // subscribe x y z + for _, topic := range rcmd[1:] { + zsub.subscribe(c, topic) // todo: 批量一次订阅 + } + case "unsubscribe": + for _, topic := range rcmd[1:] { + zsub.unsubscribe(c, topic) + } + case "broadcast": + zsub.broadcast(rcmd[1], rcmd[2]) + case "delay": + zsub.delay(rcmd, c) + case "timer": + zsub.timer(rcmd, c) + case "cmd": + if len(rcmd) == 1 { return } - zsub.shutdown() + switch rcmd[1] { + case "reload-timer-config": + zsub.reloadTimerConfig() + case "shutdown": + if !strings.EqualFold(c.groupid, "group-admin") { + return + } + zsub.shutdown() + } + default: + c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") + return } - default: - c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") - return } } diff --git a/zsub/zsub.go b/zsub/zsub.go index 1387741..90674ea 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -190,9 +190,18 @@ func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { 2、启动服务监听 */ func ServerStart(addr string) { - conf.GetStr("data.dir", "data") + go func() { + for { + fun, ok := <-funChan + if !ok { + break + } + fun() + } + }() + // 重新加载[定时、延时] go zsub.reloadTimerConfig() go zsub.reloadDelay()