升级:ztimer 支持周期循环定时调度表达式

git-svn-id: svn://47.119.165.148/zhub@70 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-01-14 11:37:36 +00:00
parent 080d9be53b
commit a99f2398af
4 changed files with 73 additions and 20 deletions

View File

@@ -48,20 +48,36 @@ func TestCli(t *testing.T) {
func TestTimer(t *testing.T) {
go func() {
client, _ := cli.Create(addr, "topic-x")
client, _ := cli.Create(addr, "topic-1")
client.Timer("a", func() {
log.Println("client-1 收到 a 的定时消息")
})
}()
time.Sleep(time.Second * 5)
go func() {
client, _ := cli.Create(addr, "topic-x")
client, _ := cli.Create(addr, "topic-2")
client.Timer("a", func() {
log.Println("client-2 收到 a 的定时消息")
})
}()
go func() {
client, _ := cli.Create(addr, "topic-3")
client.Timer("c", func() {
log.Println("client-2 收到 c 的定时消息")
})
client.Timer("b", func() {
log.Println("client-2 收到 b 的定时消息")
})
client.Timer("STANDING-DOWNLOAD-GAME", func() {
log.Println("client-2 收到 STANDING-DOWNLOAD-GAME 的定时消息")
})
client.Timer("VIP-EXP-EXPIRE", func() {
log.Println("client-2 收到 VIP-EXP-EXPIRE 的定时消息")
})
}()
time.Sleep(time.Hour * 3)
}

View File

@@ -2,3 +2,6 @@ SET GOOS=linux
SET GOARCH=amd64
go build -o zhub.sh -ldflags "-s -w" ./main.go
upx -9 zhub.sh
scp zhub.sh zhost:
del zhub.sh

View File

@@ -167,7 +167,7 @@ func ServerStart(host string, port int) {
log.Println(err)
continue
}
fmt.Println("conn start: ", conn.RemoteAddr())
log.Println("conn start: ", conn.RemoteAddr())
go zsub.acceptHandler(&ZConn{
conn: &conn,

View File

@@ -8,8 +8,11 @@ import (
"github.com/robfig/cron"
"log"
"os/exec"
"regexp"
"strconv"
"strings"
"text/template"
"time"
)
type ZTimer struct {
@@ -17,6 +20,7 @@ type ZTimer struct {
expr string
topic string
cron *cron.Cron
ticker *time.Ticker
single bool
}
@@ -38,30 +42,60 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
timer.conns = c.appendTo(timer.conns)
}
// todo: when timer.expr changed send message to all the timers subscribe
if len(rcmd) == 4 && !strings.EqualFold(timer.expr, rcmd[2]) {
timer.expr = rcmd[2]
if timer.cron != nil {
timer.cron.Stop()
}
timer.cron = func() *cron.Cron {
c := cron.New()
c.AddFunc(timer.expr, func() {
for _, conn := range timer.conns {
err := send(conn.conn, "timer", timer.topic)
if timer.single && err == nil {
break
}
if timer.ticker != nil {
timer.ticker.Stop()
}
var timerFun = func() {
for _, conn := range timer.conns {
err := send(conn.conn, "timer", timer.topic)
if timer.single && err == nil {
break
}
})
go c.Run()
return c
}()
timer.configSave()
}
}
r, _ := regexp.Compile("^\\d+[d,H,m,s]$")
expr := timer.expr
if r.MatchString(expr) {
n, _ := strconv.Atoi(expr[:len(expr)-1])
_n := time.Duration(n)
var ticker *time.Ticker
switch expr[len(expr)-1:] {
case "d":
ticker = time.NewTicker(_n * time.Hour * 24)
case "H":
ticker = time.NewTicker(_n * time.Hour)
case "m":
ticker = time.NewTicker(_n * time.Minute)
case "s":
ticker = time.NewTicker(_n * time.Second)
}
timer.ticker = ticker
go func() {
for range ticker.C {
timerFun()
}
}()
} else {
timer.cron = func() *cron.Cron {
c := cron.New()
c.AddFunc(timer.expr, timerFun)
go c.Run()
return c
}()
}
//timer.configSave()
}
if len(rcmd) == 4 && (strings.EqualFold("a", rcmd[3]) != timer.single) {
timer.single = strings.EqualFold("a", rcmd[3])
timer.configSave()
//timer.configSave()
}
s.timers[rcmd[1]] = timer
@@ -134,7 +168,7 @@ func executeShell(command string) (string, error, string) {
func (s *ZSub) reloadTimerConfig() {
db, err := sql.Open("mysql", "root:*Zhong123098!@tcp(47.111.150.118:6063)/platf_oth?charset=utf8") // dev
//db, err := sql.Open("mysql", "root:*Hello@27.com!@tcp(0.0.0.0:6033)/platf_oth?charset=utf8") // qc
//db, err := sql.Open("mysql", "root:*Zhong123098!@tcp(121.196.17.55:6063)/platf_oth?charset=utf8") // qc
//db, err := sql.Open("mysql", "root:*Hello@27.com!@tcp(122.112.180.156:6033)/platf_oth?charset=utf8") // pro
if err != nil {
log.Println(err)