diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 50a67f8..fa9e4e2 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -70,7 +70,7 @@ func msgAccept(v Message) { case "broadcast": zsub.broadcast(rcmd[1], rcmd[2]) case "daly": - daly(rcmd, c) + zsub.daly(rcmd, c) case "timer": zsub.timer(rcmd, c) case "cmd": @@ -88,7 +88,9 @@ func msgAccept(v Message) { } // daly topic value 100 -> publish topic value -func daly(rcmd []string, c *ZConn) { +func (s *ZSub) daly(rcmd []string, c *ZConn) { + s.Lock() + defer s.Unlock() if len(rcmd) != 4 { c.send("-Error: subscribe para number!") return @@ -100,10 +102,24 @@ func daly(rcmd []string, c *ZConn) { return } - timer := time.NewTimer(time.Duration(t) * time.Millisecond) - select { - case <-timer.C: - zsub.publish(rcmd[1], rcmd[2]) + timer := s.delays[rcmd[1]+"-"+rcmd[2]] + if timer != nil { + if t == -1 { + timer.Stop() + s.delays[rcmd[1]+"-"+rcmd[2]] = nil + return + } + timer.Reset(time.Duration(t) * time.Millisecond) + } else { + timer = time.NewTimer(time.Duration(t) * time.Millisecond) + s.delays[rcmd[1]+"-"+rcmd[2]] = timer + go func() { + select { + case <-timer.C: + zsub.publish(rcmd[1], rcmd[2]) + s.delays[rcmd[1]+"-"+rcmd[2]] = nil + } + }() } } diff --git a/zsub/zsub.go b/zsub/zsub.go index eeb6da3..adf4033 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -7,12 +7,14 @@ import ( "strconv" "strings" "sync" + "time" ) var ( zsub = ZSub{ topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), + delays: make(map[string]*time.Timer), } ) @@ -20,6 +22,7 @@ type ZSub struct { sync.RWMutex topics map[string]*ZTopic timers map[string]*ZTimer + delays map[string]*time.Timer } type ZConn struct { //ZConn