From 8e1074e461429d1fed32ad02dcb13a4964c2df2d Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Mon, 1 Feb 2021 11:18:26 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Adelay=20=E5=BB=B6?= =?UTF-8?q?=E6=97=B6=E6=94=AF=E6=8C=81=E5=BB=B6=E6=97=B6=E5=8F=98=E6=9B=B4?= =?UTF-8?q?=E3=80=81=E5=8F=96=E6=B6=88=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@95 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- zsub/msg-consumer.go | 28 ++++++++++++++++++++++------ zsub/zsub.go | 3 +++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 50a67f8..fa9e4e2 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -70,7 +70,7 @@ func msgAccept(v Message) { case "broadcast": zsub.broadcast(rcmd[1], rcmd[2]) case "daly": - daly(rcmd, c) + zsub.daly(rcmd, c) case "timer": zsub.timer(rcmd, c) case "cmd": @@ -88,7 +88,9 @@ func msgAccept(v Message) { } // daly topic value 100 -> publish topic value -func daly(rcmd []string, c *ZConn) { +func (s *ZSub) daly(rcmd []string, c *ZConn) { + s.Lock() + defer s.Unlock() if len(rcmd) != 4 { c.send("-Error: subscribe para number!") return @@ -100,10 +102,24 @@ func daly(rcmd []string, c *ZConn) { return } - timer := time.NewTimer(time.Duration(t) * time.Millisecond) - select { - case <-timer.C: - zsub.publish(rcmd[1], rcmd[2]) + timer := s.delays[rcmd[1]+"-"+rcmd[2]] + if timer != nil { + if t == -1 { + timer.Stop() + s.delays[rcmd[1]+"-"+rcmd[2]] = nil + return + } + timer.Reset(time.Duration(t) * time.Millisecond) + } else { + timer = time.NewTimer(time.Duration(t) * time.Millisecond) + s.delays[rcmd[1]+"-"+rcmd[2]] = timer + go func() { + select { + case <-timer.C: + zsub.publish(rcmd[1], rcmd[2]) + s.delays[rcmd[1]+"-"+rcmd[2]] = nil + } + }() } } diff --git a/zsub/zsub.go b/zsub/zsub.go index eeb6da3..adf4033 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -7,12 +7,14 @@ import ( "strconv" "strings" "sync" + "time" ) var ( zsub = ZSub{ topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), + delays: make(map[string]*time.Timer), } ) @@ -20,6 +22,7 @@ type ZSub struct { sync.RWMutex topics map[string]*ZTopic timers map[string]*ZTimer + delays map[string]*time.Timer } type ZConn struct { //ZConn