修改:ztiemr 服务实现,增加调度单/多推支持,调度配置记录配置文件
git-svn-id: svn://47.119.165.148/zhub@66 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
@@ -146,7 +146,11 @@ func (c *Client) Daly(topic string, message string, daly int) error {
|
||||
|
||||
func (c *Client) Timer(topic string, expr string, fun func()) {
|
||||
c.timerFun[topic] = fun
|
||||
c.send("timer", topic, expr)
|
||||
c.send("timer", topic, expr, "x")
|
||||
}
|
||||
func (c *Client) TimerSingle(topic string, expr string, fun func()) {
|
||||
c.timerFun[topic] = fun
|
||||
c.send("timer", topic, expr, "a")
|
||||
}
|
||||
|
||||
// todo: save client timer‘s info
|
||||
|
||||
10
cli_test.go
10
cli_test.go
@@ -2,14 +2,14 @@ package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
"zhub/cli"
|
||||
)
|
||||
|
||||
func TestCli(t *testing.T) {
|
||||
client, err := cli.Create("39.108.56.246:7070", "")
|
||||
//client, err := cli.Create("39.108.56.246:7070", "")
|
||||
client, err := cli.Create("47.111.150.118:6066", "")
|
||||
//client, err := cli.Create("127.0.0.1:1216", "topic-x")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -21,16 +21,16 @@ func TestCli(t *testing.T) {
|
||||
})
|
||||
|
||||
// 定时调度
|
||||
client.Timer("t------------------x", "*/3 * * * * *", func() {
|
||||
client.Timer("a", "*/5 * * * * *", func() {
|
||||
log.Println("收到 t------------------x 定时消息")
|
||||
})
|
||||
|
||||
go func() {
|
||||
/*go func() {
|
||||
for i := 0; i < 100000; i++ {
|
||||
client.Publish("a", strconv.Itoa(i))
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}()
|
||||
}()*/
|
||||
|
||||
client.Subscribe("a", func(v string) {
|
||||
log.Println("收到主题 a 消息 " + v)
|
||||
|
||||
@@ -1,20 +1,25 @@
|
||||
package zsub
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/robfig/cron"
|
||||
"log"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"text/template"
|
||||
)
|
||||
|
||||
type ZTimer struct {
|
||||
conns []*ZConn
|
||||
expr string
|
||||
topic string
|
||||
cron *cron.Cron
|
||||
conns []*ZConn
|
||||
expr string
|
||||
topic string
|
||||
cron *cron.Cron
|
||||
single bool
|
||||
}
|
||||
|
||||
/*
|
||||
1、["timer", topic, expr]
|
||||
2、["timer", topic]
|
||||
["timer", topic, expr, a|x]
|
||||
*/
|
||||
func (s *ZSub) timer(rcmd []string, c *ZConn) {
|
||||
s.Lock()
|
||||
@@ -30,7 +35,7 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
|
||||
timer.conns = c.appendTo(timer.conns)
|
||||
|
||||
// todo: when timer.expr changed send message to all the timer‘s subscribe
|
||||
if len(rcmd) == 3 && !strings.EqualFold(timer.expr, rcmd[2]) {
|
||||
if len(rcmd) == 4 && !strings.EqualFold(timer.expr, rcmd[2]) {
|
||||
timer.expr = rcmd[2]
|
||||
if timer.cron != nil {
|
||||
timer.cron.Stop()
|
||||
@@ -38,14 +43,21 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
|
||||
timer.cron = func() *cron.Cron {
|
||||
c := cron.New()
|
||||
c.AddFunc(timer.expr, func() {
|
||||
//fmt.Println(time.Now().Second())
|
||||
for _, conn := range timer.conns {
|
||||
send(conn.conn, "timer", timer.topic)
|
||||
err := send(conn.conn, "timer", timer.topic)
|
||||
if timer.single && err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
go c.Run()
|
||||
return c
|
||||
}()
|
||||
timer.configSave()
|
||||
}
|
||||
if len(rcmd) == 4 && !strings.EqualFold("a", rcmd[3]) && !timer.single {
|
||||
timer.single = true
|
||||
timer.configSave()
|
||||
}
|
||||
|
||||
s.timers[rcmd[1]] = timer
|
||||
@@ -58,3 +70,56 @@ func (t *ZTimer) close(c *ZConn) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *ZTimer) configSave() {
|
||||
tpl, err := template.New("").Parse(`
|
||||
if [ ! -d "/etc/zhub" ]; then
|
||||
mkdir /etc/zhub
|
||||
fi
|
||||
if [ ! -f "/etc/zhub/ztimer.cron" ]; then
|
||||
touch /etc/zhub/ztimer.cron
|
||||
fi
|
||||
|
||||
sed -i /^{{.Name}}\|*/d /etc/zhub/ztimer.cron
|
||||
echo '{{.Name}}|{{.Expr}}|{{.Single}}' >> /etc/zhub/ztimer.cron
|
||||
`)
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = tpl.Execute(&buf, map[string]string{
|
||||
"Name": t.topic,
|
||||
"Expr": t.expr,
|
||||
"Single": func() string {
|
||||
if t.single {
|
||||
return "a"
|
||||
} else {
|
||||
return "x"
|
||||
}
|
||||
}(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
fmt.Println(buf.String())
|
||||
|
||||
rest, err, s := executeShell(buf.String())
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
fmt.Println("res:", rest)
|
||||
fmt.Println("error-rest:", s)
|
||||
}
|
||||
|
||||
func executeShell(command string) (string, error, string) {
|
||||
var stdout bytes.Buffer
|
||||
var stderr bytes.Buffer
|
||||
cmd := exec.Command("/bin/bash", "-c", command)
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
err := cmd.Run()
|
||||
return stdout.String(), err, stderr.String()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user