From f7360c45d89bbab5e1d067c15cebebd6ac32899f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E6=98=BE=E4=BC=98?= <237809796@qq.com> Date: Sat, 22 Jul 2023 02:48:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9:=20=E5=8C=85=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E4=B8=BA=20zbus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- internal/monitor/monitor.go | 16 +++---- internal/{zsub => zbus}/msg-accept.go | 24 +++++------ internal/{zsub => zbus}/zdb.go | 10 ++--- internal/{zsub => zbus}/zgroup.go | 2 +- internal/{zsub => zbus}/zsub.go | 60 +++++++++++++-------------- internal/{zsub => zbus}/ztimer.go | 10 ++--- internal/{zsub => zbus}/ztopic.go | 2 +- main.go | 6 +-- 9 files changed, 66 insertions(+), 66 deletions(-) rename internal/{zsub => zbus}/msg-accept.go (91%) rename internal/{zsub => zbus}/zdb.go (95%) rename internal/{zsub => zbus}/zgroup.go (98%) rename internal/{zsub => zbus}/zsub.go (91%) rename internal/{zsub => zbus}/ztimer.go (95%) rename internal/{zsub => zbus}/ztopic.go (97%) diff --git a/go.mod b/go.mod index 4d9aa9e..f60bb61 100644 --- a/go.mod +++ b/go.mod @@ -51,5 +51,5 @@ replace ( zhub/cmd => ./zhub/cmd zhub/internal/config => ./zhub/internal/config zhub/internal/monitor => ./zhub/internal/monitor - zhub/internal/zsub => ./zhub/internal/zsub + zhub/internal/zbus => ./zhub/internal/zbus ) diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index cf9a503..6dc40e8 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -3,7 +3,7 @@ package monitor import ( "github.com/gin-gonic/gin" "net/http" - "zhub/internal/zsub" + "zhub/internal/zbus" ) func init() { @@ -22,22 +22,22 @@ func StartWatch() { }) r.GET("/info", func(c *gin.Context) { - c.JSON(http.StatusOK, zsub.Info()) + c.JSON(http.StatusOK, zbus.Info()) }) r.GET("/cleanup", func(c *gin.Context) { - zsub.Hub.Clearup() + zbus.Bus.Clearup() c.JSON(http.StatusOK, "+OK") }) r.GET("/timer/reload", func(c *gin.Context) { - zsub.Hub.ReloadTimer() + zbus.Bus.ReloadTimer() c.JSON(http.StatusOK, "+reload timer ok") }) r.GET("/topic/publish", func(c *gin.Context) { topic := c.Query("topic") value := c.Query("value") - zsub.Hub.Publish(topic, value) + zbus.Bus.Publish(topic, value) c.JSON(http.StatusOK, "+OK") }) r.GET("/topic/delay", func(c *gin.Context) { @@ -45,16 +45,16 @@ func StartWatch() { value := c.Query("value") delay := c.Query("delay") - zsub.Hub.Delay([]string{"delay", topic, value, delay}) + zbus.Bus.Delay([]string{"delay", topic, value, delay}) c.JSON(http.StatusOK, "+OK") }) // reload the auth configuration r.GET("/auth/reload", func(c *gin.Context) { - zsub.AuthManager.Reload() + zbus.AuthManager.Reload() c.JSON(http.StatusOK, "+OK") }) - watchAddr := zsub.Conf.Service.Watch + watchAddr := zbus.Conf.Service.Watch r.Run(watchAddr) } diff --git a/internal/zsub/msg-accept.go b/internal/zbus/msg-accept.go similarity index 91% rename from internal/zsub/msg-accept.go rename to internal/zbus/msg-accept.go index d813d59..581c1e4 100644 --- a/internal/zsub/msg-accept.go +++ b/internal/zbus/msg-accept.go @@ -1,4 +1,4 @@ -package zsub +package zbus import ( "encoding/json" @@ -118,13 +118,13 @@ func handleMessage(v Message) { return case "rpc": // if rpc and no sub back error - if Hub.noSubscribe(rcmd[1]) { + if Bus.noSubscribe(rcmd[1]) { rpcBody := make(map[string]string) json.Unmarshal([]byte(rcmd[2]), &rpcBody) log.Println("rpc no subscribe: ", rcmd[1]) ruk := rpcBody["ruk"] - Hub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") + Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") return } @@ -134,7 +134,7 @@ func handleMessage(v Message) { /*if len(topicChan) < cap(topicChan) { topicChan <- rcmd }*/ - Hub.Publish(rcmd[1], rcmd[2]) + Bus.Publish(rcmd[1], rcmd[2]) } return case "publish": @@ -144,13 +144,13 @@ func handleMessage(v Message) { /*if len(topicChan) < cap(topicChan) { topicChan <- rcmd }*/ - Hub.Publish(rcmd[1], rcmd[2]) + Bus.Publish(rcmd[1], rcmd[2]) } return case "broadcast": - Hub.broadcast(rcmd[1], rcmd[2]) + Bus.broadcast(rcmd[1], rcmd[2]) case "delay": - Hub.Delay(rcmd) + Bus.Delay(rcmd) default: } @@ -178,7 +178,7 @@ func handleMessage(v Message) { } case "timer": for _, name := range rcmd[1:] { - Hub.timer([]string{"timer", name}, c) // append to timers + Bus.timer([]string{"timer", name}, c) // append to timers c.timers = append(c.timers, name) // append to conns } case "cmd": @@ -187,12 +187,12 @@ func handleMessage(v Message) { } switch rcmd[1] { case "reload-timer": - Hub.ReloadTimer() + Bus.ReloadTimer() case "shutdown": if AuthManager.IsAdmin(c.user) { return } - Hub.shutdown() + Bus.shutdown() } case "lock": // lock key uuid 5 @@ -201,14 +201,14 @@ func handleMessage(v Message) { return } d, _ := strconv.Atoi(rcmd[3]) - Hub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d}) + Bus._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d}) case "unlock": // unlock key uuid if len(rcmd) != 3 { c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]") return } - Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) + Bus._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) default: c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") return diff --git a/internal/zsub/zdb.go b/internal/zbus/zdb.go similarity index 95% rename from internal/zsub/zdb.go rename to internal/zbus/zdb.go index 002871d..55f61fc 100644 --- a/internal/zsub/zdb.go +++ b/internal/zbus/zdb.go @@ -1,4 +1,4 @@ -package zsub +package zbus import ( "bufio" @@ -34,7 +34,7 @@ func Append(str string, fileName string) { } } -func (s *ZSub) SaveData() { +func (s *ZBus) SaveData() { defer func() { if r := recover(); r != nil { log.Println("SaveData Recovered:", r) @@ -89,12 +89,12 @@ func (s *ZSub) SaveData() { }() } -func (s *ZSub) LoadData() { +func (s *ZBus) LoadData() { s.loadDelay() // s.loadLock() } -func (s *ZSub) loadDelay() { +func (s *ZBus) loadDelay() { f, err := os.Open(datadir + "/delay.z") if err != nil { return @@ -129,7 +129,7 @@ func (s *ZSub) loadDelay() { } } -func (s *ZSub) loadLock() { +func (s *ZBus) loadLock() { f, err := os.Open(datadir + "/lock.z") if err != nil { return diff --git a/internal/zsub/zgroup.go b/internal/zbus/zgroup.go similarity index 98% rename from internal/zsub/zgroup.go rename to internal/zbus/zgroup.go index 3cc87ae..2c0fda2 100644 --- a/internal/zsub/zgroup.go +++ b/internal/zbus/zgroup.go @@ -1,4 +1,4 @@ -package zsub +package zbus import ( "log" diff --git a/internal/zsub/zsub.go b/internal/zbus/zsub.go similarity index 91% rename from internal/zsub/zsub.go rename to internal/zbus/zsub.go index 8f9c03a..c3f5d37 100644 --- a/internal/zsub/zsub.go +++ b/internal/zbus/zsub.go @@ -1,4 +1,4 @@ -package zsub +package zbus import ( "bufio" @@ -17,7 +17,7 @@ import ( var ( Conf config.Config - Hub = &ZSub{ + Bus = &ZBus{ topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), delays: make(map[string]*ZDelay), @@ -41,7 +41,7 @@ func init() { } }() conns := make([]*ZConn, 0) // 需要关闭的连接 - for _, c := range Hub.conns { + for _, c := range Bus.conns { if c.ping > 0 && c.ping-c.pong > 19 { conns = c.appendTo(conns) continue @@ -63,12 +63,12 @@ func init() { } - Hub.SaveData() + Bus.SaveData() } }() } -type ZSub struct { +type ZBus struct { sync.RWMutex topics map[string]*ZTopic timers map[string]*ZTimer @@ -119,9 +119,9 @@ func NewZConn(conn *net.Conn) *ZConn { 3、若有待消费消息启动消费 */ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} - Hub.Lock() - defer Hub.Unlock() - ztopic := Hub.topics[topic] //ZTopic + Bus.Lock() + defer Bus.Unlock() + ztopic := Bus.topics[topic] //ZTopic if ztopic == nil { ztopic = &ZTopic{ groups: map[string]*ZGroup{}, @@ -129,7 +129,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} chMsg: make(chan string, 500), } ztopic.init() - Hub.topics[topic] = ztopic + Bus.topics[topic] = ztopic } zgroup := ztopic.groups[c.groupid] //ZGroup @@ -159,7 +159,7 @@ func (c *ZConn) unsubscribe(topic string) { // 取消订阅 zconn{} c.Lock() defer c.Unlock() close(c.substoped[topic]) - ztopic := Hub.topics[topic] //ZTopic + ztopic := Bus.topics[topic] //ZTopic if ztopic == nil { return } @@ -205,10 +205,10 @@ func (c *ZConn) close() { } // timer conn close - Hub.Lock() - defer Hub.Unlock() + Bus.Lock() + defer Bus.Unlock() for _, topic := range c.timers { // fixme: 数据逻辑交叉循环 - timer := Hub.timers[topic] + timer := Bus.timers[topic] if timer == nil { continue } @@ -267,8 +267,8 @@ func StartServer(addr string, conf config.Config) { }() // 重新加载[定时、延时] - go Hub.ReloadTimer() - go Hub.LoadData() + go Bus.ReloadTimer() + go Bus.LoadData() // 启动服务监听 listen, err := net.Listen("tcp", addr) @@ -286,12 +286,12 @@ func StartServer(addr string, conf config.Config) { zConn := NewZConn(&conn) log.Printf("conn start: %s [%d]\n", conn.RemoteAddr(), zConn.sn) - go Hub.handlerConn(zConn) + go Bus.handlerConn(zConn) } } // 连接处理 -func (s *ZSub) handlerConn(c *ZConn) { +func (s *ZBus) handlerConn(c *ZConn) { defer func() { if r := recover(); r != nil { log.Println("handlerConn Recovered:", r) @@ -301,7 +301,7 @@ func (s *ZSub) handlerConn(c *ZConn) { defer func() { // conn remove to conns funChan <- func() { - Hub.conns = c.removeTo(Hub.conns) + Bus.conns = c.removeTo(Bus.conns) } // close ZConn @@ -310,7 +310,7 @@ func (s *ZSub) handlerConn(c *ZConn) { // conn add to conns funChan <- func() { - Hub.conns = c.appendTo(Hub.conns) + Bus.conns = c.appendTo(Bus.conns) } reader := bufio.NewReader(*c.conn) @@ -365,7 +365,7 @@ Publish topic message 1、send message to topic's chan 2、feedback send success to sender, and sending message to topic's subscripts */ -func (s *ZSub) Publish(topic, msg string) { +func (s *ZBus) Publish(topic, msg string) { s.RLock() defer s.RUnlock() ztopic := s.topics[topic] //ZTopic @@ -386,7 +386,7 @@ func (s *ZSub) Publish(topic, msg string) { /* send broadcast message */ -func (s *ZSub) broadcast(topic, msg string) { +func (s *ZBus) broadcast(topic, msg string) { s.RLock() defer s.RUnlock() if strings.EqualFold(topic, "lock") { @@ -409,7 +409,7 @@ func (s *ZSub) broadcast(topic, msg string) { lock: lock key uuid t unlock: unlock key uuid */ -func (s *ZSub) _lock(lock *Lock) { +func (s *ZBus) _lock(lock *Lock) { locks := s.locks[lock.key] if locks == nil { locks = make([]*Lock, 0) @@ -432,7 +432,7 @@ func (s *ZSub) _lock(lock *Lock) { s.locks[lock.key] = append(locks, lock) } } -func (s *ZSub) _unlock(l Lock) { +func (s *ZBus) _unlock(l Lock) { locks := s.locks[l.key] if locks == nil || len(locks) == 0 { return @@ -455,7 +455,7 @@ func (s *ZSub) _unlock(l Lock) { } } -func (s *ZSub) shutdown() { +func (s *ZBus) shutdown() { s.SaveData() os.Exit(0) } @@ -463,7 +463,7 @@ func (s *ZSub) shutdown() { func Info() map[string]interface{} { // topics topics := map[string]interface{}{} - for s, topic := range Hub.topics { + for s, topic := range Bus.topics { // {groups:[{name:xxx,size:xx}]} arr := make([]map[string]interface{}, 0) @@ -480,7 +480,7 @@ func Info() map[string]interface{} { // conns conns := make([]interface{}, 0) - for _, c := range Hub.conns { + for _, c := range Bus.conns { m := make(map[string]interface{}, 0) m["remoteaddr"] = (*c.conn).RemoteAddr() m["groupid"] = c.groupid @@ -493,14 +493,14 @@ func Info() map[string]interface{} { info := map[string]interface{}{ "topics": topics, "topicsize": len(topics), - "timersize": len(Hub.timers), + "timersize": len(Bus.timers), "conns": conns, - "connsize": len(Hub.conns), + "connsize": len(Bus.conns), } return info } -func (s *ZSub) Clearup() { +func (s *ZBus) Clearup() { for tn, topic := range s.topics { for _, group := range topic.groups { if len(group.conns) > 0 || topic.mcount > group.offset { @@ -513,7 +513,7 @@ func (s *ZSub) Clearup() { } } -func (s *ZSub) noSubscribe(topic string) bool { +func (s *ZBus) noSubscribe(topic string) bool { zTopic := s.topics[topic] if zTopic == nil || len(zTopic.groups) == 0 { return true diff --git a/internal/zsub/ztimer.go b/internal/zbus/ztimer.go similarity index 95% rename from internal/zsub/ztimer.go rename to internal/zbus/ztimer.go index 09500c0..2050d92 100644 --- a/internal/zsub/ztimer.go +++ b/internal/zbus/ztimer.go @@ -1,4 +1,4 @@ -package zsub +package zbus import ( "database/sql" @@ -29,7 +29,7 @@ type ZDelay struct { } // Delay : delay topic value 100 -> publish topic value -func (s *ZSub) Delay(rcmd []string) { +func (s *ZBus) Delay(rcmd []string) { s.Lock() defer func() { s.Unlock() @@ -70,7 +70,7 @@ func (s *ZSub) Delay(rcmd []string) { select { case <-delay.Timer.C: log.Println("delay send:", rcmd[1], rcmd[2]) - Hub.Publish(rcmd[1], rcmd[2]) + Bus.Publish(rcmd[1], rcmd[2]) funChan <- func() { delete(s.delays, rcmd[1]+"-"+rcmd[2]) } @@ -82,7 +82,7 @@ func (s *ZSub) Delay(rcmd []string) { /* ["Timer", Topic, expr, 0|1] */ -func (s *ZSub) timer(rcmd []string, c *ZConn) { +func (s *ZBus) timer(rcmd []string, c *ZConn) { s.Lock() defer s.Unlock() timer := s.timers[rcmd[1]] @@ -155,7 +155,7 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { } } -func (s *ZSub) ReloadTimer() { +func (s *ZBus) ReloadTimer() { db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", Conf.Ztimer.Db.User, Conf.Ztimer.Db.Password, diff --git a/internal/zsub/ztopic.go b/internal/zbus/ztopic.go similarity index 97% rename from internal/zsub/ztopic.go rename to internal/zbus/ztopic.go index 24ba35d..596f0a3 100644 --- a/internal/zsub/ztopic.go +++ b/internal/zbus/ztopic.go @@ -1,4 +1,4 @@ -package zsub +package zbus import ( "fmt" diff --git a/main.go b/main.go index e78abed..028064c 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "zhub/cmd" "zhub/internal/config" "zhub/internal/monitor" - "zhub/internal/zsub" + "zhub/internal/zbus" ) func main() { @@ -30,7 +30,7 @@ func main() { } if rcmd != "" { // 如果指定了客户端命令 - adminToken, err := zsub.AuthManager.AdminToken() // 认证信息 + adminToken, err := zbus.AuthManager.AdminToken() // 认证信息 if err != nil { log.Fatal(err) // Configuration error, stop the client from running. return @@ -54,6 +54,6 @@ func main() { cmd.ClientRun(addr) // 客户端运行 } else { go monitor.StartWatch() // 启动监控协程 - zsub.StartServer(addr, conf) // 启动服务进程 + zbus.StartServer(addr, conf) // 启动服务进程 } }