修改:使用 数据库配置定时调度,配置变更发送指令重新加载

git-svn-id: svn://47.119.165.148/zhub@67 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-01-12 10:56:22 +00:00
parent 3d380e9353
commit 080d9be53b
7 changed files with 107 additions and 27 deletions

View File

@@ -144,9 +144,13 @@ func (c *Client) Daly(topic string, message string, daly int) error {
return nil return nil
} }
func (c *Client) Timer(topic string, expr string, fun func()) { /*func (c *Client) Timer(topic string, expr string, fun func()) {
c.timerFun[topic] = fun c.timerFun[topic] = fun
c.send("timer", topic, expr, "x") c.send("timer", topic, expr, "x")
}*/
func (c *Client) Timer(topic string, fun func()) {
c.timerFun[topic] = fun
c.send("timer", topic)
} }
func (c *Client) TimerSingle(topic string, expr string, fun func()) { func (c *Client) TimerSingle(topic string, expr string, fun func()) {
c.timerFun[topic] = fun c.timerFun[topic] = fun
@@ -158,6 +162,11 @@ func (c *Client) timer(topic string) {
c.send("timer", topic) c.send("timer", topic)
} }
// send cmd
func (c *Client) Cmd(cmd string) {
c.send("cmd", cmd)
}
/* /*
// subscribe topic // subscribe topic
--- ---

View File

@@ -2,35 +2,41 @@ package main
import ( import (
"log" "log"
"strconv"
"testing" "testing"
"time" "time"
"zhub/cli" "zhub/cli"
) )
var (
addr = "47.111.150.118:6066"
//addr = "127.0.0.1:1216"
)
func TestCli(t *testing.T) { func TestCli(t *testing.T) {
//client, err := cli.Create("39.108.56.246:7070", "") //client, err := cli.Create("39.108.56.246:7070", "")
client, err := cli.Create("47.111.150.118:6066", "") client, err := cli.Create(addr, "xx")
//client, err := cli.Create("127.0.0.1:1216", "topic-x") //client, err := cli.Create(addr, "topic-x")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// 订阅主题 消息 // 订阅主题 消息
client.Subscribe("a", func(v string) { client.Subscribe("ax", func(v string) {
log.Println("收到主题 a 消息 " + v) log.Println("收到主题 ax 消息 " + v)
}) })
// 定时调度 // 定时调度
client.Timer("a", "*/5 * * * * *", func() { client.Timer("a", func() {
log.Println("收到 t------------------x 定时消息") log.Println("收到 a 定时消息")
}) })
/*go func() { go func() {
for i := 0; i < 100000; i++ { for i := 0; i < 100000; i++ {
client.Publish("a", strconv.Itoa(i)) client.Publish("ax", strconv.Itoa(i))
time.Sleep(time.Second) time.Sleep(time.Second)
} }
}()*/ }()
client.Subscribe("a", func(v string) { client.Subscribe("a", func(v string) {
log.Println("收到主题 a 消息 " + v) log.Println("收到主题 a 消息 " + v)
@@ -42,23 +48,39 @@ func TestCli(t *testing.T) {
func TestTimer(t *testing.T) { func TestTimer(t *testing.T) {
go func() { go func() {
client, _ := cli.Create("127.0.0.1:1216", "topic-x") client, _ := cli.Create(addr, "topic-x")
client.Timer("t", "*/3 * * * * *", func() { client.Timer("a", func() {
log.Println("=======收到 t 定时消息") log.Println("client-1 收到 a 的定时消息")
})
client.Timer("t------------------x", "*/3 * * * * *", func() {
log.Println("收到 t------------------x 定时消息")
}) })
}() }()
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
go func() { go func() {
client, _ := cli.Create("127.0.0.1:1216", "topic-x") client, _ := cli.Create(addr, "topic-x")
client.Timer("t", "*/5 * * * * *", func() { client.Timer("a", func() {
log.Println("-------收到 t 定时消息") log.Println("client-2 收到 a 的定时消息")
}) })
}() }()
time.Sleep(time.Hour * 3) time.Sleep(time.Hour * 3)
} }
func TestSendCmd(t *testing.T) {
client, err := cli.Create(addr, "")
if err != nil {
log.Println(err)
}
client.Cmd("reload-timer-config")
}
func TestPublish(t *testing.T) {
client, err := cli.Create(addr, "")
if err != nil {
log.Println(err)
}
client.Publish("ax", "a")
time.Sleep(time.Second)
}

5
go.mod
View File

@@ -2,4 +2,7 @@ module zhub
go 1.15 go 1.15
require github.com/robfig/cron v1.2.0 require (
github.com/go-sql-driver/mysql v1.5.0
github.com/robfig/cron v1.2.0
)

2
go.sum
View File

@@ -1,2 +1,4 @@
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=

View File

@@ -70,6 +70,14 @@ func msgAccept(v Message) {
daly(rcmd, c) daly(rcmd, c)
case "timer": case "timer":
zsub.timer(rcmd, c) zsub.timer(rcmd, c)
case "cmd":
if len(rcmd) == 1 {
return
}
switch rcmd[1] {
case "reload-timer-config":
zsub.reloadTimerConfig()
}
default: default:
send(c.conn, "-Error: default not supported:["+strings.Join(rcmd, " ")+"]") send(c.conn, "-Error: default not supported:["+strings.Join(rcmd, " ")+"]")
return return

View File

@@ -17,6 +17,10 @@ var (
} }
) )
func init() {
zsub.reloadTimerConfig()
}
type ZSub struct { type ZSub struct {
sync.Mutex sync.Mutex
topics map[string]*ZTopic topics map[string]*ZTopic

View File

@@ -2,7 +2,9 @@ package zsub
import ( import (
"bytes" "bytes"
"database/sql"
"fmt" "fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/robfig/cron" "github.com/robfig/cron"
"log" "log"
"os/exec" "os/exec"
@@ -32,7 +34,9 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
} }
s.timers[rcmd[1]] = timer s.timers[rcmd[1]] = timer
} }
if c != nil {
timer.conns = c.appendTo(timer.conns) timer.conns = c.appendTo(timer.conns)
}
// todo: when timer.expr changed send message to all the timers subscribe // todo: when timer.expr changed send message to all the timers subscribe
if len(rcmd) == 4 && !strings.EqualFold(timer.expr, rcmd[2]) { if len(rcmd) == 4 && !strings.EqualFold(timer.expr, rcmd[2]) {
@@ -55,8 +59,8 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
}() }()
timer.configSave() timer.configSave()
} }
if len(rcmd) == 4 && !strings.EqualFold("a", rcmd[3]) && !timer.single { if len(rcmd) == 4 && (strings.EqualFold("a", rcmd[3]) != timer.single) {
timer.single = true timer.single = strings.EqualFold("a", rcmd[3])
timer.configSave() timer.configSave()
} }
@@ -104,15 +108,19 @@ func (t *ZTimer) configSave() {
log.Println(err) log.Println(err)
} }
fmt.Println(buf.String()) //fmt.Println(buf.String())
rest, err, s := executeShell(buf.String()) rest, err, s := executeShell(buf.String())
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
if !strings.EqualFold(rest, "") {
fmt.Println("res:", rest) fmt.Println("res:", rest)
}
if !strings.EqualFold(s, "") {
fmt.Println("error-rest:", s) fmt.Println("error-rest:", s)
} }
}
func executeShell(command string) (string, error, string) { func executeShell(command string) (string, error, string) {
var stdout bytes.Buffer var stdout bytes.Buffer
@@ -123,3 +131,27 @@ func executeShell(command string) (string, error, string) {
err := cmd.Run() err := cmd.Run()
return stdout.String(), err, stderr.String() return stdout.String(), err, stderr.String()
} }
func (s *ZSub) reloadTimerConfig() {
db, err := sql.Open("mysql", "root:*Zhong123098!@tcp(47.111.150.118:6063)/platf_oth?charset=utf8") // dev
//db, err := sql.Open("mysql", "root:*Hello@27.com!@tcp(0.0.0.0:6033)/platf_oth?charset=utf8") // qc
//db, err := sql.Open("mysql", "root:*Hello@27.com!@tcp(122.112.180.156:6033)/platf_oth?charset=utf8") // pro
if err != nil {
log.Println(err)
return
}
defer db.Close()
rows, err := db.Query("SELECT t.`name`,t.`expr`,IF(t.`single`=1,'a','x') 'single' FROM tasktimer t WHERE t.`status`=10 ORDER BY t.`timerid`")
if err != nil {
log.Println(err)
}
for rows.Next() {
var name string
var expr string
var single string
rows.Scan(&name, &expr, &single)
s.timer([]string{"timer", name, expr, single}, nil) //["timer", topic, expr, a|x]
}
}