diff --git a/cli/client.go b/cli/client.go index 8aecc7d..a49e44e 100644 --- a/cli/client.go +++ b/cli/client.go @@ -144,9 +144,13 @@ func (c *Client) Daly(topic string, message string, daly int) error { return nil } -func (c *Client) Timer(topic string, expr string, fun func()) { +/*func (c *Client) Timer(topic string, expr string, fun func()) { c.timerFun[topic] = fun c.send("timer", topic, expr, "x") +}*/ +func (c *Client) Timer(topic string, fun func()) { + c.timerFun[topic] = fun + c.send("timer", topic) } func (c *Client) TimerSingle(topic string, expr string, fun func()) { c.timerFun[topic] = fun @@ -158,6 +162,11 @@ func (c *Client) timer(topic string) { c.send("timer", topic) } +// send cmd +func (c *Client) Cmd(cmd string) { + c.send("cmd", cmd) +} + /* // subscribe topic --- diff --git a/cli_test.go b/cli_test.go index ee73aa7..97ae24c 100644 --- a/cli_test.go +++ b/cli_test.go @@ -2,35 +2,41 @@ package main import ( "log" + "strconv" "testing" "time" "zhub/cli" ) +var ( + addr = "47.111.150.118:6066" + //addr = "127.0.0.1:1216" +) + func TestCli(t *testing.T) { //client, err := cli.Create("39.108.56.246:7070", "") - client, err := cli.Create("47.111.150.118:6066", "") - //client, err := cli.Create("127.0.0.1:1216", "topic-x") + client, err := cli.Create(addr, "xx") + //client, err := cli.Create(addr, "topic-x") if err != nil { log.Fatal(err) } // 订阅主题 消息 - client.Subscribe("a", func(v string) { - log.Println("收到主题 a 消息 " + v) + client.Subscribe("ax", func(v string) { + log.Println("收到主题 ax 消息 " + v) }) // 定时调度 - client.Timer("a", "*/5 * * * * *", func() { - log.Println("收到 t------------------x 定时消息") + client.Timer("a", func() { + log.Println("收到 a 定时消息") }) - /*go func() { + go func() { for i := 0; i < 100000; i++ { - client.Publish("a", strconv.Itoa(i)) + client.Publish("ax", strconv.Itoa(i)) time.Sleep(time.Second) } - }()*/ + }() client.Subscribe("a", func(v string) { log.Println("收到主题 a 消息 " + v) @@ -42,23 +48,39 @@ func TestCli(t *testing.T) { func TestTimer(t *testing.T) { go func() { - client, _ := cli.Create("127.0.0.1:1216", "topic-x") - client.Timer("t", "*/3 * * * * *", func() { - log.Println("=======收到 t 定时消息") - }) - - client.Timer("t------------------x", "*/3 * * * * *", func() { - log.Println("收到 t------------------x 定时消息") + client, _ := cli.Create(addr, "topic-x") + client.Timer("a", func() { + log.Println("client-1 收到 a 的定时消息") }) }() time.Sleep(time.Second * 5) go func() { - client, _ := cli.Create("127.0.0.1:1216", "topic-x") - client.Timer("t", "*/5 * * * * *", func() { - log.Println("-------收到 t 定时消息") + client, _ := cli.Create(addr, "topic-x") + client.Timer("a", func() { + log.Println("client-2 收到 a 的定时消息") }) }() time.Sleep(time.Hour * 3) } + +func TestSendCmd(t *testing.T) { + client, err := cli.Create(addr, "") + if err != nil { + log.Println(err) + } + + client.Cmd("reload-timer-config") +} + +func TestPublish(t *testing.T) { + client, err := cli.Create(addr, "") + if err != nil { + log.Println(err) + } + + client.Publish("ax", "a") + + time.Sleep(time.Second) +} diff --git a/go.mod b/go.mod index 04a8cdb..afe5611 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module zhub go 1.15 -require github.com/robfig/cron v1.2.0 +require ( + github.com/go-sql-driver/mysql v1.5.0 + github.com/robfig/cron v1.2.0 +) diff --git a/go.sum b/go.sum index b5b4795..4275456 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ +github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 51bf3a9..9864edf 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -70,6 +70,14 @@ func msgAccept(v Message) { daly(rcmd, c) case "timer": zsub.timer(rcmd, c) + case "cmd": + if len(rcmd) == 1 { + return + } + switch rcmd[1] { + case "reload-timer-config": + zsub.reloadTimerConfig() + } default: send(c.conn, "-Error: default not supported:["+strings.Join(rcmd, " ")+"]") return diff --git a/zsub/zsub.go b/zsub/zsub.go index ac73fb3..1b2af0f 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -17,6 +17,10 @@ var ( } ) +func init() { + zsub.reloadTimerConfig() +} + type ZSub struct { sync.Mutex topics map[string]*ZTopic diff --git a/zsub/ztimer.go b/zsub/ztimer.go index 3729e2c..e45fa0c 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -2,7 +2,9 @@ package zsub import ( "bytes" + "database/sql" "fmt" + _ "github.com/go-sql-driver/mysql" "github.com/robfig/cron" "log" "os/exec" @@ -32,7 +34,9 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { } s.timers[rcmd[1]] = timer } - timer.conns = c.appendTo(timer.conns) + if c != nil { + timer.conns = c.appendTo(timer.conns) + } // todo: when timer.expr changed send message to all the timer‘s subscribe if len(rcmd) == 4 && !strings.EqualFold(timer.expr, rcmd[2]) { @@ -55,8 +59,8 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { }() timer.configSave() } - if len(rcmd) == 4 && !strings.EqualFold("a", rcmd[3]) && !timer.single { - timer.single = true + if len(rcmd) == 4 && (strings.EqualFold("a", rcmd[3]) != timer.single) { + timer.single = strings.EqualFold("a", rcmd[3]) timer.configSave() } @@ -104,14 +108,18 @@ func (t *ZTimer) configSave() { log.Println(err) } - fmt.Println(buf.String()) + //fmt.Println(buf.String()) rest, err, s := executeShell(buf.String()) if err != nil { log.Println(err) } - fmt.Println("res:", rest) - fmt.Println("error-rest:", s) + if !strings.EqualFold(rest, "") { + fmt.Println("res:", rest) + } + if !strings.EqualFold(s, "") { + fmt.Println("error-rest:", s) + } } func executeShell(command string) (string, error, string) { @@ -123,3 +131,27 @@ func executeShell(command string) (string, error, string) { err := cmd.Run() return stdout.String(), err, stderr.String() } + +func (s *ZSub) reloadTimerConfig() { + db, err := sql.Open("mysql", "root:*Zhong123098!@tcp(47.111.150.118:6063)/platf_oth?charset=utf8") // dev + //db, err := sql.Open("mysql", "root:*Hello@27.com!@tcp(0.0.0.0:6033)/platf_oth?charset=utf8") // qc + //db, err := sql.Open("mysql", "root:*Hello@27.com!@tcp(122.112.180.156:6033)/platf_oth?charset=utf8") // pro + if err != nil { + log.Println(err) + return + } + defer db.Close() + + rows, err := db.Query("SELECT t.`name`,t.`expr`,IF(t.`single`=1,'a','x') 'single' FROM tasktimer t WHERE t.`status`=10 ORDER BY t.`timerid`") + if err != nil { + log.Println(err) + } + + for rows.Next() { + var name string + var expr string + var single string + rows.Scan(&name, &expr, &single) + s.timer([]string{"timer", name, expr, single}, nil) //["timer", topic, expr, a|x] + } +}