diff --git a/app.go b/app.go
index d9dbeda..22db86b 100644
--- a/app.go
+++ b/app.go
@@ -7,6 +7,7 @@ import (
"time"
"zhub/cli"
"zhub/conf"
+ "zhub/monitor"
"zhub/zsub"
)
@@ -46,6 +47,7 @@ func main() {
}
if server {
+ go monitor.StartHttp()
zsub.ServerStart(addr) // 服务进程启动
} else {
cli.ClientRun(addr)
diff --git a/cli/client.go b/cli/client.go
index 38f0302..91855b4 100644
--- a/cli/client.go
+++ b/cli/client.go
@@ -3,6 +3,7 @@ package cli
import (
"bufio"
"fmt"
+ "github.com/go-basic/uuid"
"log"
"net"
"os"
@@ -24,9 +25,18 @@ type Client struct {
subFun map[string]func(v string) // subscribe topic and callback function
timerFun map[string]func() // subscribe timer amd callback function
- chSend chan []string // chan of send message
- chReceive chan []string // chan of receive message
- timerReceive chan []string // chan of timer
+ chSend chan []string // chan of send message
+ chReceive chan []string // chan of receive message
+ timerReceive chan []string // chan of timer
+ lockFlag map[string]*Lock // chan of lock
+}
+
+type Lock struct {
+ Key string // lock Key
+ Uuid string // lock Uuid
+ flagChan chan int //
+ // starttime uint32 // lock start time
+ // duration int // lock duration
}
func Create(addr string, groupid string) (*Client, error) {
@@ -48,6 +58,7 @@ func Create(addr string, groupid string) (*Client, error) {
chSend: make(chan []string, 100),
chReceive: make(chan []string, 100),
timerReceive: make(chan []string, 100),
+ lockFlag: make(map[string]*Lock),
}
client.send("groupid " + groupid)
@@ -115,6 +126,8 @@ subscribe x y z
func (c *Client) Subscribe(topic string, fun func(v string)) {
c.send("subscribe " + topic)
if fun != nil {
+ c.wlock.Lock()
+ defer c.wlock.Unlock()
c.subFun[topic] = fun
}
}
@@ -133,7 +146,7 @@ func (c *Client) ping() {
c.send("ping")
}
-// -------------------------------------- pub-sub --------------------------------------
+//Publish -------------------------------------- pub-sub --------------------------------------
/*
send topic message :
---
@@ -158,7 +171,9 @@ func (c *Client) Delay(topic string, message string, delay int) error {
return c.send("delay", topic, message, strconv.Itoa(delay))
}
-/*func (c *Client) Timer(topic string, expr string, fun func()) {
+/*
+Timer
+func (c *Client) Timer(topic string, expr string, fun func()) {
c.timerFun[topic] = fun
c.send("timer", topic, expr, "x")
}*/
@@ -185,6 +200,35 @@ func (c *Client) Close() {
c.conn.Close()
}
+// Lock Key
+func (c *Client) Lock(key string, duration int) Lock {
+ uuid := uuid.New()
+ c.send("lock", key, uuid, strconv.Itoa(duration))
+
+ lockChan := make(chan int, 2)
+ go func() {
+ c.wlock.Lock()
+ defer c.wlock.Unlock()
+ c.lockFlag[uuid] = &Lock{
+ Key: key,
+ Uuid: uuid,
+ flagChan: lockChan,
+ }
+ }()
+
+ select {
+ case <-lockChan:
+ log.Println("lock-ok", time.Now().UnixNano()/1e6, uuid)
+ }
+
+ return Lock{Key: key, Uuid: uuid}
+}
+
+func (c *Client) Unlock(l Lock) {
+ c.send("unlock", l.Key, l.Uuid)
+ delete(c.lockFlag, l.Uuid)
+}
+
/*func (c *Client) subscribes(topics ...string) error {
if len(topics) == 0 {
return nil
@@ -269,6 +313,19 @@ func (c *Client) receive() {
}
if len(vs) == 3 && strings.EqualFold(vs[0], "message") {
+ if strings.EqualFold(vs[1], "lock") { // message lock Uuid
+ go func() {
+ log.Println("lock:" + vs[2])
+ c.wlock.Lock()
+ defer c.wlock.Unlock()
+
+ if c.lockFlag[vs[2]] == nil {
+ return
+ }
+ c.lockFlag[vs[2]].flagChan <- 0
+ }()
+ continue
+ }
c.chReceive <- vs
continue
}
diff --git a/cli_test.go b/cli_test.go
index f808d37..359173d 100644
--- a/cli_test.go
+++ b/cli_test.go
@@ -1,6 +1,7 @@
package main
import (
+ "fmt"
"log"
"strconv"
"testing"
@@ -108,12 +109,13 @@ func TestTimer(t *testing.T) {
}
func TestSendCmd(t *testing.T) {
- client, err := cli.Create(addr, "")
+ client, err := cli.Create(addr, "group-admin")
if err != nil {
log.Println(err)
}
- client.Cmd("reload-timer")
+ //client.Cmd("reload-timer")
+ client.Cmd("shutdown")
}
func TestPublish(t *testing.T) {
@@ -127,3 +129,29 @@ func TestPublish(t *testing.T) {
time.Sleep(time.Second)
}
+
+func TestLock(t *testing.T) {
+ client, _ := cli.Create(addr, "xx")
+
+ client.Subscribe("lock", func(v string) {
+
+ })
+
+ var fun = func(x string) {
+ log.Println("lock", time.Now().UnixNano()/1e6)
+ lock := client.Lock("a", 30)
+ defer client.Unlock(lock)
+ //client.Lock("a", 5)
+
+ for i := 0; i < 5; i++ {
+ time.Sleep(time.Second * 1)
+ fmt.Println(x + ":" + strconv.Itoa(i+1))
+ }
+ }
+
+ go fun("x")
+ go fun("y")
+ go fun("z")
+
+ time.Sleep(time.Second * 30 * 10)
+}
diff --git a/go.mod b/go.mod
index 436540b..a7d7b8e 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module zhub
go 1.16
require (
+ github.com/go-basic/uuid v1.0.0 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/robfig/cron v1.2.0
)
diff --git a/monitor/monitor.go b/monitor/monitor.go
new file mode 100644
index 0000000..ecb5d4d
--- /dev/null
+++ b/monitor/monitor.go
@@ -0,0 +1,48 @@
+package monitor
+
+import (
+ "encoding/json"
+ "net/http"
+ "os"
+ "path"
+ "zhub/zsub"
+)
+
+func StartHttp() {
+ dir, _ := os.Getwd()
+ webDir := path.Join(dir, "/public")
+
+ http.Handle("/", http.FileServer(http.Dir(webDir)))
+ http.HandleFunc("/info", info)
+ http.HandleFunc("/cleanup", cleanup)
+ http.HandleFunc("/retimer", retimer)
+
+ http.ListenAndServe(":1217", nil)
+}
+
+func retimer(w http.ResponseWriter, r *http.Request) {
+ zsub.ZSubx().ReloadTimer()
+ renderJson(w, "+reload timer ok")
+}
+
+func cleanup(w http.ResponseWriter, r *http.Request) {
+ zsub.ZSubx().Clearup()
+ renderJson(w, "+OK")
+}
+
+func info(w http.ResponseWriter, r *http.Request) {
+ topics := zsub.Info()
+ renderJson(w, topics)
+}
+
+func renderJson(w http.ResponseWriter, d interface{}) {
+ var bytes []byte
+
+ if str, ok := d.(string); ok {
+ bytes = []byte(str)
+ } else {
+ bytes, _ = json.Marshal(d)
+ w.Header().Set("content-type", "application/json; charset=utf-8;")
+ }
+ w.Write(bytes)
+}
diff --git a/pkg.bat b/pkg.bat
index 6d4882c..f23800d 100644
--- a/pkg.bat
+++ b/pkg.bat
@@ -3,8 +3,8 @@ SET GOARCH=amd64
go build -o zhub.sh -ldflags "-s -w" ./app.go
upx -9 zhub.sh
-scp zhub.sh xhost:/opt/zhub
-scp zhub.sh zhost:/opt/zhub
-scp zhub.sh qhost:/opt/zhub
-scp zhub.sh nhost:/opt/zhub
+scp zhub.sh pro:/opt/zhub
+scp zhub.sh dev:/opt/zhub
+scp zhub.sh qc:/opt/zhub
+scp zhub.sh my:/opt/zhub
del zhub.sh
diff --git a/public/index.html b/public/index.html
new file mode 100644
index 0000000..3978828
--- /dev/null
+++ b/public/index.html
@@ -0,0 +1,10 @@
+
+
+
+
+ Title
+
+
+welcome zhub!
+
+
\ No newline at end of file
diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go
index 2dcdc87..91ac3d7 100644
--- a/zsub/msg-consumer.go
+++ b/zsub/msg-consumer.go
@@ -2,6 +2,7 @@ package zsub
import (
"log"
+ "strconv"
"strings"
"zhub/conf"
)
@@ -94,13 +95,28 @@ func msgAccept(v Message) {
}
switch rcmd[1] {
case "reload-timer":
- zsub.reloadTimer()
+ zsub.ReloadTimer()
case "shutdown":
if !strings.EqualFold(c.groupid, "group-admin") {
return
}
zsub.shutdown()
}
+ case "lock":
+ // lock key uuid 5
+ if len(rcmd) != 4 {
+ c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]")
+ return
+ }
+ d, _ := strconv.Atoi(rcmd[3])
+ zsub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d})
+ case "unlock":
+ // unlock key uuid
+ if len(rcmd) != 3 {
+ c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]")
+ return
+ }
+ zsub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]})
default:
c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
return
diff --git a/zsub/zdb.go b/zsub/zdb.go
index 2d72946..2e43c59 100644
--- a/zsub/zdb.go
+++ b/zsub/zdb.go
@@ -36,9 +36,10 @@ func Append(str string, fileName string) {
}
// 数据持久化
-func (s *ZSub) saveDelay() {
+func (s *ZSub) dataStorage() {
s.Lock()
defer s.Unlock()
+ // delay save
err := os.Remove(conf.DataDir + "/delay.z")
if err != nil {
log.Println(err)
@@ -49,9 +50,23 @@ func (s *ZSub) saveDelay() {
str += fmt.Sprintf("%s %s %s\n", delay.topic, delay.value, strconv.FormatInt(delay.exectime.Unix(), 10))
}
Append(str, conf.DataDir+"/delay.z")
+
+ // lock save
+ err = os.Remove(conf.DataDir + "/lock.z")
+ if err != nil {
+ log.Println(err)
+ }
+ str = ""
+ for _, locks := range s.locks {
+ for _, lock := range locks {
+ str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start)
+ break // 只记录获得锁的记录
+ }
+ }
+ Append(str, conf.DataDir+"/lock.z")
}
-func (s *ZSub) reloadDelay() {
+func (s *ZSub) loadDelay() {
f, err := os.Open(conf.DataDir + "/delay.z")
if err != nil {
return
@@ -85,3 +100,43 @@ func (s *ZSub) reloadDelay() {
s.delay([]string{"delay", split[0], split[1], strconv.FormatInt((exectime-time.Now().Unix())*1000, 10)}, nil)
}
}
+
+func (s *ZSub) loadLock() {
+ f, err := os.Open(conf.DataDir + "/lock.z")
+ if err != nil {
+ return
+ }
+ defer f.Close()
+
+ r := bufio.NewReader(f)
+ for {
+ bytes, err := r.ReadBytes('\n')
+ if err != nil {
+ return
+ }
+ line := string(bytes)
+ if len(line) == 0 {
+ continue
+ }
+ line = strings.Trim(line, " \r\n")
+ split := strings.Split(line, " ")
+ if len(split) != 4 {
+ continue
+ }
+ duration, err := strconv.Atoi(split[2])
+ start, err := strconv.ParseInt(split[3], 10, 64)
+
+ if start > 0 && time.Now().Unix()-start > 1 {
+ duration = int(time.Now().Unix() - start)
+ } else {
+ duration = 1
+ }
+
+ s._lock(&Lock{
+ key: split[0],
+ uuid: split[1],
+ duration: duration,
+ // start: start,
+ })
+ }
+}
diff --git a/zsub/zsub.go b/zsub/zsub.go
index ef9362b..9f7b400 100644
--- a/zsub/zsub.go
+++ b/zsub/zsub.go
@@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"sync"
+ "time"
"zhub/conf"
)
@@ -16,6 +17,7 @@ var (
topics: make(map[string]*ZTopic),
timers: make(map[string]*ZTimer),
delays: make(map[string]*ZDelay),
+ locks: make(map[string][]*Lock),
}
)
@@ -24,6 +26,7 @@ type ZSub struct {
topics map[string]*ZTopic
timers map[string]*ZTimer
delays map[string]*ZDelay
+ locks map[string][]*Lock
}
type ZConn struct { //ZConn
@@ -36,6 +39,15 @@ type ZConn struct { //ZConn
substoped map[string]chan int // 关闭信号量
}
+type Lock struct {
+ key string
+ uuid string
+ duration int
+ timer *time.Timer
+ start int64
+ //stop time.Time
+}
+
func NewZConn(conn *net.Conn) *ZConn {
return &ZConn{
conn: conn,
@@ -60,7 +72,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
ztopic = &ZTopic{
groups: map[string]*ZGroup{},
topic: topic,
- chMsg: make(chan string, 10000),
+ chMsg: make(chan string, 500),
}
ztopic.init()
zsub.topics[topic] = ztopic
@@ -71,7 +83,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
zgroup = &ZGroup{
//conns: []*ZConn{},
ztopic: ztopic,
- chMsg: make(chan string, 1000),
+ chMsg: make(chan string, 500),
}
ztopic.groups[c.groupid] = zgroup
}
@@ -187,8 +199,9 @@ func ServerStart(addr string) {
}()
// 重新加载[定时、延时]
- go zsub.reloadTimer()
- go zsub.reloadDelay()
+ go zsub.ReloadTimer()
+ go zsub.loadDelay()
+ //go zsub.loadLock()
// 启动服务监听
listen, err := net.Listen("tcp", addr)
@@ -237,8 +250,15 @@ func (s *ZSub) acceptHandler(c *ZConn) {
n, _ := strconv.Atoi(string(line[1:]))
for i := 0; i < n; i++ {
reader.ReadLine()
- v, _, _ := reader.ReadLine()
- rcmd = append(rcmd, string(v))
+ var vx = ""
+ a:
+ if v, prefix, _ := reader.ReadLine(); prefix {
+ vx += string(v)
+ goto a
+ } else {
+ vx += string(v)
+ }
+ rcmd = append(rcmd, vx)
}
default:
rcmd = append(rcmd, string(line))
@@ -253,9 +273,9 @@ func (s *ZSub) acceptHandler(c *ZConn) {
}
/*
-accept topic message
-1、send message to topic's chan
-2、feedback send success to sender, and sending message to topic's subscripts
+accept stop message
+1、send message to stop's chan
+2、feedback send success to sender, and sending message to stop's subscripts
*/
func (s *ZSub) publish(topic, msg string) {
s.RLock()
@@ -274,6 +294,9 @@ send broadcast message
func (s *ZSub) broadcast(topic, msg string) {
s.RLock()
defer s.RUnlock()
+ if strings.EqualFold(topic, "lock") {
+ log.Println("lock", msg)
+ }
ztopic := s.topics[topic] //ZTopic
if ztopic == nil {
@@ -287,8 +310,95 @@ func (s *ZSub) broadcast(topic, msg string) {
}
}
+/*
+lock: lock key uuid t
+unlock: unlock key uuid
+*/
+func (s *ZSub) _lock(lock *Lock) {
+ locks := s.locks[lock.key]
+ if locks == nil {
+ locks = make([]*Lock, 0)
+ }
+ if len(locks) == 0 { // lock success
+ lock.start = time.Now().Unix()
+ locks = append(locks, lock)
+ s.locks[lock.key] = locks
+ s.broadcast("lock", lock.uuid)
+
+ // 设置时间到解锁
+ locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second)
+ go func() {
+ select {
+ case <-locks[0].timer.C:
+ s._unlock(*locks[0])
+ }
+ }()
+ } else {
+ s.locks[lock.key] = append(locks, lock)
+ }
+}
+func (s *ZSub) _unlock(l Lock) {
+ locks := s.locks[l.key]
+ if locks == nil || len(locks) == 0 {
+ return
+ }
+ if strings.EqualFold(locks[0].uuid, l.uuid) {
+ locks[0].timer.Stop()
+ locks = locks[1:]
+ s.locks[l.key] = locks
+ }
+ if len(s.locks[l.key]) > 0 { // next lock
+ s.broadcast("lock", s.locks[l.key][0].uuid)
+ s.locks[l.key][0].start = time.Now().Unix()
+ s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second)
+ go func() {
+ select {
+ case <-s.locks[l.key][0].timer.C:
+ s._unlock(*s.locks[l.key][0])
+ }
+ }()
+ }
+}
+
func (s *ZSub) shutdown() {
- s.saveDelay()
- s.Lock()
+ s.dataStorage()
os.Exit(0)
}
+
+func Info() map[string]interface{} {
+ m := map[string]interface{}{}
+
+ for s, topic := range zsub.topics {
+ // {groups:[{name:xxx,size:xx}]}
+ arr := make([]map[string]interface{}, 0)
+
+ for groupname, group := range topic.groups {
+ arr = append(arr, map[string]interface{}{
+ "name": groupname,
+ "subsize": len(group.conns),
+ "offset": group.offset,
+ "mcount": topic.mcount,
+ })
+ }
+ m[s] = arr
+ }
+
+ return m
+}
+
+func (s *ZSub) Clearup() {
+ for tn, topic := range s.topics {
+ for _, group := range topic.groups {
+ if len(group.conns) > 0 || topic.mcount > group.offset {
+ goto a
+ }
+ }
+ close(topic.chMsg)
+ delete(s.topics, tn)
+ a:
+ }
+}
+
+func ZSubx() *ZSub {
+ return zsub
+}
diff --git a/zsub/ztimer.go b/zsub/ztimer.go
index d482dd4..8a6cf6b 100644
--- a/zsub/ztimer.go
+++ b/zsub/ztimer.go
@@ -165,7 +165,7 @@ func executeShell(command string) (string, error, string) {
return stdout.String(), err, stderr.String()
}
-func (s *ZSub) reloadTimer() {
+func (s *ZSub) ReloadTimer() {
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
conf.GetStr("ztimer.db.user", "root"),
conf.GetStr("ztimer.db.pwd", "123456"),
@@ -207,7 +207,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
s.Lock()
defer func() {
s.Unlock()
- s.saveDelay()
+ s.dataStorage()
}()
if len(rcmd) != 4 {
c.send("-Error: subscribe para number!")
@@ -222,7 +222,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
delay := s.delays[rcmd[1]+"-"+rcmd[2]]
if delay != nil {
- if t == -1 {
+ if t < 0 {
delay.timer.Stop()
delete(s.delays, rcmd[1]+"-"+rcmd[2])
return
diff --git a/zsub/ztopic.go b/zsub/ztopic.go
index 9add967..c53c748 100644
--- a/zsub/ztopic.go
+++ b/zsub/ztopic.go
@@ -5,7 +5,7 @@ import "sync"
type ZTopic struct { //ZTopic
sync.Mutex
groups map[string]*ZGroup
- mcount int
+ mcount int32
topic string // 主题名称
chMsg chan string // 主题消息投递
}