From 82155ca56ebe66f7cab39ca9f53ab65fb3e46b1d Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Mon, 18 Jan 2021 08:31:08 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E8=AE=BE=E7=BD=AE=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@73 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- app.go | 37 ++++++++ cli/client.go | 21 ++--- cli_test.go | 17 +--- conf/config.go | 76 +++++++++++++++ main.go | 38 -------- zdb/rcmd-exec.go | 216 ------------------------------------------- zdb/zdb-server.go | 136 --------------------------- zdb/zdb_test.go | 7 -- zdb/ztimer.go | 62 ------------- zsub/msg-consumer.go | 14 +-- zsub/zsub.go | 21 +++-- zsub/ztimer.go | 12 ++- 12 files changed, 156 insertions(+), 501 deletions(-) create mode 100644 app.go create mode 100644 conf/config.go delete mode 100644 main.go delete mode 100644 zdb/rcmd-exec.go delete mode 100644 zdb/zdb-server.go delete mode 100644 zdb/zdb_test.go delete mode 100644 zdb/ztimer.go diff --git a/app.go b/app.go new file mode 100644 index 0000000..83c1604 --- /dev/null +++ b/app.go @@ -0,0 +1,37 @@ +package main + +import ( + "os" + "strings" + "zhub/cli" + "zhub/conf" + "zhub/zsub" +) + +func main() { + server := true + confPath := "app.conf" + addr := "" + + for _, arg := range os.Args[1:] { + if strings.EqualFold(arg, "cli") { + server = false + } else if strings.Index(arg, "-d=") == 0 { + addr = arg[3:] + } else if strings.Index(arg, "-c=") == 0 { + confPath = arg[3:] + } + } + + if server { + conf.Load(confPath) + if len(addr) == 0 { + addr = conf.GetStr("service.zhub.servers", "127.0.0.1:1216") + } + // 服务进程启动 + zsub.ServerStart(addr) + } else { + cli.ClientRun(addr) + } + +} diff --git a/cli/client.go b/cli/client.go index 7e841f8..d69d867 100644 --- a/cli/client.go +++ b/cli/client.go @@ -23,7 +23,6 @@ type Client struct { subFun map[string]func(v string) // subscribe topic and callback function timerFun map[string]func() // subscribe timer amd callback function - dalyFun map[string]func() chSend chan []string // chan of send message chReceive chan []string // chan of receive message @@ -46,7 +45,6 @@ func Create(addr string, groupid string) (*Client, error) { subFun: make(map[string]func(v string)), timerFun: make(map[string]func()), - dalyFun: make(map[string]func()), chSend: make(chan []string, 100), chReceive: make(chan []string, 100), timerReceive: make(chan []string, 100), @@ -141,9 +139,8 @@ func (c *Client) Publish(topic string, message string) error { return nil } -func (c *Client) Daly(topic string, daly int, fun func()) error { - c.send("daly", topic, strconv.Itoa(daly)) - c.dalyFun[topic] = fun +func (c *Client) Daly(topic string, message string, daly int) error { + c.send("daly", topic, message, strconv.Itoa(daly)) return nil } @@ -271,11 +268,11 @@ func (c *Client) receive() { c.timerReceive <- vs continue } - if len(vs) == 2 && strings.EqualFold(vs[0], "daly") { + /*if len(vs) == 2 && strings.EqualFold(vs[0], "daly") { c.dalyFun[vs[1]]() delete(c.dalyFun, vs[1]) continue - } + }*/ continue case "+": // +pong, +xxx @@ -315,25 +312,25 @@ func (c *Client) get(key string) string { var reconnect = 0 // client 命令行程序 -func ClientRun(host string, port int) { - conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) +func ClientRun(addr string) { + conn, err := net.Dial("tcp", fmt.Sprintf("%s", addr)) for { if err != nil { log.Println(err) time.Sleep(time.Second * 3) - conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + conn, err = net.Dial("tcp", fmt.Sprintf("%s", addr)) continue } - fmt.Println(fmt.Sprintf("had connected server: %s:%d", host, port)) + fmt.Println(fmt.Sprintf("had connected server: %s", addr)) break } defer func() { if reconnect == 1 { conn.Close() - ClientRun(host, port) + ClientRun(addr) } }() diff --git a/cli_test.go b/cli_test.go index b5a06d7..08d4ffe 100644 --- a/cli_test.go +++ b/cli_test.go @@ -2,15 +2,15 @@ package main import ( "log" - "strconv" "testing" "time" "zhub/cli" ) var ( - addr = "47.111.150.118:6066" - //addr = "127.0.0.1:1216" + //addr = "47.111.150.118:6066" + addr = "127.0.0.1:1216" + //addr = "39.108.56.246:1216" ) func TestCli(t *testing.T) { @@ -31,13 +31,6 @@ func TestCli(t *testing.T) { log.Println("收到 a 定时消息") }) - go func() { - for i := 0; i < 100000; i++ { - client.Publish("ax", strconv.Itoa(i)) - time.Sleep(time.Second) - } - }() - client.Subscribe("a", func(v string) { log.Println("收到主题 a 消息 " + v) }) @@ -70,8 +63,8 @@ func TestTimer(t *testing.T) { client.Timer("b", func() { log.Println("client-2 收到 b 的定时消息") }) - client.Timer("STANDING-DOWNLOAD-GAME", func() { - log.Println("client-2 收到 STANDING-DOWNLOAD-GAME 的定时消息") + client.Timer("LOAD-LIVE-ROOM-UNBANNED", func() { + log.Println("client-2 收到 LOAD-LIVE-ROOM-UNBANNED 的定时消息") }) client.Timer("VIP-EXP-EXPIRE", func() { log.Println("client-2 收到 VIP-EXP-EXPIRE 的定时消息") diff --git a/conf/config.go b/conf/config.go new file mode 100644 index 0000000..1da76f5 --- /dev/null +++ b/conf/config.go @@ -0,0 +1,76 @@ +package conf + +import ( + "bufio" + "io" + "log" + "os" + "strconv" + "strings" +) + +var ( + config = make(map[string]string) + LogDebug bool +) + +func Load(path string) { + f, err := os.Open(path) + if err != nil { + log.Panicln(err) + } + + reader := bufio.NewReader(f) + space := "" + for { + bytes, err := reader.ReadBytes('\n') + if err == io.EOF { + break + } + line := string(bytes) + line = strings.Trim(line, " \r\n") + if len(line) == 0 { + continue + } + if strings.Contains(line, "#") { + line = line[0:strings.Index(line, "#")] + } + + switch { + case strings.EqualFold(line, ""): + case strings.Index(line, "[") == 0 && strings.Index(line, "]") > 0: + space = line[1:strings.Index(line, "]")] + space = strings.Trim(space, " ") + case strings.Index(line, "=") > 0: + arr := strings.Split(line, "=") + if len(arr) < 2 { + continue + } + + config[space+"."+strings.Trim(arr[0], " ")] = strings.Trim(arr[1], " ") + default: + continue + } + } + + LogDebug = strings.EqualFold(config["log.level"], "debug") +} + +func GetStr(key string, def string) string { + if len(config[key]) == 0 { + return def + } + return config[key] +} + +func GetInt(key string, def int) int { + if len(config[key]) == 0 { + return def + } + n, err := strconv.Atoi(config[key]) + if err != nil { + log.Println(err, "return def;") + return def + } + return n +} diff --git a/main.go b/main.go deleted file mode 100644 index efa63c3..0000000 --- a/main.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "fmt" - "os" - "strconv" - "strings" - "zhub/cli" - "zhub/zsub" -) - -func main() { - server := true - host := "127.0.0.1" - port := 1216 - - for _, arg := range os.Args[1:] { - if strings.EqualFold(arg, "cli") { - server = false - } else if strings.Index(arg, "-h=") == 0 { - host = arg[3:] - } else if strings.Index(arg, "-p=") == 0 { - p, err := strconv.Atoi(arg[3:]) - if err != nil { - fmt.Println("-Error para: -p=[number]") - os.Exit(0) - } - port = p - } - } - - if server { - zsub.ServerStart(host, port) - } else { - cli.ClientRun(host, port) - } - -} diff --git a/zdb/rcmd-exec.go b/zdb/rcmd-exec.go deleted file mode 100644 index 8e1c01d..0000000 --- a/zdb/rcmd-exec.go +++ /dev/null @@ -1,216 +0,0 @@ -package _zdb - -import ( - "log" - "net" - "strconv" - "strings" - "sync" - "time" -) - -func ExecCmd(rcmd []string, conn net.Conn) { - defer func() { - if r := recover(); r != nil { - log.Println("ExecCmd Recovered:", r) - } - }() - if len(rcmd) == 0 { - return - } - - log.Println("rcmd: " + strings.Join(rcmd, " ")) - - if len(rcmd) == 1 { - switch strings.ToLower(rcmd[0]) { - case "help": - conn.Write([]byte("help-start\r\n")) - conn.Write(retHelp) - conn.Write([]byte("help-end\r\n")) - return - default: - // subscribe|unsubscribe|daly - if strings.Index(rcmd[0], "subscribe") == 0 || strings.Index(rcmd[0], "unsubscribe") == 0 || strings.Index(rcmd[0], "daly") == 0 { - rcmd = strings.Split(rcmd[0], " ") - } else { - conn.Write([]byte("-Error: not supported! (tips: send help)\r\n")) - return - } - } - } - - cmd := rcmd[0] - switch cmd { - case "decr": - decr(rcmd, conn) - case "incr": - incr(rcmd, conn) - case "get": - get(rcmd, conn) - case "set": - set(rcmd, conn) - case "subscribe": - subscribe(rcmd, conn) - case "unsubscribe": - unsubscribe(rcmd, conn) - case "publish": - publish(rcmd, conn) - case "daly": - daly(rcmd, conn) - case "timer": - timer(rcmd, conn) - default: - conn.Write([]byte("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]\r\n")) - return - } -} - -// daly topic valye 100 -func daly(rcmd []string, conn net.Conn) { - if len(rcmd) != 4 { - conn.Write([]byte("-Error: subscribe para number!\r\n")) - return - } - - t, err := strconv.ParseInt(rcmd[3], 10, 64) - if err != nil { - conn.Write([]byte("-Error: " + strings.Join(rcmd, " ") + "\r\n")) - return - } - - timer := time.NewTimer(time.Duration(t) * time.Millisecond) - select { - case <-timer.C: - // daly => publish - publish(rcmd[0:3], conn) - } -} - -func decr(rcmd []string, conn net.Conn) { - k := rcmd[1] - v := zkv[k] - if strings.EqualFold(v, "") { - v = "0" - } - _v, err := strconv.Atoi(v) - if err != nil { - conn.Write([]byte("-Error: " + err.Error() + "\r\n")) - } - - v = strconv.Itoa(_v - 1) - zkv[k] = v - conn.Write([]byte(v + "\r\n")) -} - -func incr(rcmd []string, conn net.Conn) { - k := rcmd[1] - v := zkv[k] - if strings.EqualFold(v, "") { - v = "0" - } - _v, err := strconv.Atoi(v) - if err != nil { - conn.Write([]byte("- Error: " + err.Error() + "\r\n")) - } - - v = strconv.Itoa(_v + 1) - zkv[k] = v - conn.Write([]byte(v + "\r\n")) -} - -func get(rcmd []string, conn net.Conn) { - k := rcmd[1] - v := zkv[k] - conn.Write([]byte(v + "\r\n")) -} - -func set(rcmd []string, conn net.Conn) { - if len(rcmd) != 3 { - conn.Write([]byte("-Error: set para number!\r\n")) - return - } - zkv[rcmd[1]] = rcmd[2] - conn.Write([]byte("+OK\r\n")) -} - -func subscribe(rcmd []string, conn net.Conn) { - if len(rcmd) < 2 { - conn.Write([]byte("-Error: subscribe para number!\r\n")) - return - } - - for _, topic := range rcmd[1:] { - conns := zsub[topic] - if conns == nil { - conns = make([]*ConnContext, 0) - } - - zsub[topic] = append(conns, &ConnContext{conn: &conn}) - } -} -func unsubscribe(rcmd []string, conn net.Conn) { - if len(rcmd) < 2 { - conn.Write([]byte("-Error: unsubscribe para number!")) - return - } - - for _, topic := range rcmd[1:] { - conns := zsub[topic] - if conns == nil || len(conns) == 0 { - return - } - _conns := make([]*ConnContext, 0) - for _, c := range conns { - if *c.conn == *&conn { - continue - } - _conns = append(_conns, c) - } - zsub[topic] = _conns - } -} -func publish(rcmd []string, conn net.Conn) { - if len(rcmd) < 3 { - conn.Write([]byte("-Error: publish para number!\r\n")) - return - } - - topic := rcmd[1] - v := rcmd[2] - - subs := zsub[topic] - if subs == nil || len(subs) == 0 { - return - } - - msgs := []string{"message", topic, v} - for _, c := range subs { - Send(*c.conn, msgs...) - /*_conn.Write([]byte("*3\r\n")) - for _, msg := range msgs { - _conn.Write([]byte("$" + strconv.Itoa(len(msg)) + "\r\n")) - _conn.Write([]byte(msg + "\r\n")) - }*/ - } -} - -var wlock = sync.Mutex{} - -func Send(conn net.Conn, vs ...string) (err error) { - //chSend <- vs - wlock.Lock() - defer wlock.Unlock() - - if len(vs) == 1 { - _, err = conn.Write([]byte(vs[0] + "\r\n")) - } else if len(vs) > 1 { - data := "*" + strconv.Itoa(len(vs)) + "\r\n" - for _, v := range vs { - data += "$" + strconv.Itoa(len(v)) + "\r\n" - data += v + "\r\n" - } - _, err = conn.Write([]byte(data)) - } - - return err -} diff --git a/zdb/zdb-server.go b/zdb/zdb-server.go deleted file mode 100644 index d80357d..0000000 --- a/zdb/zdb-server.go +++ /dev/null @@ -1,136 +0,0 @@ -package _zdb - -import ( - "bufio" - "fmt" - "log" - "net" - "strconv" - "time" -) - -// 消息命令处理 chan -var ( - chmsg = make(chan Message, 10000) - zkv = make(map[string]string) - zsub = make(map[string][]*ConnContext) // topic -- connx[] - retOk = []byte("+OK") - retHelp = []byte( - "\n--- _zdb help ---\n" + - "______ _____ _____ \n|___ / | _ \\ | _ \\ \n / / | | | | | |_| | \n / / | | | | | _ { \n / /__ | |_| | | |_| | \n/_____| |_____/ |_____/ \n" + - "had supported command:\n" + - "1. set:\n" + - " eg: set a 1\n" + - "2. get:\n" + - " eg: get a\n" + - "3. subscribe:\n" + - " eg: subscribe x y z\n" + - "4. unsubscribe:\n" + - " eg: unsubscribe x1 y1 z1\n" + - "5. publish:\n" + - " eg: publish x 123\n" + - "6. incr:\n" + - " eg: incr a\n" + - "7. decr:\n" + - " eg: decr a\n" + - "--- _zdb help ---\n") -) - -// 数据封装 -type Message struct { - Conn *net.Conn - Rcmd []string -} -type ConnContext struct { - conn *net.Conn - groupId string - createTime time.Time -} - -// ====================================================================== - -// ZHub 服务启动 -func ServerStart(host string, port int) { - listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) - if err != nil { - log.Fatal(err) - return - } - log.Printf("_zdb started listen on: %s:%d \n", host, port) - - // 启动消息监听处理 - go func() { - for { - v, ok := <-chmsg - if !ok { - break - } - - ExecCmd(v.Rcmd, *&*v.Conn) - } - }() - - for { - conn, err := listen.Accept() - if err != nil { - log.Println(err) - continue - } - fmt.Println("conn start: ", conn.RemoteAddr()) - - go connHandler(conn) - } -} - -// 连接处理 -func connHandler(conn net.Conn) { - defer func() { - for topic, connx := range zsub { - _conns := make([]*ConnContext, 0) - for t := range connx { - if *connx[t].conn == *&conn { - continue - } - _conns = append(_conns, connx[t]) - } - zsub[topic] = _conns - } - conn.Close() - if r := recover(); r != nil { - log.Println("connHandler Recovered:", r) - } - - fmt.Println("conn end: ", conn.RemoteAddr()) - }() - - reader := bufio.NewReader(conn) - - for { - rcmd := make([]string, 0) - line, _, err := reader.ReadLine() - if err != nil { - log.Println(err) - return - } - if len(line) == 0 { - continue - } - switch string(line[:1]) { - case "*": - n, _ := strconv.Atoi(string(line[1:])) - for i := 0; i < n; i++ { - reader.ReadLine() - v, _, _ := reader.ReadLine() - rcmd = append(rcmd, string(v)) - } - default: - rcmd = append(rcmd, string(line)) - } - - if len(rcmd) == 0 { - continue - } - - chmsg <- Message{Conn: &conn, Rcmd: rcmd} - } -} diff --git a/zdb/zdb_test.go b/zdb/zdb_test.go deleted file mode 100644 index c05fdd3..0000000 --- a/zdb/zdb_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package _zdb - -import "testing" - -func TestService(t *testing.T) { - ServerStart("127.0.0.1", 1216) -} diff --git a/zdb/ztimer.go b/zdb/ztimer.go deleted file mode 100644 index 6c31d93..0000000 --- a/zdb/ztimer.go +++ /dev/null @@ -1,62 +0,0 @@ -package _zdb - -import ( - "fmt" - "github.com/robfig/cron" - "net" - "strings" - "time" -) - -var ( - zTimer = make(map[string]*ZTimer) -) - -type ZTimer struct { - conns []*net.Conn - expr string - topic string - cron *cron.Cron -} - -func timer(rcmd []string, conn net.Conn) { - ztimer := zTimer[rcmd[1]] - if ztimer == nil { - ztimer = &ZTimer{ - conns: []*net.Conn{}, - topic: rcmd[1], - } - zTimer[rcmd[1]] = ztimer - } - - _conns := make([]*net.Conn, 0) - for _, c := range ztimer.conns { - if *&conn == *c { - continue - } - _conns = append(_conns, c) - } - _conns = append(_conns, &conn) - ztimer.conns = _conns - - if !strings.EqualFold(ztimer.expr, rcmd[2]) { - ztimer.expr = rcmd[2] - if ztimer.cron != nil { - ztimer.cron.Stop() - } - ztimer.cron = func() *cron.Cron { - c := cron.New() - c.AddFunc(ztimer.expr, func() { - fmt.Println(time.Now().Second()) - for _, conn := range ztimer.conns { - Send(*conn, "timer", ztimer.topic) - } - }) - go c.Run() - return c - }() - } - - zTimer[ztimer.topic] = ztimer - fmt.Println("xx") -} diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 5d3542d..b053a69 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -7,6 +7,7 @@ import ( "strings" "sync" "time" + "zhub/conf" ) func msgAccept(v Message) { @@ -22,7 +23,9 @@ func msgAccept(v Message) { return } - log.Println("rcmd: " + strings.Join(rcmd, " ")) + if conf.LogDebug { + log.Println("rcmd: " + strings.Join(rcmd, " ")) + } if len(rcmd) == 1 { switch strings.ToLower(rcmd[0]) { @@ -84,14 +87,14 @@ func msgAccept(v Message) { } } -// daly topic 100 +// daly topic value 100 -> publish topic value func daly(rcmd []string, c *ZConn) { - if len(rcmd) != 3 { + if len(rcmd) != 4 { send(c.conn, "-Error: subscribe para number!") return } - t, err := strconv.ParseInt(rcmd[2], 10, 64) + t, err := strconv.ParseInt(rcmd[3], 10, 64) if err != nil { send(c.conn, "-Error: "+strings.Join(rcmd, " ")) return @@ -100,8 +103,7 @@ func daly(rcmd []string, c *ZConn) { timer := time.NewTimer(time.Duration(t) * time.Millisecond) select { case <-timer.C: - send(c.conn, "daly", rcmd[1]) - // zsub.publish(rcmd[1], rcmd[2]) + zsub.publish(rcmd[1], rcmd[2]) } } diff --git a/zsub/zsub.go b/zsub/zsub.go index edf0c67..17697ec 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -2,7 +2,6 @@ package zsub import ( "bufio" - "fmt" "log" "net" "strconv" @@ -17,10 +16,6 @@ var ( } ) -func init() { - zsub.reloadTimerConfig() -} - type ZSub struct { sync.Mutex topics map[string]*ZTopic @@ -150,13 +145,21 @@ func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { } // ================== ZHub server ===================================== -func ServerStart(host string, port int) { - listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) +/* +1、初始化服务 +2、启动服务监听 +*/ +func ServerStart(addr string) { + + // 加载定时调度服务 + zsub.reloadTimerConfig() + + // 启动服务监听 + listen, err := net.Listen("tcp", addr) if err != nil { log.Fatal(err) - return } - log.Printf("zhub started listen on: %s:%d \n", host, port) + log.Printf("zhub started listen on: %s \n", addr) // 启动消息监听处理 go func() { diff --git a/zsub/ztimer.go b/zsub/ztimer.go index a3559dd..bfcf7b7 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -13,6 +13,7 @@ import ( "strings" "text/template" "time" + "zhub/conf" ) type ZTimer struct { @@ -165,9 +166,13 @@ func executeShell(command string) (string, error, string) { } func (s *ZSub) reloadTimerConfig() { - db, err := sql.Open("mysql", "root:*Zhong123098!@tcp(47.111.150.118:6063)/platf_oth?charset=utf8") // dev - //db, err := sql.Open("mysql", "root:*Zhong123098!@tcp(121.196.17.55:6063)/platf_oth?charset=utf8") // qc - //db, err := sql.Open("mysql", "root:*Hello@27.com!@tcp(122.112.180.156:6033)/platf_oth?charset=utf8") // pro + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", + conf.GetStr("ztimer.db.user", "root"), + conf.GetStr("ztimer.db.pwd", "123456"), + conf.GetStr("ztimer.db.addr", "127.0.0.1:3306"), + conf.GetStr("ztimer.db.database", "zhub"), + )) + if err != nil { log.Println(err) return @@ -177,6 +182,7 @@ func (s *ZSub) reloadTimerConfig() { rows, err := db.Query("SELECT t.`name`,t.`expr`,IF(t.`single`=1,'a','x') 'single' FROM tasktimer t WHERE t.`status`=10 ORDER BY t.`timerid`") if err != nil { log.Println(err) + return } for rows.Next() {