新增:频道(topic)细粒度权限控制,是否允许订阅消息、是否允许发送消息到频道

This commit is contained in:
2023-07-16 01:33:25 +08:00
parent 66772cb659
commit bf82800afc
8 changed files with 412 additions and 55 deletions

View File

@@ -16,7 +16,3 @@ db.addr=127.0.0.1:3306
db.user=root db.user=root
db.password=123456 db.password=123456
db.database=zhub db.database=zhub
[auth]
admin=123456
lxy=123456

73
auth.yml Normal file
View File

@@ -0,0 +1,73 @@
# 下面是一个示例的 YAML 数据格式用于存储用户、用户组、Token、频道和授权信息
#
# users存储用户信息包括用户ID、用户名、密码和所属用户组。
# groups存储用户组信息包括用户组名称和描述。
# tokens存储Token信息包括Token ID、用户ID、Token值和可访问的频道。
# channels存储频道信息包括频道名称、描述和是否为公开频道。
# -------------------------------------------------------------------
# 下列示例中使用了上面的数据结构:
# 用户信息
users:
- id: 1
username: admin
password: Admin12345
status: active
groups:
- admin
# 正则 或 channel name
reads:
- wx:user-follow #
- tuya:device-control #
- topic-a #
#writes:
# - .*
- id: 2
username: user_dev
password: Lxy12345
groups:
- zcore
# 角色组
groups:
- name: admin
description: Group 1
reads:
- ^zcore:* # "zcore:" 开头的订阅
writes:
- ^zcore:* # "zcore:" 开头的发送
- name: zcore
description: Group 2
# token信息
tokens:
- id: 1
user_id: 1
token: token-12345
status: active
expiration: 2024-07-20 23:59:59
- id: 2
user_id: 2
token: token-12346
status: active
expiration: 2024-07-20 23:59:59
# ---------------------------------------------
# 公开频道设置
channels:
- name: "-"
description: "无效占位符"
public: true
- name: "lock"
description: "分布式锁通知频道"
public: true
- name: "app_local"
description: "本地appname"
public: true
# ---------------------------------------------

225
internal/auth/auth.go Normal file
View File

