淇敼锛?. 鏂板浜嗛攣鏈哄埗浠ラ槻姝㈠苟鍙戣闂啿绐侊紱2. 淇敼浜嗕竴浜涘彉閲忓悕浠ユ彁楂樹唬鐮佸彲璇绘€э紱 3. 淇浜嗕竴涓彲鑳藉鑷寸紦鍐插尯婧㈠嚭鐨?bug锛屽皢缂撳啿鍖哄ぇ灏忚缃负 4096锛?. 瀵逛唬鐮佽繘琛屼簡涓€浜涘叾浠栫殑灏忔敼杩涘拰浼樺寲銆?

git-svn-id: svn://47.119.165.148/zhub@121 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-05-03 14:35:44 +00:00
parent ee885e198a
commit b38fdfa058
12 changed files with 356 additions and 29 deletions

2
app.go
View File

@@ -7,6 +7,7 @@ import (
"time"
"zhub/cli"
"zhub/conf"
"zhub/monitor"
"zhub/zsub"
)
@@ -46,6 +47,7 @@ func main() {
}
if server {
go monitor.StartHttp()
zsub.ServerStart(addr) // 服务进程启动
} else {
cli.ClientRun(addr)

View File

@@ -3,6 +3,7 @@ package cli
import (
"bufio"
"fmt"
"github.com/go-basic/uuid"
"log"
"net"
"os"
@@ -27,6 +28,15 @@ type Client struct {
chSend chan []string // chan of send message
chReceive chan []string // chan of receive message
timerReceive chan []string // chan of timer
lockFlag map[string]*Lock // chan of lock
}
type Lock struct {
Key string // lock Key
Uuid string // lock Uuid
flagChan chan int //
// starttime uint32 // lock start time
// duration int // lock duration
}
func Create(addr string, groupid string) (*Client, error) {
@@ -48,6 +58,7 @@ func Create(addr string, groupid string) (*Client, error) {
chSend: make(chan []string, 100),
chReceive: make(chan []string, 100),
timerReceive: make(chan []string, 100),
lockFlag: make(map[string]*Lock),
}
client.send("groupid " + groupid)
@@ -115,6 +126,8 @@ subscribe x y z
func (c *Client) Subscribe(topic string, fun func(v string)) {
c.send("subscribe " + topic)
if fun != nil {
c.wlock.Lock()
defer c.wlock.Unlock()
c.subFun[topic] = fun
}
}
@@ -133,7 +146,7 @@ func (c *Client) ping() {
c.send("ping")
}
// -------------------------------------- pub-sub --------------------------------------
//Publish -------------------------------------- pub-sub --------------------------------------
/*
send topic message :
---
@@ -158,7 +171,9 @@ func (c *Client) Delay(topic string, message string, delay int) error {
return c.send("delay", topic, message, strconv.Itoa(delay))
}
/*func (c *Client) Timer(topic string, expr string, fun func()) {
/*
Timer
func (c *Client) Timer(topic string, expr string, fun func()) {
c.timerFun[topic] = fun
c.send("timer", topic, expr, "x")
}*/
@@ -185,6 +200,35 @@ func (c *Client) Close() {
c.conn.Close()
}
// Lock Key
func (c *Client) Lock(key string, duration int) Lock {
uuid := uuid.New()
c.send("lock", key, uuid, strconv.Itoa(duration))
lockChan := make(chan int, 2)
go func() {
c.wlock.Lock()
defer c.wlock.Unlock()
c.lockFlag[uuid] = &Lock{
Key: key,
Uuid: uuid,
flagChan: lockChan,
}
}()
select {
case <-lockChan:
log.Println("lock-ok", time.Now().UnixNano()/1e6, uuid)
}
return Lock{Key: key, Uuid: uuid}
}
func (c *Client) Unlock(l Lock) {
c.send("unlock", l.Key, l.Uuid)
delete(c.lockFlag, l.Uuid)
}
/*func (c *Client) subscribes(topics ...string) error {
if len(topics) == 0 {
return nil
@@ -269,6 +313,19 @@ func (c *Client) receive() {
}
if len(vs) == 3 && strings.EqualFold(vs[0], "message") {
if strings.EqualFold(vs[1], "lock") { // message lock Uuid
go func() {
log.Println("lock:" + vs[2])
c.wlock.Lock()
defer c.wlock.Unlock()
if c.lockFlag[vs[2]] == nil {
return
}
c.lockFlag[vs[2]].flagChan <- 0
}()
continue
}
c.chReceive <- vs
continue
}

View File

@@ -1,6 +1,7 @@
package main
import (
"fmt"
"log"
"strconv"
"testing"
@@ -108,12 +109,13 @@ func TestTimer(t *testing.T) {
}
func TestSendCmd(t *testing.T) {
client, err := cli.Create(addr, "")
client, err := cli.Create(addr, "group-admin")
if err != nil {
log.Println(err)
}
client.Cmd("reload-timer")
//client.Cmd("reload-timer")
client.Cmd("shutdown")
}
func TestPublish(t *testing.T) {
@@ -127,3 +129,29 @@ func TestPublish(t *testing.T) {
time.Sleep(time.Second)
}
func TestLock(t *testing.T) {
client, _ := cli.Create(addr, "xx")
client.Subscribe("lock", func(v string) {
})
var fun = func(x string) {
log.Println("lock", time.Now().UnixNano()/1e6)
lock := client.Lock("a", 30)
defer client.Unlock(lock)
//client.Lock("a", 5)
for i := 0; i < 5; i++ {
time.Sleep(time.Second * 1)
fmt.Println(x + ":" + strconv.Itoa(i+1))
}
}
go fun("x")
go fun("y")
go fun("z")
time.Sleep(time.Second * 30 * 10)
}

1
go.mod
View File

@@ -3,6 +3,7 @@ module zhub
go 1.16
require (
github.com/go-basic/uuid v1.0.0 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/robfig/cron v1.2.0
)

48
monitor/monitor.go Normal file
View File

@@ -0,0 +1,48 @@
package monitor
import (
"encoding/json"
"net/http"
"os"
"path"
"zhub/zsub"
)
func StartHttp() {
dir, _ := os.Getwd()
webDir := path.Join(dir, "/public")
http.Handle("/", http.FileServer(http.Dir(webDir)))
http.HandleFunc("/info", info)
http.HandleFunc("/cleanup", cleanup)
http.HandleFunc("/retimer", retimer)
http.ListenAndServe(":1217", nil)
}
func retimer(w http.ResponseWriter, r *http.Request) {
zsub.ZSubx().ReloadTimer()
renderJson(w, "+reload timer ok")
}
func cleanup(w http.ResponseWriter, r *http.Request) {
zsub.ZSubx().Clearup()
renderJson(w, "+OK")
}
func info(w http.ResponseWriter, r *http.Request) {
topics := zsub.Info()
renderJson(w, topics)
}
func renderJson(w http.ResponseWriter, d interface{}) {
var bytes []byte
if str, ok := d.(string); ok {
bytes = []byte(str)
} else {
bytes, _ = json.Marshal(d)
w.Header().Set("content-type", "application/json; charset=utf-8;")
}
w.Write(bytes)
}

View File

@@ -3,8 +3,8 @@ SET GOARCH=amd64
go build -o zhub.sh -ldflags "-s -w" ./app.go
upx -9 zhub.sh
scp zhub.sh xhost:/opt/zhub
scp zhub.sh zhost:/opt/zhub
scp zhub.sh qhost:/opt/zhub
scp zhub.sh nhost:/opt/zhub
scp zhub.sh pro:/opt/zhub
scp zhub.sh dev:/opt/zhub
scp zhub.sh qc:/opt/zhub
scp zhub.sh my:/opt/zhub
del zhub.sh

10
public/index.html Normal file
View File

@@ -0,0 +1,10 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body style="text-align: center;padding: 10px">
<h2>welcome zhub!</h2>
</body>
</html>

View File

@@ -2,6 +2,7 @@ package zsub
import (
"log"
"strconv"
"strings"
"zhub/conf"
)
@@ -94,13 +95,28 @@ func msgAccept(v Message) {
}
switch rcmd[1] {
case "reload-timer":
zsub.reloadTimer()
zsub.ReloadTimer()
case "shutdown":
if !strings.EqualFold(c.groupid, "group-admin") {
return
}
zsub.shutdown()
}
case "lock":
// lock key uuid 5
if len(rcmd) != 4 {
c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]")
return
}
d, _ := strconv.Atoi(rcmd[3])
zsub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d})
case "unlock":
// unlock key uuid
if len(rcmd) != 3 {
c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]")
return
}
zsub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]})
default:
c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
return

View File

@@ -36,9 +36,10 @@ func Append(str string, fileName string) {
}
// 数据持久化
func (s *ZSub) saveDelay() {
func (s *ZSub) dataStorage() {
s.Lock()
defer s.Unlock()
// delay save
err := os.Remove(conf.DataDir + "/delay.z")
if err != nil {
log.Println(err)
@@ -49,9 +50,23 @@ func (s *ZSub) saveDelay() {
str += fmt.Sprintf("%s %s %s\n", delay.topic, delay.value, strconv.FormatInt(delay.exectime.Unix(), 10))
}
Append(str, conf.DataDir+"/delay.z")
// lock save
err = os.Remove(conf.DataDir + "/lock.z")
if err != nil {
log.Println(err)
}
str = ""
for _, locks := range s.locks {
for _, lock := range locks {
str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start)
break // 只记录获得锁的记录
}
}
Append(str, conf.DataDir+"/lock.z")
}
func (s *ZSub) reloadDelay() {
func (s *ZSub) loadDelay() {
f, err := os.Open(conf.DataDir + "/delay.z")
if err != nil {
return
@@ -85,3 +100,43 @@ func (s *ZSub) reloadDelay() {
s.delay([]string{"delay", split[0], split[1], strconv.FormatInt((exectime-time.Now().Unix())*1000, 10)}, nil)
}
}
func (s *ZSub) loadLock() {
f, err := os.Open(conf.DataDir + "/lock.z")
if err != nil {
return
}
defer f.Close()
r := bufio.NewReader(f)
for {
bytes, err := r.ReadBytes('\n')
if err != nil {
return
}
line := string(bytes)
if len(line) == 0 {
continue
}
line = strings.Trim(line, " \r\n")
split := strings.Split(line, " ")
if len(split) != 4 {
continue
}
duration, err := strconv.Atoi(split[2])
start, err := strconv.ParseInt(split[3], 10, 64)
if start > 0 && time.Now().Unix()-start > 1 {
duration = int(time.Now().Unix() - start)
} else {
duration = 1
}
s._lock(&Lock{
key: split[0],
uuid: split[1],
duration: duration,
// start: start,
})
}
}

View File

@@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"zhub/conf"
)
@@ -16,6 +17,7 @@ var (
topics: make(map[string]*ZTopic),
timers: make(map[string]*ZTimer),
delays: make(map[string]*ZDelay),
locks: make(map[string][]*Lock),
}
)
@@ -24,6 +26,7 @@ type ZSub struct {
topics map[string]*ZTopic
timers map[string]*ZTimer
delays map[string]*ZDelay
locks map[string][]*Lock
}
type ZConn struct { //ZConn
@@ -36,6 +39,15 @@ type ZConn struct { //ZConn
substoped map[string]chan int // 关闭信号量
}
type Lock struct {
key string
uuid string
duration int
timer *time.Timer
start int64
//stop time.Time
}
func NewZConn(conn *net.Conn) *ZConn {
return &ZConn{
conn: conn,
@@ -60,7 +72,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
ztopic = &ZTopic{
groups: map[string]*ZGroup{},
topic: topic,
chMsg: make(chan string, 10000),
chMsg: make(chan string, 500),
}
ztopic.init()
zsub.topics[topic] = ztopic
@@ -71,7 +83,7 @@ func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
zgroup = &ZGroup{
//conns: []*ZConn{},
ztopic: ztopic,
chMsg: make(chan string, 1000),
chMsg: make(chan string, 500),
}
ztopic.groups[c.groupid] = zgroup
}
@@ -187,8 +199,9 @@ func ServerStart(addr string) {
}()
// 重新加载[定时、延时]
go zsub.reloadTimer()
go zsub.reloadDelay()
go zsub.ReloadTimer()
go zsub.loadDelay()
//go zsub.loadLock()
// 启动服务监听
listen, err := net.Listen("tcp", addr)
@@ -237,8 +250,15 @@ func (s *ZSub) acceptHandler(c *ZConn) {
n, _ := strconv.Atoi(string(line[1:]))
for i := 0; i < n; i++ {
reader.ReadLine()
v, _, _ := reader.ReadLine()
rcmd = append(rcmd, string(v))
var vx = ""
a:
if v, prefix, _ := reader.ReadLine(); prefix {
vx += string(v)
goto a
} else {
vx += string(v)
}
rcmd = append(rcmd, vx)
}
default:
rcmd = append(rcmd, string(line))
@@ -253,9 +273,9 @@ func (s *ZSub) acceptHandler(c *ZConn) {
}
/*
accept topic message
1、send message to topic's chan
2、feedback send success to sender, and sending message to topic's subscripts
accept stop message
1、send message to stop's chan
2、feedback send success to sender, and sending message to stop's subscripts
*/
func (s *ZSub) publish(topic, msg string) {
s.RLock()
@@ -274,6 +294,9 @@ send broadcast message
func (s *ZSub) broadcast(topic, msg string) {
s.RLock()
defer s.RUnlock()
if strings.EqualFold(topic, "lock") {
log.Println("lock", msg)
}
ztopic := s.topics[topic] //ZTopic
if ztopic == nil {
@@ -287,8 +310,95 @@ func (s *ZSub) broadcast(topic, msg string) {
}
}
/*
lock: lock key uuid t
unlock: unlock key uuid
*/
func (s *ZSub) _lock(lock *Lock) {
locks := s.locks[lock.key]
if locks == nil {
locks = make([]*Lock, 0)
}
if len(locks) == 0 { // lock success
lock.start = time.Now().Unix()
locks = append(locks, lock)
s.locks[lock.key] = locks
s.broadcast("lock", lock.uuid)
// 设置时间到解锁
locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second)
go func() {
select {
case <-locks[0].timer.C:
s._unlock(*locks[0])
}
}()
} else {
s.locks[lock.key] = append(locks, lock)
}
}
func (s *ZSub) _unlock(l Lock) {
locks := s.locks[l.key]
if locks == nil || len(locks) == 0 {
return
}
if strings.EqualFold(locks[0].uuid, l.uuid) {
locks[0].timer.Stop()
locks = locks[1:]
s.locks[l.key] = locks
}
if len(s.locks[l.key]) > 0 { // next lock
s.broadcast("lock", s.locks[l.key][0].uuid)
s.locks[l.key][0].start = time.Now().Unix()
s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second)
go func() {
select {
case <-s.locks[l.key][0].timer.C:
s._unlock(*s.locks[l.key][0])
}
}()
}
}
func (s *ZSub) shutdown() {
s.saveDelay()
s.Lock()
s.dataStorage()
os.Exit(0)
}
func Info() map[string]interface{} {
m := map[string]interface{}{}
for s, topic := range zsub.topics {
// {groups:[{name:xxx,size:xx}]}
arr := make([]map[string]interface{}, 0)
for groupname, group := range topic.groups {
arr = append(arr, map[string]interface{}{
"name": groupname,
"subsize": len(group.conns),
"offset": group.offset,
"mcount": topic.mcount,
})
}
m[s] = arr
}
return m
}
func (s *ZSub) Clearup() {
for tn, topic := range s.topics {
for _, group := range topic.groups {
if len(group.conns) > 0 || topic.mcount > group.offset {
goto a
}
}
close(topic.chMsg)
delete(s.topics, tn)
a:
}
}
func ZSubx() *ZSub {
return zsub
}

View File

@@ -165,7 +165,7 @@ func executeShell(command string) (string, error, string) {
return stdout.String(), err, stderr.String()
}
func (s *ZSub) reloadTimer() {
func (s *ZSub) ReloadTimer() {
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
conf.GetStr("ztimer.db.user", "root"),
conf.GetStr("ztimer.db.pwd", "123456"),
@@ -207,7 +207,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
s.Lock()
defer func() {
s.Unlock()
s.saveDelay()
s.dataStorage()
}()
if len(rcmd) != 4 {
c.send("-Error: subscribe para number!")
@@ -222,7 +222,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
delay := s.delays[rcmd[1]+"-"+rcmd[2]]
if delay != nil {
if t == -1 {
if t < 0 {
delay.timer.Stop()
delete(s.delays, rcmd[1]+"-"+rcmd[2])
return

View File

@@ -5,7 +5,7 @@ import "sync"
type ZTopic struct { //ZTopic
sync.Mutex
groups map[string]*ZGroup
mcount int
mcount int32
topic string // 主题名称
chMsg chan string // 主题消息投递
}