From c711cc10ae685b4c52942fc695980b8adba77fe8 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Tue, 23 Feb 2021 10:46:34 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E5=86=85=E9=83=A8?= =?UTF-8?q?=E6=8C=87=E4=BB=A4=E6=89=A7=E8=A1=8C=E4=B8=B2=E8=A1=8C=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=EF=BC=8C=E9=81=BF=E5=85=8D=E5=B9=B6=E5=8F=91=E4=B8=8B?= =?UTF-8?q?=E6=93=8D=E4=BD=9C=20map=20=E5=B4=A9=E6=BA=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@109 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- zsub/msg-consumer.go | 71 +++++++++++++++++++++++++------------------- zsub/zsub.go | 11 ++++++- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 75cfb38..515bb76 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -8,6 +8,8 @@ import ( "zhub/conf" ) +var funChan = make(chan func(), 1000) + type ZDelay struct { topic string value string @@ -56,46 +58,53 @@ func msgAccept(v Message) { } cmd := rcmd[0] - switch cmd { - case "groupid": + if strings.EqualFold(cmd, "groupid") { c.groupid = rcmd[1] - case "subscribe": - // subscribe x y z - for _, topic := range rcmd[1:] { - zsub.subscribe(c, topic) // todo: 批量一次订阅 - } - case "unsubscribe": - for _, topic := range rcmd[1:] { - zsub.unsubscribe(c, topic) - } - case "publish": + return + } else if strings.EqualFold(cmd, "publish") { if len(rcmd) != 3 { c.send("-Error: publish para number!") } else { zsub.publish(rcmd[1], rcmd[2]) } - case "broadcast": - zsub.broadcast(rcmd[1], rcmd[2]) - case "delay": - zsub.delay(rcmd, c) - case "timer": - zsub.timer(rcmd, c) - case "cmd": - if len(rcmd) == 1 { - return - } - switch rcmd[1] { - case "reload-timer-config": - zsub.reloadTimerConfig() - case "shutdown": - if !strings.EqualFold(c.groupid, "group-admin") { + return + } + + // 内部执行指令 加入执行队列 + funChan <- func() { + switch cmd { + case "subscribe": + // subscribe x y z + for _, topic := range rcmd[1:] { + zsub.subscribe(c, topic) // todo: 批量一次订阅 + } + case "unsubscribe": + for _, topic := range rcmd[1:] { + zsub.unsubscribe(c, topic) + } + case "broadcast": + zsub.broadcast(rcmd[1], rcmd[2]) + case "delay": + zsub.delay(rcmd, c) + case "timer": + zsub.timer(rcmd, c) + case "cmd": + if len(rcmd) == 1 { return } - zsub.shutdown() + switch rcmd[1] { + case "reload-timer-config": + zsub.reloadTimerConfig() + case "shutdown": + if !strings.EqualFold(c.groupid, "group-admin") { + return + } + zsub.shutdown() + } + default: + c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") + return } - default: - c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") - return } } diff --git a/zsub/zsub.go b/zsub/zsub.go index 1387741..90674ea 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -190,9 +190,18 @@ func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { 2、启动服务监听 */ func ServerStart(addr string) { - conf.GetStr("data.dir", "data") + go func() { + for { + fun, ok := <-funChan + if !ok { + break + } + fun() + } + }() + // 重新加载[定时、延时] go zsub.reloadTimerConfig() go zsub.reloadDelay()