From bf82800afc09cc6d55c5666041b3b572c13e3c4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E6=98=BE=E4=BC=98?= <237809796@qq.com> Date: Sun, 16 Jul 2023 01:33:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E9=A2=91=E9=81=93?= =?UTF-8?q?=EF=BC=88topic=EF=BC=89=E7=BB=86=E7=B2=92=E5=BA=A6=E6=9D=83?= =?UTF-8?q?=E9=99=90=E6=8E=A7=E5=88=B6=EF=BC=8C=E6=98=AF=E5=90=A6=E5=85=81?= =?UTF-8?q?=E8=AE=B8=E8=AE=A2=E9=98=85=E6=B6=88=E6=81=AF=E3=80=81=E6=98=AF?= =?UTF-8?q?=E5=90=A6=E5=85=81=E8=AE=B8=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=88=B0=E9=A2=91=E9=81=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.ini | 4 - auth.yml | 73 ++++++++++++ internal/auth/auth.go | 225 ++++++++++++++++++++++++++++++++++++ internal/auth/auth_test.go | 41 +++++++ internal/monitor/monitor.go | 9 +- internal/zsub/msg-accept.go | 100 +++++++++------- internal/zsub/zsub.go | 4 +- main.go | 11 +- 8 files changed, 412 insertions(+), 55 deletions(-) create mode 100644 auth.yml create mode 100644 internal/auth/auth.go create mode 100644 internal/auth/auth_test.go diff --git a/app.ini b/app.ini index 02d7b8d..a3fc0ed 100644 --- a/app.ini +++ b/app.ini @@ -16,7 +16,3 @@ db.addr=127.0.0.1:3306 db.user=root db.password=123456 db.database=zhub - -[auth] -admin=123456 -lxy=123456 diff --git a/auth.yml b/auth.yml new file mode 100644 index 0000000..d3d60f3 --- /dev/null +++ b/auth.yml @@ -0,0 +1,73 @@ +# 下面是一个示例的 YAML 数据格式,用于存储用户、用户组、Token、频道和授权信息: +# +# users:存储用户信息,包括用户ID、用户名、密码和所属用户组。 +# groups:存储用户组信息,包括用户组名称和描述。 +# tokens:存储Token信息,包括Token ID、用户ID、Token值和可访问的频道。 +# channels:存储频道信息,包括频道名称、描述和是否为公开频道。 +# ------------------------------------------------------------------- + +# 下列示例中使用了上面的数据结构: + +# 用户信息 +users: + - id: 1 + username: admin + password: Admin12345 + status: active + groups: + - admin + # 正则 或 channel name + reads: + - wx:user-follow # + - tuya:device-control # + - topic-a # + #writes: + # - .* + + - id: 2 + username: user_dev + password: Lxy12345 + groups: + - zcore + +# 角色组 +groups: + - name: admin + description: Group 1 + reads: + - ^zcore:* # "zcore:" 开头的订阅 + writes: + - ^zcore:* # "zcore:" 开头的发送 + + - name: zcore + description: Group 2 + +# token信息 +tokens: + - id: 1 + user_id: 1 + token: token-12345 + status: active + expiration: 2024-07-20 23:59:59 + - id: 2 + user_id: 2 + token: token-12346 + status: active + expiration: 2024-07-20 23:59:59 + +# --------------------------------------------- + +# 公开频道设置 +channels: + - name: "-" + description: "无效占位符" + public: true + + - name: "lock" + description: "分布式锁通知频道" + public: true + + - name: "app_local" + description: "本地appname" + public: true +# --------------------------------------------- diff --git a/internal/auth/auth.go b/internal/auth/auth.go new file mode 100644 index 0000000..a3e7542 --- /dev/null +++ b/internal/auth/auth.go @@ -0,0 +1,225 @@ +package auth + +import ( + "fmt" + "gopkg.in/yaml.v3" + "os" + "regexp" + "strings" + "sync" + "time" +) + +type User struct { + ID int `yaml:"id"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Status string `yaml:"status"` + + Groups []string `yaml:"groups"` + Read []string `yaml:"reads"` + Write []string `yaml:"writes"` + + authCache map[string]string // zcore:userid = rw|w|r|- + lock sync.RWMutex +} +type Group struct { + Name string `yaml:"name"` + Description string `yaml:"description"` + Read []string `yaml:"reads"` + Write []string `yaml:"writes"` +} +type Token struct { + ID int `yaml:"id"` + UserID int `yaml:"user_id"` + Token string `yaml:"token"` + Expiration time.Time `yaml:"expiration"` + Status string `yaml:"status"` + Channels []string `yaml:"channels"` +} + +type Channel struct { + Name string `yaml:"name"` + Description string `yaml:"description"` + Public bool `yaml:"public"` +} + +type Config struct { + Users []*User `yaml:"users"` + Groups []Group `yaml:"groups"` + Tokens []Token `yaml:"tokens"` + Channels []Channel `yaml:"channels"` +} + +type PermissionManager struct { + config Config + userMap map[int]*User + groupMap map[string]Group + tokenMap map[string]Token + channelMap map[string]Channel + lock sync.RWMutex +} + +func (p *PermissionManager) Init() error { + p.lock.Lock() + defer p.lock.Unlock() + // Load YAML configuration from file + data, err := os.ReadFile("./auth.yml") + if err != nil { + return err + } + // Unmarshal YAML into AuthConfig struct + err = yaml.Unmarshal(data, &p.config) + + if err != nil { + return err + } + + // Build user map + p.userMap = make(map[int]*User) + for _, user := range p.config.Users { + p.userMap[user.ID] = user + } + + // Build group map + p.groupMap = make(map[string]Group) + for _, group := range p.config.Groups { + p.groupMap[group.Name] = group + } + + // Build token map + p.tokenMap = make(map[string]Token) + for _, token := range p.config.Tokens { + p.tokenMap[token.Token] = token + } + + // Build channel map + p.channelMap = make(map[string]Channel) + for _, channel := range p.config.Channels { + p.channelMap[channel.Name] = channel + } + + // clean cache + for _, user := range p.userMap { + user.authCache = make(map[string]string) + } + + //p.updatePermissions() + return nil +} + +func (p *PermissionManager) Reload() error { + // Reload the configuration by calling the Init() method again + err := p.Init() + if err != nil { + return err + } + return nil +} + +func (p *PermissionManager) GetUserIdByToken(token string) (int, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + t, found := p.tokenMap[token] + if !found || !t.isValid() { + return 0, fmt.Errorf("invalid or expired token") + } + + user, found := p.userMap[t.UserID] + if !found || user.Status != "active" { + return 0, fmt.Errorf("invalid or expired token") + } + + return user.ID, nil +} + +// AuthCheck cate = r|w +func (p *PermissionManager) AuthCheck(userid int, topic, cate string) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + user := p.userMap[userid] + + str := user.authCache[topic] + if str != "" && strings.Contains(str, cate) { + return true + } + c := p.channelMap[topic] + if c.Public { + return true + } + + // collect all auth expression + exps := make(map[string]bool) + if cate == "r" { + for _, exp := range user.Read { + exps[exp] = true + } + for _, g := range user.Groups { + group := p.groupMap[g] + for _, exp := range group.Read { + exps[exp] = true + } + } + } else if cate == "w" { + for _, exp := range user.Write { + exps[exp] = true + } + for _, g := range user.Groups { + group := p.groupMap[g] + for _, exp := range group.Write { + exps[exp] = true + } + } + } + + matchFound := false + for exp, _ := range exps { + matchFound, _ = regexp.MatchString(exp, topic) + if matchFound { + break + } + } + + if matchFound { + user.lock.Lock() + defer user.lock.Unlock() + + if user.authCache == nil { + user.authCache = make(map[string]string) + } + user.authCache[topic] = cate + str + return true + } + + return false +} + +func (p *PermissionManager) IsAdmin(userId int) bool { + user := p.userMap[userId] + return user != nil && user.Username == "admin" +} + +// AdminToken server's admin using +func (p *PermissionManager) AdminToken() (string, error) { + var userId int + for _, user := range p.userMap { + if user.Username == "admin" { + userId = user.ID + } + } + + for _, token := range p.tokenMap { + if token.UserID == userId && token.isValid() { + return token.Token, nil + } + } + + return "", fmt.Errorf("-Error: can't found valid admin token") +} + +// isValid check token's valid +func (t *Token) isValid() bool { + return t.Expiration.After(time.Now()) && t.Status == "active" +} diff --git a/internal/auth/auth_test.go b/internal/auth/auth_test.go new file mode 100644 index 0000000..f42eea6 --- /dev/null +++ b/internal/auth/auth_test.go @@ -0,0 +1,41 @@ +package auth + +import ( + "fmt" + "log" + "regexp" + "testing" +) + +func TestAuth(t *testing.T) { + // Create an instance of PermissionManager + p := &PermissionManager{} + // Initialize the permission manager + err := p.Init() + if err != nil { + log.Fatal(err) + } + // Example usage: Get permissions by token + token := "token-12345" + userId, err := p.GetUserIdByToken(token) + if err != nil { + log.Fatal(err) + } + fmt.Println("Permissions:") + + check := p.AuthCheck(userId, "zcore:abx", "r") + check2 := p.AuthCheck(userId, "zcore:abx", "r") + + fmt.Println("check:", check) + fmt.Println("check2:", check2) +} + +func TestName(t *testing.T) { + exp := "zcore:*" + channel := "zcore:axxx" + compile, _ := regexp.Compile(exp) + + //matched, _ := regexp.MatchString(exp, channel) + matched := compile.MatchString(channel) + fmt.Println("matched:", matched) +} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index ae48cb2..cf9a503 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -28,7 +28,8 @@ func StartWatch() { zsub.Hub.Clearup() c.JSON(http.StatusOK, "+OK") }) - r.GET("/retimer", func(c *gin.Context) { + + r.GET("/timer/reload", func(c *gin.Context) { zsub.Hub.ReloadTimer() c.JSON(http.StatusOK, "+reload timer ok") }) @@ -48,6 +49,12 @@ func StartWatch() { c.JSON(http.StatusOK, "+OK") }) + // reload the auth configuration + r.GET("/auth/reload", func(c *gin.Context) { + zsub.AuthManager.Reload() + c.JSON(http.StatusOK, "+OK") + }) + watchAddr := zsub.Conf.Service.Watch r.Run(watchAddr) } diff --git a/internal/zsub/msg-accept.go b/internal/zsub/msg-accept.go index 0ce4c6b..e4e637f 100644 --- a/internal/zsub/msg-accept.go +++ b/internal/zsub/msg-accept.go @@ -6,8 +6,20 @@ import ( "strconv" "strings" "time" + "zhub/internal/auth" ) +var AuthManager *auth.PermissionManager + +func init() { + AuthManager = &auth.PermissionManager{} + // Initialize the permission manager + err := AuthManager.Init() + if err != nil { + log.Fatal(err) + } +} + var funChan = make(chan func(), 1000) func handleMessage(v Message) { @@ -31,32 +43,14 @@ func handleMessage(v Message) { if Conf.Log.Level == "debug" && rcmd[0] != "auth" { log.Printf("[%d] cmd: %s\n", v.Conn.sn, strings.Join(rcmd, " ")) - } else if rcmd[0] == "auth" { - if len(rcmd) != 2 || strings.IndexAny(rcmd[1], "@") == -1 { - c.send("-Error: invalid password!") - return - } - - inx := strings.IndexAny(rcmd[1], "@") //user@pwd - - authKey := rcmd[1][:inx] //user - authValue := Conf.Auth[rcmd[1][:inx]] //pwd - if strings.EqualFold(authValue, rcmd[1][inx+1:]) { - c.auth = rcmd[1][:inx] - c.send("+Auth: ok!") - log.Printf("[%d] cmd: %s\n", v.Conn.sn, "auth "+authKey+"@******* "+"[OK]") - } else { - c.send("-Auth: invalid password!") - log.Printf("[%d] cmd: %s\n", v.Conn.sn, "auth "+authKey+"@******* "+"[Error]") - } - return } - if strings.TrimSpace(c.auth) == "" && rcmd[0] != "auth" && Conf.Service.Auth { + // 准入拦截,所有指令完成 auth 认证后才可进入 + if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" { c.send("-Auth: NOAUTH Authentication required:" + rcmd[0]) return } - + // 指令预处理 if len(rcmd) == 1 { switch strings.ToLower(rcmd[0]) { default: @@ -82,6 +76,31 @@ func handleMessage(v Message) { cmd := rcmd[0] switch cmd { + case "auth": + userid, err := AuthManager.GetUserIdByToken(rcmd[1]) + if err != nil { + c.send("-Error: " + err.Error()) + return + } + + c.user = userid + c.send("+Auth: ok!") + + // hide the auth token content + str := func() string { + str := rcmd[1] + length := len(str) + if length > 4 { + return str[:2] + strings.Repeat("*", length-4) + str[length-2:] + } else if length > 2 { + return str[:1] + strings.Repeat("*", length-2) + str[length-1:] + } else { + return strings.Repeat("*", length) + } + }() + + log.Printf("[%d] cmd: %s, auth [OK]\n", v.Conn.sn, str) + return case "groupid": c.groupid = rcmd[1] return @@ -113,9 +132,19 @@ func handleMessage(v Message) { /*if len(topicChan) < cap(topicChan) { topicChan <- rcmd }*/ + + // auth check + if !AuthManager.AuthCheck(c.user, rcmd[1], "w") { + c.send("-Error: Insufficient permissions to send topic [" + rcmd[1] + "] message.") + return + } Hub.Publish(rcmd[1], rcmd[2]) } return + case "broadcast": + Hub.broadcast(rcmd[1], rcmd[2]) + case "delay": + Hub.Delay(rcmd) default: } @@ -130,16 +159,17 @@ func handleMessage(v Message) { case "subscribe": // subscribe x y z for _, topic := range rcmd[1:] { - c.subscribe(topic) // todo: 批量一次订阅 + // auth check + if !AuthManager.AuthCheck(c.user, rcmd[1], "r") { + c.send("-Error: Insufficient permissions to accept topic [" + topic + "] message.") + continue + } + c.subscribe(topic) } case "unsubscribe": for _, topic := range rcmd[1:] { c.unsubscribe(topic) } - case "broadcast": - Hub.broadcast(rcmd[1], rcmd[2]) - case "delay": - Hub.Delay(rcmd) case "timer": for _, name := range rcmd[1:] { Hub.timer([]string{"timer", name}, c) // append to timers @@ -153,7 +183,7 @@ func handleMessage(v Message) { case "reload-timer": Hub.ReloadTimer() case "shutdown": - if !strings.EqualFold(c.groupid, "group-admin") { + if AuthManager.IsAdmin(c.user) { return } Hub.shutdown() @@ -173,22 +203,6 @@ func handleMessage(v Message) { return } Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) - /*case "auth": - if len(rcmd) != 2 || strings.IndexAny(rcmd[1], "@") == -1 { - c.send("-Error: invalid password!") - return - } - - inx := strings.IndexAny(rcmd[1], "@") //user@pwd - - authKey := Conf.Auth[rcmd[1][:inx]] - if strings.EqualFold(authKey, rcmd[1][inx+1:]) { - c.auth = rcmd[1][:inx] - c.send("+Auth: ok!") - } else { - c.send("-Auth: invalid password!") - } - return*/ default: c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") return diff --git a/internal/zsub/zsub.go b/internal/zsub/zsub.go index f550acb..8f9c03a 100644 --- a/internal/zsub/zsub.go +++ b/internal/zsub/zsub.go @@ -89,7 +89,7 @@ type ZConn struct { //ZConn substoped map[string]chan int // 关闭信号量 ping int64 // 最后心跳时间 pong int64 // 最后心跳回复时间 - auth string // 是否已验证授权 + user int // 是否已验证授权 } type Lock struct { @@ -486,7 +486,7 @@ func Info() map[string]interface{} { m["groupid"] = c.groupid m["topics"] = c.topics m["timers"] = c.timers - m["auth"] = c.auth + m["user"] = c.user conns = append(conns, m) } diff --git a/main.go b/main.go index 6e41be7..e78abed 100644 --- a/main.go +++ b/main.go @@ -30,12 +30,13 @@ func main() { } if rcmd != "" { // 如果指定了客户端命令 - auth := "" // 认证信息 - for key, value := range conf.Auth { // 遍历找到一个认证信息 - auth = key + "@" + value - break + adminToken, err := zsub.AuthManager.AdminToken() // 认证信息 + if err != nil { + log.Fatal(err) // Configuration error, stop the client from running. + return } - cli, err := cmd.Create("zhub-local", addr, "group-admin", auth) // 创建客户端连接 + + cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接 if err != nil { log.Println(err) // 如果连接失败则打印错误信息 return