diff --git a/zsub/monitor.go b/internal/monitor/monitor.go similarity index 87% rename from zsub/monitor.go rename to internal/monitor/monitor.go index 5db155f..1b03166 100644 --- a/zsub/monitor.go +++ b/internal/monitor/monitor.go @@ -1,4 +1,4 @@ -package zsub +package monitor import ( "encoding/json" @@ -6,6 +6,7 @@ import ( "net/http" "os" "path" + "zhub/internal/zsub" ) func init() { @@ -23,7 +24,7 @@ func StartWatch() { http.HandleFunc("/retimer", retimer) http.HandleFunc("/topic/publish", publish) - watchAddr := Conf.Service.Watch + watchAddr := zsub.Conf.Service.Watch log.Println("zhub.watch = ", watchAddr) http.ListenAndServe(watchAddr, nil) } @@ -31,23 +32,23 @@ func StartWatch() { func publish(w http.ResponseWriter, r *http.Request) { topic := r.FormValue("topic") value := r.FormValue("value") - zsub.Publish(topic, value) + zsub.Hub.Publish(topic, value) renderJson(w, "+ok") } // retimer 重载定时调度 func retimer(w http.ResponseWriter, r *http.Request) { - zsub.ReloadTimer() + zsub.Hub.ReloadTimer() renderJson(w, "+reload timer ok") } func cleanup(w http.ResponseWriter, r *http.Request) { - zsub.Clearup() + zsub.Hub.Clearup() renderJson(w, "+OK") } func info(w http.ResponseWriter, r *http.Request) { - info := Info() + info := zsub.Info() renderJson(w, info) } diff --git a/zsub/msg-consumer.go b/internal/zsub/msg-accept.go similarity index 86% rename from zsub/msg-consumer.go rename to internal/zsub/msg-accept.go index 389e0fb..763ac9a 100644 --- a/zsub/msg-consumer.go +++ b/internal/zsub/msg-accept.go @@ -10,10 +10,10 @@ import ( var funChan = make(chan func(), 1000) -func msgAccept(v Message) { +func handleMessage(v Message) { defer func() { if r := recover(); r != nil { - log.Println("msgAccept Recovered:", r) + log.Println("handleMessage Recovered:", r) } }() c := v.Conn @@ -87,13 +87,13 @@ func msgAccept(v Message) { return case "rpc": // if rpc and no sub back error - if zsub.noSubscribe(rcmd[1]) { + if Hub.noSubscribe(rcmd[1]) { rpcBody := make(map[string]string) json.Unmarshal([]byte(rcmd[2]), &rpcBody) log.Println("rpc no subscribe: ", rcmd[1]) ruk := rpcBody["ruk"] - zsub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") + Hub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") return } @@ -103,7 +103,7 @@ func msgAccept(v Message) { /*if len(topicChan) < cap(topicChan) { topicChan <- rcmd }*/ - zsub.Publish(rcmd[1], rcmd[2]) + Hub.Publish(rcmd[1], rcmd[2]) } return case "publish": @@ -113,7 +113,7 @@ func msgAccept(v Message) { /*if len(topicChan) < cap(topicChan) { topicChan <- rcmd }*/ - zsub.Publish(rcmd[1], rcmd[2]) + Hub.Publish(rcmd[1], rcmd[2]) } return default: @@ -137,13 +137,13 @@ func msgAccept(v Message) { c.unsubscribe(topic) } case "broadcast": - zsub.broadcast(rcmd[1], rcmd[2]) + Hub.broadcast(rcmd[1], rcmd[2]) case "delay": - zsub.delay(rcmd, c) + Hub.delay(rcmd, c) case "timer": for _, name := range rcmd[1:] { - zsub.timer([]string{"timer", name}, c) // append to timers - c.timers = append(c.timers, name) // append to conns + Hub.timer([]string{"timer", name}, c) // append to timers + c.timers = append(c.timers, name) // append to conns } case "cmd": if len(rcmd) == 1 { @@ -151,12 +151,12 @@ func msgAccept(v Message) { } switch rcmd[1] { case "reload-timer": - zsub.ReloadTimer() + Hub.ReloadTimer() case "shutdown": if !strings.EqualFold(c.groupid, "group-admin") { return } - zsub.shutdown() + Hub.shutdown() } case "lock": // lock key uuid 5 @@ -165,14 +165,14 @@ func msgAccept(v Message) { return } d, _ := strconv.Atoi(rcmd[3]) - zsub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d}) + Hub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d}) case "unlock": // unlock key uuid if len(rcmd) != 3 { c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]") return } - zsub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) + Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) /*case "auth": if len(rcmd) != 2 || strings.IndexAny(rcmd[1], "@") == -1 { c.send("-Error: invalid password!") diff --git a/zsub/zdb.go b/internal/zsub/zdb.go similarity index 59% rename from zsub/zdb.go rename to internal/zsub/zdb.go index 6a3761c..1cff46d 100644 --- a/zsub/zdb.go +++ b/internal/zsub/zdb.go @@ -2,7 +2,6 @@ package zsub import ( "bufio" - "database/sql" "fmt" "log" "os" @@ -11,10 +10,9 @@ import ( "time" ) -/* var ( - topicChan = make(chan []string, 1000) //接收到的 所有消息数据, 用于写入数据库持久化 -)*/ + datadir string +) // Message 数据封装 type Message struct { @@ -36,11 +34,10 @@ func Append(str string, fileName string) { } } -// 数据持久化 -func (s *ZSub) dataStorage() { +func (s *ZSub) SaveData() { defer func() { if r := recover(); r != nil { - log.Println("dataStorage Recovered:", r) + log.Println("SaveData Recovered:", r) } }() @@ -69,14 +66,8 @@ func (s *ZSub) dataStorage() { _delays := s.delays for _, delay := range _delays { - delayStr := fmt.Sprintf("%s %s %d\n", delay.topic, delay.value, delay.exectime.Unix()) + delayStr := fmt.Sprintf("%s %s %d\n", delay.Topic, delay.Value, delay.Exectime.Unix()) writer.WriteString(delayStr) - /*writer.WriteString(delay.topic) - writer.WriteString(" ") - writer.WriteString(delay.value) - writer.WriteString(" ") - writer.WriteString(strconv.FormatInt(delay.exectime.Unix(), 10)) - writer.WriteString("\n")*/ } writer.Flush() }() @@ -98,6 +89,11 @@ func (s *ZSub) dataStorage() { }() } +func (s *ZSub) LoadData() { + s.loadDelay() + // s.loadLock() +} + func (s *ZSub) loadDelay() { f, err := os.Open(datadir + "/delay.z") if err != nil { @@ -172,69 +168,3 @@ func (s *ZSub) loadLock() { }) } } - -// -------------------------------------- -var ( - db *sql.DB - seq int64 = 50000 -) - -func init() { - //LoadConf("app.conf") - /* - _db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", - GetStr("ztimer.db.user", "root"), - GetStr("ztimer.db.pwd", "123456"), - GetStr("ztimer.db.addr", "127.0.0.1:3306"), - GetStr("ztimer.db.database", "zhub"), - )) - if err != nil { - log.Println(err) - return - } - - db = _db - - // 批量写入数据库,等待超时5秒,如有数据写入数据 - - go func() { - defer func() { - if r := recover(); r != nil { - log.Println("MsgToDb Recovered:", r) - } - }() - - var flagcount = 0 - var _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" - for { - select { - case msg := <-topicChan: - var topic, value = msg[1], msg[2] - var t = time.Now().UnixNano() / 1e6 - _sql += fmt.Sprintf("('%s','%s','%s',%d),\n", - strconv.FormatInt(t, 36)+"-"+strconv.FormatInt(atomic.AddInt64(&seq, 1), 36), topic, value, t) - flagcount++ - case <-time.After(time.Second * 5): // 等待5秒 - if flagcount > 0 { - flagcount = 100 - } - } - - if flagcount != 100 { - continue - } - - _sql = _sql[:len(_sql)-2] - _sql += ";" - - _, err = db.Exec(_sql) - if err != nil { - log.Println(err) - } - - _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" - flagcount = 0 - } - }() - */ -} diff --git a/zsub/zgroup.go b/internal/zsub/zgroup.go similarity index 100% rename from zsub/zgroup.go rename to internal/zsub/zgroup.go diff --git a/zsub/zsub.go b/internal/zsub/zsub.go similarity index 91% rename from zsub/zsub.go rename to internal/zsub/zsub.go index 1d4276f..f550acb 100644 --- a/zsub/zsub.go +++ b/internal/zsub/zsub.go @@ -16,9 +16,8 @@ import ( ) var ( - Conf config.Config - datadir string - zsub = &ZSub{ + Conf config.Config + Hub = &ZSub{ topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), delays: make(map[string]*ZDelay), @@ -42,7 +41,7 @@ func init() { } }() conns := make([]*ZConn, 0) // 需要关闭的连接 - for _, c := range zsub.conns { + for _, c := range Hub.conns { if c.ping > 0 && c.ping-c.pong > 19 { conns = c.appendTo(conns) continue @@ -64,7 +63,7 @@ func init() { } - zsub.dataStorage() + Hub.SaveData() } }() } @@ -120,9 +119,9 @@ func NewZConn(conn *net.Conn) *ZConn { 3、若有待消费消息启动消费 */ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} - zsub.Lock() - defer zsub.Unlock() - ztopic := zsub.topics[topic] //ZTopic + Hub.Lock() + defer Hub.Unlock() + ztopic := Hub.topics[topic] //ZTopic if ztopic == nil { ztopic = &ZTopic{ groups: map[string]*ZGroup{}, @@ -130,7 +129,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} chMsg: make(chan string, 500), } ztopic.init() - zsub.topics[topic] = ztopic + Hub.topics[topic] = ztopic } zgroup := ztopic.groups[c.groupid] //ZGroup @@ -160,7 +159,7 @@ func (c *ZConn) unsubscribe(topic string) { // 取消订阅 zconn{} c.Lock() defer c.Unlock() close(c.substoped[topic]) - ztopic := zsub.topics[topic] //ZTopic + ztopic := Hub.topics[topic] //ZTopic if ztopic == nil { return } @@ -206,17 +205,17 @@ func (c *ZConn) close() { } // timer conn close - zsub.Lock() - defer zsub.Unlock() + Hub.Lock() + defer Hub.Unlock() for _, topic := range c.timers { // fixme: 数据逻辑交叉循环 - timer := zsub.timers[topic] + timer := Hub.timers[topic] if timer == nil { continue } - for i, item := range timer.conns { + for i, item := range timer.Conns { if item == c { - timer.conns = append(timer.conns[:i], timer.conns[i+1:]...) + timer.Conns = append(timer.Conns[:i], timer.Conns[i+1:]...) } } } @@ -268,9 +267,8 @@ func StartServer(addr string, conf config.Config) { }() // 重新加载[定时、延时] - go zsub.ReloadTimer() - go zsub.loadDelay() - //go zsub.loadLock() + go Hub.ReloadTimer() + go Hub.LoadData() // 启动服务监听 listen, err := net.Listen("tcp", addr) @@ -288,22 +286,22 @@ func StartServer(addr string, conf config.Config) { zConn := NewZConn(&conn) log.Printf("conn start: %s [%d]\n", conn.RemoteAddr(), zConn.sn) - go zsub.acceptHandler(zConn) + go Hub.handlerConn(zConn) } } // 连接处理 -func (s *ZSub) acceptHandler(c *ZConn) { +func (s *ZSub) handlerConn(c *ZConn) { defer func() { if r := recover(); r != nil { - log.Println("acceptHandler Recovered:", r) + log.Println("handlerConn Recovered:", r) } log.Println("conn closed:", (*c.conn).RemoteAddr(), "[", c.sn, "]") }() defer func() { // conn remove to conns funChan <- func() { - zsub.conns = c.removeTo(zsub.conns) + Hub.conns = c.removeTo(Hub.conns) } // close ZConn @@ -312,7 +310,7 @@ func (s *ZSub) acceptHandler(c *ZConn) { // conn add to conns funChan <- func() { - zsub.conns = c.appendTo(zsub.conns) + Hub.conns = c.appendTo(Hub.conns) } reader := bufio.NewReader(*c.conn) @@ -358,7 +356,7 @@ func (s *ZSub) acceptHandler(c *ZConn) { continue } - msgAccept(Message{Conn: c, Rcmd: rcmd}) + handleMessage(Message{Conn: c, Rcmd: rcmd}) } } @@ -458,14 +456,14 @@ func (s *ZSub) _unlock(l Lock) { } func (s *ZSub) shutdown() { - s.dataStorage() + s.SaveData() os.Exit(0) } func Info() map[string]interface{} { // topics topics := map[string]interface{}{} - for s, topic := range zsub.topics { + for s, topic := range Hub.topics { // {groups:[{name:xxx,size:xx}]} arr := make([]map[string]interface{}, 0) @@ -482,7 +480,7 @@ func Info() map[string]interface{} { // conns conns := make([]interface{}, 0) - for _, c := range zsub.conns { + for _, c := range Hub.conns { m := make(map[string]interface{}, 0) m["remoteaddr"] = (*c.conn).RemoteAddr() m["groupid"] = c.groupid @@ -495,9 +493,9 @@ func Info() map[string]interface{} { info := map[string]interface{}{ "topics": topics, "topicsize": len(topics), - "timersize": len(zsub.timers), + "timersize": len(Hub.timers), "conns": conns, - "connsize": len(zsub.conns), + "connsize": len(Hub.conns), } return info } diff --git a/zsub/ztimer.go b/internal/zsub/ztimer.go similarity index 62% rename from zsub/ztimer.go rename to internal/zsub/ztimer.go index 79c4962..534e36d 100644 --- a/zsub/ztimer.go +++ b/internal/zsub/ztimer.go @@ -13,16 +13,74 @@ import ( ) type ZTimer struct { - conns []*ZConn - expr string - topic string - cron *cron.Cron - ticker *time.Ticker - single bool + Conns []*ZConn + Expr string + Topic string + Cron *cron.Cron + Ticker *time.Ticker + Single bool +} + +type ZDelay struct { + Topic string + Value string + Exectime time.Time + Timer *time.Timer +} + +// delay topic value 100 -> publish topic value +func (s *ZSub) delay(rcmd []string, c *ZConn) { + s.Lock() + defer func() { + s.Unlock() + // s.SaveData() + s.delayup = true + }() + if len(rcmd) != 4 { + c.send("-Error: subscribe para number!") + return + } + + t, err := strconv.ParseInt(rcmd[3], 10, 64) + if err != nil { + c.send("-Error: " + strings.Join(rcmd, " ")) + return + } + + delay := s.delays[rcmd[1]+"-"+rcmd[2]] + if delay != nil { + if t < 0 { + delay.Timer.Stop() + delete(s.delays, rcmd[1]+"-"+rcmd[2]) + return + } + delay.Timer.Reset(time.Duration(t) * time.Millisecond) + } else { + if t < 0 { + return + } + delay := &ZDelay{ + Topic: rcmd[1], + Value: rcmd[2], + Exectime: time.Now().Add(time.Duration(t) * time.Millisecond), + Timer: time.NewTimer(time.Duration(t) * time.Millisecond), + } + s.delays[rcmd[1]+"-"+rcmd[2]] = delay + go func() { + select { + case <-delay.Timer.C: + log.Println("delay send:", rcmd[1], rcmd[2]) + Hub.Publish(rcmd[1], rcmd[2]) + funChan <- func() { + delete(s.delays, rcmd[1]+"-"+rcmd[2]) + } + } + }() + } } /* -["timer", topic, expr, a|x] +["Timer", Topic, expr, 0|1] */ func (s *ZSub) timer(rcmd []string, c *ZConn) { s.Lock() @@ -30,36 +88,36 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { timer := s.timers[rcmd[1]] if timer == nil { timer = &ZTimer{ - conns: []*ZConn{}, - topic: rcmd[1], + Conns: []*ZConn{}, + Topic: rcmd[1], } s.timers[rcmd[1]] = timer } if c != nil { - timer.conns = c.appendTo(timer.conns) + timer.Conns = c.appendTo(timer.Conns) } - if len(rcmd) == 4 && !strings.EqualFold(timer.expr, rcmd[2]) { - timer.expr = rcmd[2] - if timer.cron != nil { - timer.cron.Stop() + if len(rcmd) == 4 && !strings.EqualFold(timer.Expr, rcmd[2]) { + timer.Expr = rcmd[2] + if timer.Cron != nil { + timer.Cron.Stop() } - if timer.ticker != nil { - timer.ticker.Stop() + if timer.Ticker != nil { + timer.Ticker.Stop() } var timerFun = func() { - for _, conn := range timer.conns { - log.Println("timer send:", timer.topic) - err := conn.send("timer", timer.topic) - if timer.single && err == nil { + for _, conn := range timer.Conns { + log.Println("Timer send:", timer.Topic) + err := conn.send("Timer", timer.Topic) + if timer.Single && err == nil { break } } } r, _ := regexp.Compile("^\\d+[d,H,m,s]$") - expr := timer.expr + expr := timer.Expr if r.MatchString(expr) { n, _ := strconv.Atoi(expr[:len(expr)-1]) _n := time.Duration(n) @@ -75,25 +133,25 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { ticker = time.NewTicker(_n * time.Second) } - timer.ticker = ticker + timer.Ticker = ticker go func() { for range ticker.C { timerFun() } }() } else { - timer.cron = func() *cron.Cron { + timer.Cron = func() *cron.Cron { c := cron.New() - c.AddFunc(timer.expr, timerFun) + c.AddFunc(timer.Expr, timerFun) go c.Run() return c }() } - //timer.configSave() + //Timer.configSave() } - if len(rcmd) == 4 && (strings.EqualFold("a", rcmd[3]) != timer.single) { - timer.single = strings.EqualFold("a", rcmd[3]) - //timer.configSave() + if len(rcmd) == 4 && (strings.EqualFold("a", rcmd[3]) != timer.Single) { + timer.Single = strings.EqualFold("a", rcmd[3]) + //Timer.configSave() } } @@ -103,10 +161,6 @@ func (s *ZSub) ReloadTimer() { Conf.Ztimer.Db.Password, Conf.Ztimer.Db.Addr, Conf.Ztimer.Db.Database, - /*GetStr("ztimer.db.user", "root"), - GetStr("ztimer.db.pwd", "123456"), - GetStr("ztimer.db.addr", "127.0.0.1:3306"), - GetStr("ztimer.db.database", "zhub"),*/ )) if err != nil { @@ -125,66 +179,6 @@ func (s *ZSub) ReloadTimer() { var expr string var single string rows.Scan(&name, &expr, &single) - s.timer([]string{"timer", name, expr, single}, nil) //["timer", topic, expr, a|x] - } -} - -// ================== delay ===================================== - -type ZDelay struct { - topic string - value string - exectime time.Time - timer *time.Timer -} - -// delay topic value 100 -> publish topic value -func (s *ZSub) delay(rcmd []string, c *ZConn) { - s.Lock() - defer func() { - s.Unlock() - // s.dataStorage() - s.delayup = true - }() - if len(rcmd) != 4 { - c.send("-Error: subscribe para number!") - return - } - - t, err := strconv.ParseInt(rcmd[3], 10, 64) - if err != nil { - c.send("-Error: " + strings.Join(rcmd, " ")) - return - } - - delay := s.delays[rcmd[1]+"-"+rcmd[2]] - if delay != nil { - if t < 0 { - delay.timer.Stop() - delete(s.delays, rcmd[1]+"-"+rcmd[2]) - return - } - delay.timer.Reset(time.Duration(t) * time.Millisecond) - } else { - if t < 0 { - return - } - delay := &ZDelay{ - topic: rcmd[1], - value: rcmd[2], - exectime: time.Now().Add(time.Duration(t) * time.Millisecond), - timer: time.NewTimer(time.Duration(t) * time.Millisecond), - } - s.delays[rcmd[1]+"-"+rcmd[2]] = delay - go func() { - select { - case <-delay.timer.C: - log.Println("delay send:", rcmd[1], rcmd[2]) - zsub.Publish(rcmd[1], rcmd[2]) - funChan <- func() { - delete(s.delays, rcmd[1]+"-"+rcmd[2]) - } - } - }() + s.timer([]string{"Timer", name, expr, single}, nil) //["Timer", Topic, expr, a|x] } } diff --git a/zsub/ztopic.go b/internal/zsub/ztopic.go similarity index 99% rename from zsub/ztopic.go rename to internal/zsub/ztopic.go index 023e8b1..24ba35d 100644 --- a/zsub/ztopic.go +++ b/internal/zsub/ztopic.go @@ -34,5 +34,3 @@ func (t *ZTopic) init() { } }() } - -// diff --git a/main.go b/main.go index ac82b5c..6e41be7 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,8 @@ import ( "log" "zhub/cmd" "zhub/internal/config" - "zhub/zsub" + "zhub/internal/monitor" + "zhub/internal/zsub" ) func main() { @@ -19,6 +20,15 @@ func main() { addr := conf.Service.Addr // 获取服务地址 config.InitLog(conf.Log) // 初始化日志配置 + { + /* + 使用环境变量覆盖 配置文件参数 TODO + port, err := strconv.Atoi(os.Getenv("PORT")) + if err != nil { + port = 6066 + }*/ + } + if rcmd != "" { // 如果指定了客户端命令 auth := "" // 认证信息 for key, value := range conf.Auth { // 遍历找到一个认证信息 @@ -42,7 +52,7 @@ func main() { if isCliMode { cmd.ClientRun(addr) // 客户端运行 } else { - go zsub.StartWatch() // 启动监控协程 + go monitor.StartWatch() // 启动监控协程 zsub.StartServer(addr, conf) // 启动服务进程 } }