@@ -0,0 +1,225 @@
package auth
import (
"fmt"
"gopkg.in/yaml.v3"
"os"
"regexp"
"strings"
"sync"
"time"
)
type User struct {
ID int `yaml:"id"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Status string `yaml:"status"`
Groups []string `yaml:"groups"`
Read []string `yaml:"reads"`
Write []string `yaml:"writes"`
authCache map[string]string // zcore:userid = rw|w|r|-
lock sync.RWMutex
}
type Group struct {
Name string `yaml:"name"`
Description string `yaml:"description"`
Read []string `yaml:"reads"`
Write []string `yaml:"writes"`
}
type Token struct {
ID int `yaml:"id"`
UserID int `yaml:"user_id"`
Token string `yaml:"token"`
Expiration time.Time `yaml:"expiration"`
Status string `yaml:"status"`
Channels []string `yaml:"channels"`
}
type Channel struct {
Name string `yaml:"name"`
Description string `yaml:"description"`
Public bool `yaml:"public"`
}
type Config struct {
Users []*User `yaml:"users"`
Groups []Group `yaml:"groups"`
Tokens []Token `yaml:"tokens"`
Channels []Channel `yaml:"channels"`
}
type PermissionManager struct {
config Config
userMap map[int]*User
groupMap map[string]Group
tokenMap map[string]Token
channelMap map[string]Channel
lock sync.RWMutex
}
func (p *PermissionManager) Init() error {
p.lock.Lock()
defer p.lock.Unlock()
// Load YAML configuration from file
data, err := os.ReadFile("./auth.yml")
if err != nil {
return err
}
// Unmarshal YAML into AuthConfig struct
err = yaml.Unmarshal(data, &p.config)
if err != nil {
return err
}
// Build user map
p.userMap = make(map[int]*User)
for _, user := range p.config.Users {
p.userMap[user.ID] = user
}
// Build group map
p.groupMap = make(map[string]Group)
for _, group := range p.config.Groups {
p.groupMap[group.Name] = group
}
// Build token map
p.tokenMap = make(map[string]Token)
for _, token := range p.config.Tokens {
p.tokenMap[token.Token] = token
}
// Build channel map
p.channelMap = make(map[string]Channel)
for _, channel := range p.config.Channels {
p.channelMap[channel.Name] = channel
}
// clean cache
for _, user := range p.userMap {
user.authCache = make(map[string]string)
}
//p.updatePermissions()
return nil
}
func (p *PermissionManager) Reload() error {
// Reload the configuration by calling the Init() method again
err := p.Init()
if err != nil {
return err
}
return nil
}
func (p *PermissionManager) GetUserIdByToken(token string) (int, error) {
p.lock.RLock()
defer p.lock.RUnlock()
t, found := p.tokenMap[token]
if !found || !t.isValid() {
return 0, fmt.Errorf("invalid or expired token")
}
user, found := p.userMap[t.UserID]
if !found || user.Status != "active" {
return 0, fmt.Errorf("invalid or expired token")
}
return user.ID, nil
}
// AuthCheck cate = r|w
func (p *PermissionManager) AuthCheck(userid int, topic, cate string) bool {
p.lock.RLock()
defer p.lock.RUnlock()
user := p.userMap[userid]
str := user.authCache[topic]
if str != "" && strings.Contains(str, cate) {
return true
}
c := p.channelMap[topic]
if c.Public {
return true
}
// collect all auth expression
exps := make(map[string]bool)
if cate == "r" {
for _, exp := range user.Read {
exps[exp] = true
}
for _, g := range user.Groups {
group := p.groupMap[g]
for _, exp := range group.Read {
exps[exp] = true
}
}
} else if cate == "w" {
for _, exp := range user.Write {
exps[exp] = true
}
for _, g := range user.Groups {
group := p.groupMap[g]
for _, exp := range group.Write {
exps[exp] = true
}
}
}
matchFound := false
for exp, _ := range exps {
matchFound, _ = regexp.MatchString(exp, topic)
if matchFound {
break
}
}
if matchFound {
user.lock.Lock()
defer user.lock.Unlock()
if user.authCache == nil {
user.authCache = make(map[string]string)
}
user.authCache[topic] = cate + str
return true
}
return false
}
func (p *PermissionManager) IsAdmin(userId int) bool {
user := p.userMap[userId]
return user != nil && user.Username == "admin"
}
// AdminToken server's admin using
func (p *PermissionManager) AdminToken() (string, error) {
var userId int
for _, user := range p.userMap {
if user.Username == "admin" {
userId = user.ID
}
}
for _, token := range p.tokenMap {
if token.UserID == userId && token.isValid() {
return token.Token, nil
}
}
return "", fmt.Errorf("-Error: can't found valid admin token")
}
// isValid check token's valid
func (t *Token) isValid() bool {
return t.Expiration.After(time.Now()) && t.Status == "active"
}

View File

@@ -0,0 +1,41 @@
package auth
import (
"fmt"
"log"
"regexp"
"testing"
)
func TestAuth(t *testing.T) {
// Create an instance of PermissionManager
p := &PermissionManager{}
// Initialize the permission manager
err := p.Init()
if err != nil {
log.Fatal(err)
}
// Example usage: Get permissions by token
token := "token-12345"
userId, err := p.GetUserIdByToken(token)
if err != nil {
log.Fatal(err)
}
fmt.Println("Permissions:")
check := p.AuthCheck(userId, "zcore:abx", "r")
check2 := p.AuthCheck(userId, "zcore:abx", "r")
fmt.Println("check:", check)
fmt.Println("check2:", check2)
}
func TestName(t *testing.T) {
exp := "zcore:*"
channel := "zcore:axxx"
compile, _ := regexp.Compile(exp)
//matched, _ := regexp.MatchString(exp, channel)
matched := compile.MatchString(channel)
fmt.Println("matched:", matched)
}

View File

@@ -28,7 +28,8 @@ func StartWatch() {
zsub.Hub.Clearup() zsub.Hub.Clearup()
c.JSON(http.StatusOK, "+OK") c.JSON(http.StatusOK, "+OK")
}) })
r.GET("/retimer", func(c *gin.Context) {
r.GET("/timer/reload", func(c *gin.Context) {
zsub.Hub.ReloadTimer() zsub.Hub.ReloadTimer()
c.JSON(http.StatusOK, "+reload timer ok") c.JSON(http.StatusOK, "+reload timer ok")
}) })
@@ -48,6 +49,12 @@ func StartWatch() {
c.JSON(http.StatusOK, "+OK") c.JSON(http.StatusOK, "+OK")
}) })
// reload the auth configuration
r.GET("/auth/reload", func(c *gin.Context) {
zsub.AuthManager.Reload()
c.JSON(http.StatusOK, "+OK")
})
watchAddr := zsub.Conf.Service.Watch watchAddr := zsub.Conf.Service.Watch
r.Run(watchAddr) r.Run(watchAddr)
} }

View File

@@ -6,8 +6,20 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"zhub/internal/auth"
) )
var AuthManager *auth.PermissionManager
func init() {
AuthManager = &auth.PermissionManager{}
// Initialize the permission manager
err := AuthManager.Init()
if err != nil {
log.Fatal(err)
}
}
var funChan = make(chan func(), 1000) var funChan = make(chan func(), 1000)
func handleMessage(v Message) { func handleMessage(v Message) {
@@ -31,32 +43,14 @@ func handleMessage(v Message) {
if Conf.Log.Level == "debug" && rcmd[0] != "auth" { if Conf.Log.Level == "debug" && rcmd[0] != "auth" {
log.Printf("[%d] cmd: %s\n", v.Conn.sn, strings.Join(rcmd, " ")) log.Printf("[%d] cmd: %s\n", v.Conn.sn, strings.Join(rcmd, " "))
} else if rcmd[0] == "auth" {
if len(rcmd) != 2 || strings.IndexAny(rcmd[1], "@") == -1 {
c.send("-Error: invalid password!")
return
}
inx := strings.IndexAny(rcmd[1], "@") //user@pwd
authKey := rcmd[1][:inx] //user
authValue := Conf.Auth[rcmd[1][:inx]] //pwd
if strings.EqualFold(authValue, rcmd[1][inx+1:]) {
c.auth = rcmd[1][:inx]
c.send("+Auth: ok!")
log.Printf("[%d] cmd: %s\n", v.Conn.sn, "auth "+authKey+"@******* "+"[OK]")
} else {
c.send("-Auth: invalid password!")
log.Printf("[%d] cmd: %s\n", v.Conn.sn, "auth "+authKey+"@******* "+"[Error]")
}
return
} }
if strings.TrimSpace(c.auth) == "" && rcmd[0] != "auth" && Conf.Service.Auth { // 准入拦截,所有指令完成 auth 认证后才可进入
if c.user == 0 && Conf.Service.Auth && rcmd[0] != "auth" {
c.send("-Auth: NOAUTH Authentication required:" + rcmd[0]) c.send("-Auth: NOAUTH Authentication required:" + rcmd[0])
return return
} }
// 指令预处理
if len(rcmd) == 1 { if len(rcmd) == 1 {
switch strings.ToLower(rcmd[0]) { switch strings.ToLower(rcmd[0]) {
default: default:
@@ -82,6 +76,31 @@ func handleMessage(v Message) {
cmd := rcmd[0] cmd := rcmd[0]
switch cmd { switch cmd {
case "auth":
userid, err := AuthManager.GetUserIdByToken(rcmd[1])
if err != nil {
c.send("-Error: " + err.Error())
return
}
c.user = userid
c.send("+Auth: ok!")
// hide the auth token content
str := func() string {
str := rcmd[1]
length := len(str)
if length > 4 {
return str[:2] + strings.Repeat("*", length-4) + str[length-2:]
} else if length > 2 {
return str[:1] + strings.Repeat("*", length-2) + str[length-1:]
} else {
return strings.Repeat("*", length)
}
}()
log.Printf("[%d] cmd: %s, auth [OK]\n", v.Conn.sn, str)
return
case "groupid": case "groupid":
c.groupid = rcmd[1] c.groupid = rcmd[1]
return return
@@ -113,9 +132,19 @@ func handleMessage(v Message) {
/*if len(topicChan) < cap(topicChan) { /*if len(topicChan) < cap(topicChan) {
topicChan <- rcmd topicChan <- rcmd
}*/ }*/
// auth check
if !AuthManager.AuthCheck(c.user, rcmd[1], "w") {
c.send("-Error: Insufficient permissions to send topic [" + rcmd[1] + "] message.")
return
}
Hub.Publish(rcmd[1], rcmd[2]) Hub.Publish(rcmd[1], rcmd[2])
} }
return return
case "broadcast":
Hub.broadcast(rcmd[1], rcmd[2])
case "delay":
Hub.Delay(rcmd)
default: default:
} }
@@ -130,16 +159,17 @@ func handleMessage(v Message) {
case "subscribe": case "subscribe":
// subscribe x y z // subscribe x y z
for _, topic := range rcmd[1:] { for _, topic := range rcmd[1:] {
c.subscribe(topic) // todo: 批量一次订阅 // auth check
if !AuthManager.AuthCheck(c.user, rcmd[1], "r") {
c.send("-Error: Insufficient permissions to accept topic [" + topic + "] message.")
continue
}
c.subscribe(topic)
} }
case "unsubscribe": case "unsubscribe":
for _, topic := range rcmd[1:] { for _, topic := range rcmd[1:] {
c.unsubscribe(topic) c.unsubscribe(topic)
} }
case "broadcast":
Hub.broadcast(rcmd[1], rcmd[2])
case "delay":
Hub.Delay(rcmd)
case "timer": case "timer":
for _, name := range rcmd[1:] { for _, name := range rcmd[1:] {
Hub.timer([]string{"timer", name}, c) // append to timers Hub.timer([]string{"timer", name}, c) // append to timers
@@ -153,7 +183,7 @@ func handleMessage(v Message) {
case "reload-timer": case "reload-timer":
Hub.ReloadTimer() Hub.ReloadTimer()
case "shutdown": case "shutdown":
if !strings.EqualFold(c.groupid, "group-admin") { if AuthManager.IsAdmin(c.user) {
return return
} }
Hub.shutdown() Hub.shutdown()
@@ -173,22 +203,6 @@ func handleMessage(v Message) {
return return
} }
Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]})
/*case "auth":
if len(rcmd) != 2 || strings.IndexAny(rcmd[1], "@") == -1 {
c.send("-Error: invalid password!")
return
}
inx := strings.IndexAny(rcmd[1], "@") //user@pwd
authKey := Conf.Auth[rcmd[1][:inx]]
if strings.EqualFold(authKey, rcmd[1][inx+1:]) {
c.auth = rcmd[1][:inx]
c.send("+Auth: ok!")
} else {
c.send("-Auth: invalid password!")
}
return*/
default: default:
c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
return return

View File

@@ -89,7 +89,7 @@ type ZConn struct { //ZConn
substoped map[string]chan int // 关闭信号量 substoped map[string]chan int // 关闭信号量
ping int64 // 最后心跳时间 ping int64 // 最后心跳时间
pong int64 // 最后心跳回复时间 pong int64 // 最后心跳回复时间
auth string // 是否已验证授权 user int // 是否已验证授权
} }
type Lock struct { type Lock struct {
@@ -486,7 +486,7 @@ func Info() map[string]interface{} {
m["groupid"] = c.groupid m["groupid"] = c.groupid
m["topics"] = c.topics m["topics"] = c.topics
m["timers"] = c.timers m["timers"] = c.timers
m["auth"] = c.auth m["user"] = c.user
conns = append(conns, m) conns = append(conns, m)
} }

11
main.go
View File

@@ -30,12 +30,13 @@ func main() {
} }
if rcmd != "" { // 如果指定了客户端命令 if rcmd != "" { // 如果指定了客户端命令
auth := "" // 认证信息 adminToken, err := zsub.AuthManager.AdminToken() // 认证信息
for key, value := range conf.Auth { // 遍历找到一个认证信息 if err != nil {
auth = key + "@" + value log.Fatal(err) // Configuration error, stop the client from running.
break return
} }
cli, err := cmd.Create("zhub-local", addr, "group-admin", auth) // 创建客户端连接
cli, err := cmd.Create("server-local", addr, "server-admin", adminToken) // 创建客户端连接
if err != nil { if err != nil {
log.Println(err) // 如果连接失败则打印错误信息 log.Println(err) // 如果连接失败则打印错误信息
return return