diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 6dc40e8..7f8aa8b 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -6,6 +6,8 @@ import ( "zhub/internal/zbus" ) +var r = gin.Default() + func init() { // 1.日志文件 定期分割归档 @@ -13,18 +15,14 @@ func init() { func StartWatch() { - r := gin.Default() - - r.Group("/users") - r.GET("/", func(c *gin.Context) { c.File("./public/index.html") }) - r.GET("/info", func(c *gin.Context) { + r.GET("/_/info", func(c *gin.Context) { c.JSON(http.StatusOK, zbus.Info()) }) - r.GET("/cleanup", func(c *gin.Context) { + r.GET("/_/cleanup", func(c *gin.Context) { zbus.Bus.Clearup() c.JSON(http.StatusOK, "+OK") }) diff --git a/internal/zbus/msg-accept.go b/internal/zbus/zbus-message-handler.go similarity index 89% rename from internal/zbus/msg-accept.go rename to internal/zbus/zbus-message-handler.go index 581c1e4..d3fe093 100644 --- a/internal/zbus/msg-accept.go +++ b/internal/zbus/zbus-message-handler.go @@ -22,10 +22,10 @@ func init() { var funChan = make(chan func(), 1000) -func handleMessage(v Message) { +func messageHandler(v Message) { defer func() { if r := recover(); r != nil { - log.Println("handleMessage Recovered:", r) + log.Println("messageHandler Recovered:", r) } }() c := v.Conn @@ -47,7 +47,7 @@ func handleMessage(v Message) { // 准入拦截,所有指令完成 auth 认证后才可进入 if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" { - c.send("-Auth: NOAUTH Authentication required:" + rcmd[0]) + c.send("-Auth: Authentication required [" + rcmd[0] + "]") return } // 指令预处理 @@ -81,6 +81,14 @@ func handleMessage(v Message) { case "publish", "broadcast", "delay", "rpc": if !AuthManager.AuthCheck(c.user, rcmd[1], "w") { c.send("-Error: Insufficient permissions to send " + cmd + " [" + rcmd[1] + "] message.") + log.Printf("[%d] -Auth: %s [%s]\n", c.sn, cmd, rcmd[1]) + if cmd == "rpc" { + rpcBody := make(map[string]string) + json.Unmarshal([]byte(rcmd[2]), &rpcBody) + + ruk := rpcBody["ruk"] + Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 401, 'retinfo': 'unauthorized!', 'ruk': '"+ruk+"'}") + } return } case "subscribe": // 在订阅逻辑处检查 @@ -121,7 +129,7 @@ func handleMessage(v Message) { if Bus.noSubscribe(rcmd[1]) { rpcBody := make(map[string]string) json.Unmarshal([]byte(rcmd[2]), &rpcBody) - log.Println("rpc no subscribe: ", rcmd[1]) + log.Printf("[%d] : rpc %s no subscribe", c.sn, rcmd[1]) ruk := rpcBody["ruk"] Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") @@ -168,6 +176,7 @@ func handleMessage(v Message) { // auth check if !AuthManager.AuthCheck(c.user, rcmd[1], "r") { c.send("-Error: Insufficient permissions to " + cmd + " [" + rcmd[1] + "] message.") + log.Printf("-Auth: %s [%s]\n", cmd, rcmd[1]) continue } c.subscribe(topic) diff --git a/internal/zbus/zsub.go b/internal/zbus/zbus.go similarity index 94% rename from internal/zbus/zsub.go rename to internal/zbus/zbus.go index c3f5d37..b0808b4 100644 --- a/internal/zbus/zsub.go +++ b/internal/zbus/zbus.go @@ -23,8 +23,9 @@ var ( delays: make(map[string]*ZDelay), locks: make(map[string][]*Lock), conns: make([]*ZConn, 0), + sn: 1000, } - SN int32 = 1000 + //SN int32 = 1000 ) func init() { @@ -70,12 +71,13 @@ func init() { type ZBus struct { sync.RWMutex - topics map[string]*ZTopic - timers map[string]*ZTimer - delays map[string]*ZDelay - locks map[string][]*Lock - conns []*ZConn - delayup bool + topics map[string]*ZTopic // 订阅主题 + timers map[string]*ZTimer // 定时事件 + delays map[string]*ZDelay // 延时消息 + locks map[string][]*Lock // 当前锁对象 + conns []*ZConn // 所有的客户端连接 + sn int32 // 客户端连接编号 + delayup bool // 是否需要延时持久保存数据 } type ZConn struct { //ZConn @@ -103,7 +105,7 @@ type Lock struct { func NewZConn(conn *net.Conn) *ZConn { return &ZConn{ - sn: atomic.AddInt32(&SN, 1), // 连接编号 + sn: atomic.AddInt32(&Bus.sn, 1), conn: conn, topics: []string{}, timers: []string{}, @@ -356,7 +358,7 @@ func (s *ZBus) handlerConn(c *ZConn) { continue } - handleMessage(Message{Conn: c, Rcmd: rcmd}) + messageHandler(Message{Conn: c, Rcmd: rcmd}) } } @@ -406,8 +408,9 @@ func (s *ZBus) broadcast(topic, msg string) { } /* -lock: lock key uuid t -unlock: unlock key uuid +lock: lock key uuid t +tryLock: trylock key uuid t +unlock: unlock key uuid */ func (s *ZBus) _lock(lock *Lock) { locks := s.locks[lock.key]