From b38fdfa05832e4efd680913b508c20fa26268e23 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Mon, 3 May 2021 14:35:44 +0000 Subject: [PATCH] =?UTF-8?q?=E6=B7=87=EE=86=BD=E6=95=BC=E9=94=9B=3F.=20?= =?UTF-8?q?=E9=8F=82=E6=9D=BF=EE=96=83=E6=B5=9C=E5=97=9B=E6=94=A3=E9=8F=88?= =?UTF-8?q?=E5=93=84=E5=9F=97=E6=B5=A0=E3=83=A9=E6=A7=BB=E5=A7=9D=E3=88=A0?= =?UTF-8?q?=E8=8B=9F=E9=8D=99=E6=88=A3=EE=86=96=E9=97=82=EE=86=BC=E5=95=BF?= =?UTF-8?q?=E7=BB=90=E4=BE=8A=E7=B4=B12.=20=E6=B7=87=EE=86=BD=E6=95=BC?= =?UTF-8?q?=E6=B5=9C=E5=97=95=E7=AB=B4=E6=B5=9C=E6=B6=98=E5=BD=89=E9=96=B2?= =?UTF-8?q?=E5=BF=93=E6=82=95=E6=B5=A0=E3=83=A6=E5=BD=81=E6=A5=82=E6=A8=B9?= =?UTF-8?q?=E5=94=AC=E9=90=AE=E4=BD=B8=E5=BD=B2=E7=92=87=E7=BB=98=E2=82=AC?= =?UTF-8?q?=D1=8D=E7=B4=B1=203.=20=E6=B7=87=EE=86=BC=EE=98=B2=E6=B5=9C?= =?UTF-8?q?=E5=97=95=E7=AB=B4=E6=B6=93=EE=81=84=E5=BD=B2=E9=91=B3=E8=97=89?= =?UTF-8?q?=EE=87=B1=E9=91=B7=E5=AF=B8=E7=B4=A6=E9=8D=90=E6=8F=92=E5=B0=AF?= =?UTF-8?q?=E5=A9=A7=E3=88=A0=E5=9A=AD=E9=90=A8=3Fbug=E9=94=9B=E5=B1=BD?= =?UTF-8?q?=E7=9A=A2=E7=BC=82=E6=92=B3=E5=95=BF=E9=8D=96=E5=93=84=E3=81=87?= =?UTF-8?q?=E7=81=8F=E5=BF=9A=EE=86=95=E7=BC=83=EE=86=BB=E8=B4=9F=204096?= =?UTF-8?q?=E9=94=9B=3F.=20=E7=80=B5=E9=80=9B=E5=94=AC=E9=90=AE=E4=BD=BD?= =?UTF-8?q?=E7=B9=98=E7=90=9B=E5=B1=BC=E7=B0=A1=E6=B6=93=E2=82=AC=E6=B5=9C?= =?UTF-8?q?=E6=B6=98=E5=8F=BE=E6=B5=A0=E6=A0=AB=E6=AE=91=E7=81=8F=E5=BF=94?= =?UTF-8?q?=E6=95=BC=E6=9D=A9=E6=B6=98=E6=8B=B0=E6=B5=BC=E6=A8=BA=E5=AF=B2?= =?UTF-8?q?=E9=8A=86=3F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@121 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- app.go | 2 + cli/client.go | 67 ++++++++++++++++++++-- cli_test.go | 32 ++++++++++- go.mod | 1 + monitor/monitor.go | 48 ++++++++++++++++ pkg.bat | 8 +-- public/index.html | 10 ++++ zsub/msg-consumer.go | 18 +++++- zsub/zdb.go | 59 ++++++++++++++++++- zsub/zsub.go | 132 +++++++++++++++++++++++++++++++++++++++---- zsub/ztimer.go | 6 +- zsub/ztopic.go | 2 +- 12 files changed, 356 insertions(+), 29 deletions(-) create mode 100644 monitor/monitor.go create mode 100644 public/index.html diff --git a/app.go b/app.go index d9dbeda..22db86b 100644 --- a/app.go +++ b/app.go @@ -7,6 +7,7 @@ import ( "time" "zhub/cli" "zhub/conf" + "zhub/monitor" "zhub/zsub" ) @@ -46,6 +47,7 @@ func main() { } if server { + go monitor.StartHttp() zsub.ServerStart(addr) // 服务进程启动 } else { cli.ClientRun(addr) diff --git a/cli/client.go b/cli/client.go index 38f0302..91855b4 100644 --- a/cli/client.go +++ b/cli/client.go @@ -3,6 +3,7 @@ package cli import ( "bufio" "fmt" + "github.com/go-basic/uuid" "log" "net" "os" @@ -24,9 +25,18 @@ type Client struct { subFun map[string]func(v string) // subscribe topic and callback function timerFun map[string]func() // subscribe timer amd callback function - chSend chan []string // chan of send message - chReceive chan []string // chan of receive message - timerReceive chan []string // chan of timer + chSend chan []string // chan of send message + chReceive chan []string // chan of receive message + timerReceive chan []string // chan of timer + lockFlag map[string]*Lock // chan of lock +} + +type Lock struct { + Key string // lock Key + Uuid string // lock Uuid + flagChan chan int // + // starttime uint32 // lock start time + // duration int // lock duration } func Create(addr string, groupid string) (*Client, error) { @@ -48,6 +58,7 @@ func Create(addr string, groupid string) (*Client, error) { chSend: make(chan []string, 100), chReceive: make(chan []string, 100), timerReceive: make(chan []string, 100), + lockFlag: make(map[string]*Lock), } client.send("groupid " + groupid) @@ -115,6 +126,8 @@ subscribe x y z func (c *Client) Subscribe(topic string, fun func(v string)) { c.send("subscribe " + topic) if fun != nil { + c.wlock.Lock() + defer c.wlock.Unlock() c.subFun[topic] = fun } } @@ -133,7 +146,7 @@ func (c *Client) ping() { c.send("ping") } -// -------------------------------------- pub-sub -------------------------------------- +//Publish -------------------------------------- pub-sub -------------------------------------- /* send topic message : --- @@ -158,7 +171,9 @@ func (c *Client) Delay(topic string, message string, delay int) error { return c.send("delay", topic, message, strconv.Itoa(delay)) } -/*func (c *Client) Timer(topic string, expr string, fun func()) { +/* +Timer +func (c *Client) Timer(topic string, expr string, fun func()) { c.timerFun[topic] = fun c.send("timer", topic, expr, "x") }*/ @@ -185,6 +200,35 @@ func (c *Client) Close() { c.conn.Close() } +// Lock Key +func (c *Client) Lock(key string, duration int) Lock { + uuid := uuid.New() + c.send("lock", key, uuid, strconv.Itoa(duration)) + + lockChan := make(chan int, 2) + go func() { + c.wlock.Lock() + defer c.wlock.Unlock() + c.lockFlag[uuid] = &Lock{ + Key: key, + Uuid: uuid, + flagChan: lockChan, + } + }() + + select { + case <-lockChan: + log.Println("lock-ok", time.Now().UnixNano()/1e6, uuid) + } + + return Lock{Key: key, Uuid: uuid} +} + +func (c *Client) Unlock(l Lock) { + c.send("unlock", l.Key, l.Uuid) + delete(c.lockFlag, l.Uuid) +} + /*func (c *Client) subscribes(topics ...string) error { if len(topics) == 0 { return nil @@ -269,6 +313,19 @@ func (c *Client) receive() { } if len(vs) == 3 && strings.EqualFold(vs[0], "message") { + if strings.EqualFold(vs[1], "lock") { // message lock Uuid + go func() { + log.Println("lock:" + vs[2]) + c.wlock.Lock() + defer c.wlock.Unlock() + + if c.lockFlag[vs[2]] == nil { + return + } + c.lockFlag[vs[2]].flagChan <- 0 + }() + continue + } c.chReceive <- vs continue } diff --git a/cli_test.go b/cli_test.go index f808d37..359173d 100644 --- a/cli_test.go +++ b/cli_test.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "log" "strconv" "testing" @@ -108,12 +109,13 @@ func TestTimer(t *testing.T) { } func TestSendCmd(t *testing.T) { - client, err := cli.Create(addr, "") + client, err := cli.Create(addr, "group-admin") if err != nil { log.Println(err) } - client.Cmd("reload-timer") + //client.Cmd("reload-timer") + client.Cmd("shutdown") } func TestPublish(t *testing.T) { @@ -127,3 +129,29 @@ func TestPublish(t *testing.T) { time.Sleep(time.Second) } + +func TestLock(t *testing.T) { + client, _ := cli.Create(addr, "xx") + + client.Subscribe("lock", func(v string) { + + }) + + var fun = func(x string) { + log.Println("lock", time.Now().UnixNano()/1e6) + lock := client.Lock("a", 30) + defer client.Unlock(lock) + //client.Lock("a", 5) + + for i := 0; i < 5; i++ { + time.Sleep(time.Second * 1) + fmt.Println(x + ":" + strconv.Itoa(i+1)) + } + } + + go fun("x") + go fun("y") + go fun("z") + + time.Sleep(time.Second * 30 * 10) +} diff --git a/go.mod b/go.mod index 436540b..a7d7b8e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module zhub go 1.16 require ( + github.com/go-basic/uuid v1.0.0 // indirect github.com/go-sql-driver/mysql v1.5.0 github.com/robfig/cron v1.2.0 ) diff --git a/monitor/monitor.go b/monitor/monitor.go new file mode 100644 index 0000000..ecb5d4d --- /dev/null +++ b/monitor/monitor.go @@ -0,0 +1,48 @@ +package monitor + +import ( + "encoding/json" + "net/http" + "os" + "path" + "zhub/zsub" +) + +func StartHttp() { + dir, _ := os.Getwd() + webDir := path.Join(dir, "/public") + + http.Handle("/", http.FileServer(http.Dir(webDir))) + http.HandleFunc("/info", info) + http.HandleFunc("/cleanup", cleanup) + http.HandleFunc("/retimer", retimer) + + http.ListenAndServe(":1217", nil) +} + +func retimer(w http.ResponseWriter, r *http.Request) { + zsub.ZSubx().ReloadTimer() + renderJson(w, "+reload timer ok") +} + +func cleanup(w http.ResponseWriter, r *http.Request) { + zsub.ZSubx().Clearup() + renderJson(w, "+OK") +} + +func info(w http.ResponseWriter, r *http.Request) { + topics := zsub.Info() + renderJson(w, topics) +} + +func renderJson(w http.ResponseWriter, d interface{}) { + var bytes []byte + + if str, ok := d.(string); ok { + bytes = []byte(str) + } else { + bytes, _ = json.Marshal(d) + w.Header().Set("content-type", "application/json; charset=utf-8;") + } + w.Write(bytes) +} diff --git a/pkg.bat b/pkg.bat index 6d4882c..f23800d 100644 --- a/pkg.bat +++ b/pkg.bat @@ -3,8 +3,8 @@ SET GOARCH=amd64 go build -o zhub.sh -ldflags "-s -w" ./app.go upx -9 zhub.sh -scp zhub.sh xhost:/opt/zhub -scp zhub.sh zhost:/opt/zhub -scp zhub.sh qhost:/opt/zhub -scp zhub.sh nhost:/opt/zhub +scp zhub.sh pro:/opt/zhub +scp zhub.sh dev:/opt/zhub +scp zhub.sh qc:/opt/zhub +scp zhub.sh my:/opt/zhub del zhub.sh diff --git a/public/index.html b/public/index.html new file mode 100644 index 0000000..3978828 --- /dev/null +++ b/public/index.html @@ -0,0 +1,10 @@ + + + + + Title + + +

welcome zhub!

+ + \ No newline at end of file diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 2dcdc87..91ac3d7 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -2,6 +2,7 @@ package zsub import ( "log" + "strconv" "strings" "zhub/conf" ) @@ -94,13 +95,28 @@ func msgAccept(v Message) { } switch rcmd[1] { case "reload-timer": - zsub.reloadTimer() + zsub.ReloadTimer() case "shutdown": if !strings.EqualFold(c.groupid, "group-admin") { return } zsub.shutdown() } + case "lock": + // lock key uuid 5 + if len(rcmd) != 4 { + c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]") + return + } + d, _ := strconv.Atoi(rcmd[3]) + zsub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d}) + case "unlock": + // unlock key uuid + if len(rcmd) != 3 { + c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]") + return + } + zsub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]}) default: c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]") return diff --git a/zsub/zdb.go b/zsub/zdb.go index 2d72946..2e43c59 100644 --- a/zsub/zdb.go +++ b/zsub/zdb.go @@ -36,9 +36,10 @@ func Append(str string, fileName string) { } // 数据持久化 -func (s *ZSub) saveDelay() { +func (s *ZSub) dataStorage() { s.Lock() defer s.Unlock() + // delay save err := os.Remove(conf.DataDir + "/delay.z") if err != nil { log.Println(err) @@ -49,9 +50,23 @@ func (s *ZSub) saveDelay() { str += fmt.Sprintf("%s %s %s\n", delay.topic, delay.value, strconv.FormatInt(delay.exectime.Unix(), 10)) } Append(str, conf.DataDir+"/delay.z") + + // lock save + err = os.Remove(conf.DataDir + "/lock.z") + if err != nil { + log.Println(err) + } + str = "" + for _, locks := range s.locks { + for _, lock := range locks { + str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start) + break // 只记录获得锁的记录 + } + } + Append(str, conf.DataDir+"/lock.z") } -func (s *ZSub) reloadDelay() { +func (s *ZSub) loadDelay() { f, err := os.Open(conf.DataDir + "/delay.z") if err != nil { return @@ -85,3 +100,43 @@ func (s *ZSub) reloadDelay() { s.delay([]string{"delay", split[0], split[1], strconv.FormatInt((exectime-time.Now().Unix())*1000, 10)}, nil) } } + +func (s *ZSub) loadLock() { + f, err := os.Open(conf.DataDir + "/lock.z") + if err != nil { + return + } + defer f.Close() + + r := bufio.NewReader(f) + for { + bytes, err := r.ReadBytes('\n') + if err != nil { + return + } + line := string(bytes) + if len(line) == 0 { + continue + } + line = strings.Trim(line, " \r\n") + split := strings.Split(line, " ") + if len(split) != 4 { + continue + } + duration, err := strconv.Atoi(split[2]) + start, err := strconv.ParseInt(split[3], 10, 64) + + if start > 0 && time.Now().Unix()-start > 1 { + duration = int(time.Now().Unix() - start) + } else { + duration = 1 + } + + s._lock(&Lock{ + key: split[0], + uuid: split[1], + duration: duration, + // start: start, + }) + } +} diff --git a/zsub/zsub.go b/zsub/zsub.go index ef9362b..9f7b400 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "time" "zhub/conf" ) @@ -16,6 +17,7 @@ var ( topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), delays: make(map[string]*ZDelay), + locks: make(map[string][]*Lock), } ) @@ -24,6 +26,7 @@ type ZSub struct { topics map[string]*ZTopic timers map[string]*ZTimer delays map[string]*ZDelay + locks map[string][]*Lock } type ZConn struct { //ZConn @@ -36,6 +39,15 @@ type ZConn struct { //ZConn substoped map[string]chan int // 关闭信号量 } +type Lock struct { + key string + uuid string + duration int + timer *time.Timer + start int64 + //stop time.Time +} + func NewZConn(conn *net.Conn) *ZConn { return &ZConn{ conn: conn, @@ -60,7 +72,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} ztopic = &ZTopic{ groups: map[string]*ZGroup{}, topic: topic, - chMsg: make(chan string, 10000), + chMsg: make(chan string, 500), } ztopic.init() zsub.topics[topic] = ztopic @@ -71,7 +83,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{} zgroup = &ZGroup{ //conns: []*ZConn{}, ztopic: ztopic, - chMsg: make(chan string, 1000), + chMsg: make(chan string, 500), } ztopic.groups[c.groupid] = zgroup } @@ -187,8 +199,9 @@ func ServerStart(addr string) { }() // 重新加载[定时、延时] - go zsub.reloadTimer() - go zsub.reloadDelay() + go zsub.ReloadTimer() + go zsub.loadDelay() + //go zsub.loadLock() // 启动服务监听 listen, err := net.Listen("tcp", addr) @@ -237,8 +250,15 @@ func (s *ZSub) acceptHandler(c *ZConn) { n, _ := strconv.Atoi(string(line[1:])) for i := 0; i < n; i++ { reader.ReadLine() - v, _, _ := reader.ReadLine() - rcmd = append(rcmd, string(v)) + var vx = "" + a: + if v, prefix, _ := reader.ReadLine(); prefix { + vx += string(v) + goto a + } else { + vx += string(v) + } + rcmd = append(rcmd, vx) } default: rcmd = append(rcmd, string(line)) @@ -253,9 +273,9 @@ func (s *ZSub) acceptHandler(c *ZConn) { } /* -accept topic message -1、send message to topic's chan -2、feedback send success to sender, and sending message to topic's subscripts +accept stop message +1、send message to stop's chan +2、feedback send success to sender, and sending message to stop's subscripts */ func (s *ZSub) publish(topic, msg string) { s.RLock() @@ -274,6 +294,9 @@ send broadcast message func (s *ZSub) broadcast(topic, msg string) { s.RLock() defer s.RUnlock() + if strings.EqualFold(topic, "lock") { + log.Println("lock", msg) + } ztopic := s.topics[topic] //ZTopic if ztopic == nil { @@ -287,8 +310,95 @@ func (s *ZSub) broadcast(topic, msg string) { } } +/* +lock: lock key uuid t +unlock: unlock key uuid +*/ +func (s *ZSub) _lock(lock *Lock) { + locks := s.locks[lock.key] + if locks == nil { + locks = make([]*Lock, 0) + } + if len(locks) == 0 { // lock success + lock.start = time.Now().Unix() + locks = append(locks, lock) + s.locks[lock.key] = locks + s.broadcast("lock", lock.uuid) + + // 设置时间到解锁 + locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second) + go func() { + select { + case <-locks[0].timer.C: + s._unlock(*locks[0]) + } + }() + } else { + s.locks[lock.key] = append(locks, lock) + } +} +func (s *ZSub) _unlock(l Lock) { + locks := s.locks[l.key] + if locks == nil || len(locks) == 0 { + return + } + if strings.EqualFold(locks[0].uuid, l.uuid) { + locks[0].timer.Stop() + locks = locks[1:] + s.locks[l.key] = locks + } + if len(s.locks[l.key]) > 0 { // next lock + s.broadcast("lock", s.locks[l.key][0].uuid) + s.locks[l.key][0].start = time.Now().Unix() + s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second) + go func() { + select { + case <-s.locks[l.key][0].timer.C: + s._unlock(*s.locks[l.key][0]) + } + }() + } +} + func (s *ZSub) shutdown() { - s.saveDelay() - s.Lock() + s.dataStorage() os.Exit(0) } + +func Info() map[string]interface{} { + m := map[string]interface{}{} + + for s, topic := range zsub.topics { + // {groups:[{name:xxx,size:xx}]} + arr := make([]map[string]interface{}, 0) + + for groupname, group := range topic.groups { + arr = append(arr, map[string]interface{}{ + "name": groupname, + "subsize": len(group.conns), + "offset": group.offset, + "mcount": topic.mcount, + }) + } + m[s] = arr + } + + return m +} + +func (s *ZSub) Clearup() { + for tn, topic := range s.topics { + for _, group := range topic.groups { + if len(group.conns) > 0 || topic.mcount > group.offset { + goto a + } + } + close(topic.chMsg) + delete(s.topics, tn) + a: + } +} + +func ZSubx() *ZSub { + return zsub +} diff --git a/zsub/ztimer.go b/zsub/ztimer.go index d482dd4..8a6cf6b 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -165,7 +165,7 @@ func executeShell(command string) (string, error, string) { return stdout.String(), err, stderr.String() } -func (s *ZSub) reloadTimer() { +func (s *ZSub) ReloadTimer() { db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", conf.GetStr("ztimer.db.user", "root"), conf.GetStr("ztimer.db.pwd", "123456"), @@ -207,7 +207,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { s.Lock() defer func() { s.Unlock() - s.saveDelay() + s.dataStorage() }() if len(rcmd) != 4 { c.send("-Error: subscribe para number!") @@ -222,7 +222,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { delay := s.delays[rcmd[1]+"-"+rcmd[2]] if delay != nil { - if t == -1 { + if t < 0 { delay.timer.Stop() delete(s.delays, rcmd[1]+"-"+rcmd[2]) return diff --git a/zsub/ztopic.go b/zsub/ztopic.go index 9add967..c53c748 100644 --- a/zsub/ztopic.go +++ b/zsub/ztopic.go @@ -5,7 +5,7 @@ import "sync" type ZTopic struct { //ZTopic sync.Mutex groups map[string]*ZGroup - mcount int + mcount int32 topic string // 主题名称 chMsg chan string // 主题消息投递 }