diff --git a/app.go b/app.go index 2678d94..a82b1cb 100644 --- a/app.go +++ b/app.go @@ -4,6 +4,7 @@ import ( "log" "os" "strings" + "time" "zhub/cli" "zhub/conf" "zhub/zsub" @@ -29,11 +30,17 @@ func main() { } if len(os.Args) == 3 && strings.EqualFold(os.Args[1], "-r") { - if cli, err := cli.Create(addr, ""); err != nil { + if cli, err := cli.Create(addr, "group-admin"); err != nil { log.Println(err) } else { - cli.Cmd("reload-timer-config") + switch os.Args[2] { + case "timer": + cli.Cmd("reload-timer-config") + case "shutdown": + cli.Cmd("shutdown") + } cli.Close() + time.Sleep(time.Millisecond * 10) } return } diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 89d5331..75cfb38 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -8,10 +8,17 @@ import ( "zhub/conf" ) +type ZDelay struct { + topic string + value string + exectime time.Time + timer *time.Timer +} + func msgAccept(v Message) { defer func() { if r := recover(); r != nil { - log.Println("ExecCmd Recovered:", r) + log.Println("msgAccept Recovered:", r) } }() c := v.Conn @@ -80,6 +87,11 @@ func msgAccept(v Message) { switch rcmd[1] { case "reload-timer-config": zsub.reloadTimerConfig() + case "shutdown": + if !strings.EqualFold(c.groupid, "group-admin") { + return + } + zsub.shutdown() } default: c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") @@ -90,7 +102,10 @@ func msgAccept(v Message) { // delay topic value 100 -> publish topic value func (s *ZSub) delay(rcmd []string, c *ZConn) { s.Lock() - defer s.Unlock() + defer func() { + s.Unlock() + s.saveDelay() + }() if len(rcmd) != 4 { c.send("-Error: subscribe para number!") return @@ -102,20 +117,25 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { return } - timer := s.delays[rcmd[1]+"-"+rcmd[2]] - if timer != nil { + delay := s.delays[rcmd[1]+"-"+rcmd[2]] + if delay != nil { if t == -1 { - timer.Stop() + delay.timer.Stop() delete(s.delays, rcmd[1]+"-"+rcmd[2]) return } - timer.Reset(time.Duration(t) * time.Millisecond) + delay.timer.Reset(time.Duration(t) * time.Millisecond) } else { - timer = time.NewTimer(time.Duration(t) * time.Millisecond) - s.delays[rcmd[1]+"-"+rcmd[2]] = timer + delay := &ZDelay{ + topic: rcmd[1], + value: rcmd[2], + exectime: time.Now().Add(time.Duration(t) * time.Millisecond), + timer: time.NewTimer(time.Duration(t) * time.Millisecond), + } + s.delays[rcmd[1]+"-"+rcmd[2]] = delay go func() { select { - case <-timer.C: + case <-delay.timer.C: zsub.publish(rcmd[1], rcmd[2]) delete(s.delays, rcmd[1]+"-"+rcmd[2]) } diff --git a/zsub/zdb.go b/zsub/zdb.go index 19d8b6f..87ac9a7 100644 --- a/zsub/zdb.go +++ b/zsub/zdb.go @@ -1,7 +1,17 @@ package zsub +import ( + "bufio" + "fmt" + "log" + "os" + "strconv" + "strings" + "time" +) + var ( - chanMessages = make(chan Message, 1000) //接收到的 所有消息数据 +// hubChan = make(chan Message, 1000) //接收到的 所有消息数据 ) // 数据封装 @@ -9,3 +19,66 @@ type Message struct { Conn *ZConn Rcmd []string } + +// 文件追加内容 +func Append(str string, fileName string) { + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModeAppend) + if err != nil { + fmt.Println(err) + } + defer file.Close() + + _, err = file.WriteString(str) + if err != nil { + log.Println(err) + } +} + +// 数据持久化 +func (s *ZSub) saveDelay() { + s.Lock() + defer s.Unlock() + err := os.Remove("delay.z") + if err != nil { + log.Println(err) + } + + for _, delay := range s.delays { + Append(fmt.Sprintf("%s %s %s\n", delay.topic, delay.value, strconv.FormatInt(delay.exectime.Unix(), 10)), "delay.z") + } +} + +func (s *ZSub) reloadDelay() { + f, err := os.Open("delay.z") + if err != nil { + return + } + defer f.Close() + + r := bufio.NewReader(f) + for { + bytes, err := r.ReadBytes('\n') + if err != nil { + return + } + line := string(bytes) + if len(line) == 0 { + continue + } + line = strings.Trim(line, " \r\n") + split := strings.Split(line, " ") + if len(split) != 3 { + continue + } + + exectime, err := strconv.ParseInt(split[2], 10, 64) + if err != nil { + log.Println(err) + continue + } + if exectime < time.Now().Unix() { + continue + } + s.delay([]string{"delay", split[0], split[1], strconv.FormatInt((exectime-time.Now().Unix())*1000, 10)}, nil) + } +} diff --git a/zsub/zgroup.go b/zsub/zgroup.go index a6307b3..d6f9b8a 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -21,9 +21,10 @@ func (g *ZGroup) appendTo(c *ZConn) { return } + // create new goroutine consumer message c.substoped[topic] = make(chan int, 0) c.appendTo(g.conns) - go func() { // create new goroutine consumer message + go func() { for { select { case msg, ok := <-g.chMsg: diff --git a/zsub/zsub.go b/zsub/zsub.go index d65c665..606828c 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -4,17 +4,18 @@ import ( "bufio" "log" "net" + "os" "strconv" "strings" "sync" - "time" + "zhub/conf" ) var ( zsub = ZSub{ topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), - delays: make(map[string]*time.Timer), + delays: make(map[string]*ZDelay), } ) @@ -22,7 +23,7 @@ type ZSub struct { sync.RWMutex topics map[string]*ZTopic timers map[string]*ZTimer - delays map[string]*time.Timer + delays map[string]*ZDelay } type ZConn struct { //ZConn @@ -190,8 +191,11 @@ func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { */ func ServerStart(addr string) { - // 加载定时调度服务 - zsub.reloadTimerConfig() + conf.GetStr("data.dir", "data") + + // 重新加载[定时、延时] + go zsub.reloadTimerConfig() + go zsub.reloadDelay() // 启动服务监听 listen, err := net.Listen("tcp", addr) @@ -200,19 +204,6 @@ func ServerStart(addr string) { } log.Printf("zhub started listen on: %s \n", addr) - // 启动消息监听处理 - go func() { - for { - v, ok := <-chanMessages - if !ok { - break - } - - // 事件消费 - msgAccept(v) - } - }() - for { conn, err := listen.Accept() if err != nil { @@ -264,7 +255,12 @@ func (s *ZSub) acceptHandler(c *ZConn) { continue } - // 接收消息 zdb fixme: 细节暴露太多 - chanMessages <- Message{Conn: c, Rcmd: rcmd} + msgAccept(Message{Conn: c, Rcmd: rcmd}) } } + +func (s *ZSub) shutdown() { + s.saveDelay() + s.Lock() + os.Exit(0) +}