修改:1、服务信息查看地址 2、ZBus对象结构新加入客户端连接编号属性

This commit is contained in:
2023-10-21 08:56:58 +08:00
parent f7360c45d8
commit 4fc7121b28
3 changed files with 31 additions and 21 deletions

View File

@@ -6,6 +6,8 @@ import (
"zhub/internal/zbus" "zhub/internal/zbus"
) )
var r = gin.Default()
func init() { func init() {
// 1.日志文件 定期分割归档 // 1.日志文件 定期分割归档
@@ -13,18 +15,14 @@ func init() {
func StartWatch() { func StartWatch() {
r := gin.Default()
r.Group("/users")
r.GET("/", func(c *gin.Context) { r.GET("/", func(c *gin.Context) {
c.File("./public/index.html") c.File("./public/index.html")
}) })
r.GET("/info", func(c *gin.Context) { r.GET("/_/info", func(c *gin.Context) {
c.JSON(http.StatusOK, zbus.Info()) c.JSON(http.StatusOK, zbus.Info())
}) })
r.GET("/cleanup", func(c *gin.Context) { r.GET("/_/cleanup", func(c *gin.Context) {
zbus.Bus.Clearup() zbus.Bus.Clearup()
c.JSON(http.StatusOK, "+OK") c.JSON(http.StatusOK, "+OK")
}) })

View File

@@ -22,10 +22,10 @@ func init() {
var funChan = make(chan func(), 1000) var funChan = make(chan func(), 1000)
func handleMessage(v Message) { func messageHandler(v Message) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println("handleMessage Recovered:", r) log.Println("messageHandler Recovered:", r)
} }
}() }()
c := v.Conn c := v.Conn
@@ -47,7 +47,7 @@ func handleMessage(v Message) {
// 准入拦截,所有指令完成 auth 认证后才可进入 // 准入拦截,所有指令完成 auth 认证后才可进入
if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" { if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" {
c.send("-Auth: NOAUTH Authentication required:" + rcmd[0]) c.send("-Auth: Authentication required [" + rcmd[0] + "]")
return return
} }
// 指令预处理 // 指令预处理
@@ -81,6 +81,14 @@ func handleMessage(v Message) {
case "publish", "broadcast", "delay", "rpc": case "publish", "broadcast", "delay", "rpc":
if !AuthManager.AuthCheck(c.user, rcmd[1], "w") { if !AuthManager.AuthCheck(c.user, rcmd[1], "w") {
c.send("-Error: Insufficient permissions to send " + cmd + " [" + rcmd[1] + "] message.") c.send("-Error: Insufficient permissions to send " + cmd + " [" + rcmd[1] + "] message.")
log.Printf("[%d] -Auth: %s [%s]\n", c.sn, cmd, rcmd[1])
if cmd == "rpc" {
rpcBody := make(map[string]string)
json.Unmarshal([]byte(rcmd[2]), &rpcBody)
ruk := rpcBody["ruk"]
Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 401, 'retinfo': 'unauthorized', 'ruk': '"+ruk+"'}")
}
return return
} }
case "subscribe": // 在订阅逻辑处检查 case "subscribe": // 在订阅逻辑处检查
@@ -121,7 +129,7 @@ func handleMessage(v Message) {
if Bus.noSubscribe(rcmd[1]) { if Bus.noSubscribe(rcmd[1]) {
rpcBody := make(map[string]string) rpcBody := make(map[string]string)
json.Unmarshal([]byte(rcmd[2]), &rpcBody) json.Unmarshal([]byte(rcmd[2]), &rpcBody)
log.Println("rpc no subscribe: ", rcmd[1]) log.Printf("[%d] : rpc %s no subscribe", c.sn, rcmd[1])
ruk := rpcBody["ruk"] ruk := rpcBody["ruk"]
Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") Bus.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}")
@@ -168,6 +176,7 @@ func handleMessage(v Message) {
// auth check // auth check
if !AuthManager.AuthCheck(c.user, rcmd[1], "r") { if !AuthManager.AuthCheck(c.user, rcmd[1], "r") {
c.send("-Error: Insufficient permissions to " + cmd + " [" + rcmd[1] + "] message.") c.send("-Error: Insufficient permissions to " + cmd + " [" + rcmd[1] + "] message.")
log.Printf("-Auth: %s [%s]\n", cmd, rcmd[1])
continue continue
} }
c.subscribe(topic) c.subscribe(topic)

View File

@@ -23,8 +23,9 @@ var (
delays: make(map[string]*ZDelay), delays: make(map[string]*ZDelay),
locks: make(map[string][]*Lock), locks: make(map[string][]*Lock),
conns: make([]*ZConn, 0), conns: make([]*ZConn, 0),
sn: 1000,
} }
SN int32 = 1000 //SN int32 = 1000
) )
func init() { func init() {
@@ -70,12 +71,13 @@ func init() {
type ZBus struct { type ZBus struct {
sync.RWMutex sync.RWMutex
topics map[string]*ZTopic topics map[string]*ZTopic // 订阅主题
timers map[string]*ZTimer timers map[string]*ZTimer // 定时事件
delays map[string]*ZDelay delays map[string]*ZDelay // 延时消息
locks map[string][]*Lock locks map[string][]*Lock // 当前锁对象
conns []*ZConn conns []*ZConn // 所有的客户端连接
delayup bool sn int32 // 客户端连接编号
delayup bool // 是否需要延时持久保存数据
} }
type ZConn struct { //ZConn type ZConn struct { //ZConn
@@ -103,7 +105,7 @@ type Lock struct {
func NewZConn(conn *net.Conn) *ZConn { func NewZConn(conn *net.Conn) *ZConn {
return &ZConn{ return &ZConn{
sn: atomic.AddInt32(&SN, 1), // 连接编号 sn: atomic.AddInt32(&Bus.sn, 1),
conn: conn, conn: conn,
topics: []string{}, topics: []string{},
timers: []string{}, timers: []string{},
@@ -356,7 +358,7 @@ func (s *ZBus) handlerConn(c *ZConn) {
continue continue
} }
handleMessage(Message{Conn: c, Rcmd: rcmd}) messageHandler(Message{Conn: c, Rcmd: rcmd})
} }
} }
@@ -406,8 +408,9 @@ func (s *ZBus) broadcast(topic, msg string) {
} }
/* /*
lock: lock key uuid t lock: lock key uuid t
unlock: unlock key uuid tryLock: trylock key uuid t
unlock: unlock key uuid
*/ */
func (s *ZBus) _lock(lock *Lock) { func (s *ZBus) _lock(lock *Lock) {
locks := s.locks[lock.key] locks := s.locks[lock.key]