diff --git a/cli/client.go b/cli/client.go index fc51d93..8aecc7d 100644 --- a/cli/client.go +++ b/cli/client.go @@ -146,7 +146,11 @@ func (c *Client) Daly(topic string, message string, daly int) error { func (c *Client) Timer(topic string, expr string, fun func()) { c.timerFun[topic] = fun - c.send("timer", topic, expr) + c.send("timer", topic, expr, "x") +} +func (c *Client) TimerSingle(topic string, expr string, fun func()) { + c.timerFun[topic] = fun + c.send("timer", topic, expr, "a") } // todo: save client timer‘s info diff --git a/cli_test.go b/cli_test.go index c74f7ad..ee73aa7 100644 --- a/cli_test.go +++ b/cli_test.go @@ -2,14 +2,14 @@ package main import ( "log" - "strconv" "testing" "time" "zhub/cli" ) func TestCli(t *testing.T) { - client, err := cli.Create("39.108.56.246:7070", "") + //client, err := cli.Create("39.108.56.246:7070", "") + client, err := cli.Create("47.111.150.118:6066", "") //client, err := cli.Create("127.0.0.1:1216", "topic-x") if err != nil { log.Fatal(err) @@ -21,16 +21,16 @@ func TestCli(t *testing.T) { }) // 定时调度 - client.Timer("t------------------x", "*/3 * * * * *", func() { + client.Timer("a", "*/5 * * * * *", func() { log.Println("收到 t------------------x 定时消息") }) - go func() { + /*go func() { for i := 0; i < 100000; i++ { client.Publish("a", strconv.Itoa(i)) time.Sleep(time.Second) } - }() + }()*/ client.Subscribe("a", func(v string) { log.Println("收到主题 a 消息 " + v) diff --git a/zsub/ztimer.go b/zsub/ztimer.go index 05b8a95..3729e2c 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -1,20 +1,25 @@ package zsub import ( + "bytes" + "fmt" "github.com/robfig/cron" + "log" + "os/exec" "strings" + "text/template" ) type ZTimer struct { - conns []*ZConn - expr string - topic string - cron *cron.Cron + conns []*ZConn + expr string + topic string + cron *cron.Cron + single bool } /* -1、["timer", topic, expr] -2、["timer", topic] +["timer", topic, expr, a|x] */ func (s *ZSub) timer(rcmd []string, c *ZConn) { s.Lock() @@ -30,7 +35,7 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { timer.conns = c.appendTo(timer.conns) // todo: when timer.expr changed send message to all the timer‘s subscribe - if len(rcmd) == 3 && !strings.EqualFold(timer.expr, rcmd[2]) { + if len(rcmd) == 4 && !strings.EqualFold(timer.expr, rcmd[2]) { timer.expr = rcmd[2] if timer.cron != nil { timer.cron.Stop() @@ -38,14 +43,21 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { timer.cron = func() *cron.Cron { c := cron.New() c.AddFunc(timer.expr, func() { - //fmt.Println(time.Now().Second()) for _, conn := range timer.conns { - send(conn.conn, "timer", timer.topic) + err := send(conn.conn, "timer", timer.topic) + if timer.single && err == nil { + break + } } }) go c.Run() return c }() + timer.configSave() + } + if len(rcmd) == 4 && !strings.EqualFold("a", rcmd[3]) && !timer.single { + timer.single = true + timer.configSave() } s.timers[rcmd[1]] = timer @@ -58,3 +70,56 @@ func (t *ZTimer) close(c *ZConn) { } } } + +func (t *ZTimer) configSave() { + tpl, err := template.New("").Parse(` + if [ ! -d "/etc/zhub" ]; then + mkdir /etc/zhub + fi + if [ ! -f "/etc/zhub/ztimer.cron" ]; then + touch /etc/zhub/ztimer.cron + fi + + sed -i /^{{.Name}}\|*/d /etc/zhub/ztimer.cron + echo '{{.Name}}|{{.Expr}}|{{.Single}}' >> /etc/zhub/ztimer.cron + `) + + if err != nil { + log.Println(err) + } + + var buf bytes.Buffer + err = tpl.Execute(&buf, map[string]string{ + "Name": t.topic, + "Expr": t.expr, + "Single": func() string { + if t.single { + return "a" + } else { + return "x" + } + }(), + }) + if err != nil { + log.Println(err) + } + + fmt.Println(buf.String()) + + rest, err, s := executeShell(buf.String()) + if err != nil { + log.Println(err) + } + fmt.Println("res:", rest) + fmt.Println("error-rest:", s) +} + +func executeShell(command string) (string, error, string) { + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd := exec.Command("/bin/bash", "-c", command) + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + return stdout.String(), err, stderr.String() +}