diff --git a/cli/zdb-client.go b/cli/zdb-client.go index 0cac8d2..51f344e 100644 --- a/cli/zdb-client.go +++ b/cli/zdb-client.go @@ -14,9 +14,10 @@ import ( var ( reconnect = 0 - subEvent = make(map[string]func(v string)) - chReceive = make(chan []string, 1000) + subFun = make(map[string]func(v string)) + timerFun = make(map[string]func()) chSend = make(chan []string, 1000) + chReceive = make(chan []string, 1000) timerReceive = make(chan []string, 1000) ) @@ -57,7 +58,7 @@ func (c *Client) reconn() (err error) { } else if err == nil { c.conn = conn go c.receive() - for topic, _ := range subEvent { + for topic, _ := range subFun { c.subscribes(topic) } break @@ -72,38 +73,29 @@ func (c *Client) init() { for { select { case vs := <-chReceive: - fun := subEvent[vs[1]] + fun := subFun[vs[1]] if fun == nil { log.Println("topic received, nothing to do", vs[1], vs[2]) continue } fun(vs[2]) case vs := <-timerReceive: - log.Println("收到 timer 消息 ", vs[1]) + fun := timerFun[vs[1]] + if fun == nil { + log.Println("timer received, nothing to do", vs[1]) + continue + } + fun() } } - - /*for { - vs, ok := <-chReceive - if !ok { - break - } - - fun := subEvent[vs[1]] - if fun == nil { - log.Println("topic received, nothing to do", vs[1], vs[2]) - continue - } - fun(vs[2]) - }*/ }() go c.receive() } func (c *Client) Subscribe(topic string, fun func(v string)) { - subEvent[topic] = fun + subFun[topic] = fun c.subscribes(topic) } @@ -139,7 +131,8 @@ func (c *Client) Daly(topic string, message string, daly int) error { return nil } -func (c *Client) Timer(topic string, expr string) { +func (c *Client) Timer(topic string, expr string, fun func()) { + timerFun[topic] = fun c.send("timer", topic, expr) } diff --git a/cli/zdb-client_test.go b/cli/zdb-client_test.go index aea9340..fc89430 100644 --- a/cli/zdb-client_test.go +++ b/cli/zdb-client_test.go @@ -2,11 +2,8 @@ package cli import ( "testing" - ) func TestClient(t *testing.T) { - - } diff --git a/x_test.go b/cli_test.go similarity index 66% rename from x_test.go rename to cli_test.go index 839a042..9188edb 100644 --- a/x_test.go +++ b/cli_test.go @@ -8,22 +8,25 @@ import ( "zhub/cli" ) -func TestName(t *testing.T) { +func TestCli(t *testing.T) { //client, err := cli.Create("39.108.56.246:1216", "") client, err := cli.Create("127.0.0.1:1216", "") + if err != nil { log.Fatal(err) } - client.Subscribe("a-1", func(v string) { - log.Println(v) + // 订阅主题 消息 + client.Subscribe("a", func(v string) { + log.Println("收到主题 a 消息 " + v) }) + // client.Timer("t", "* * * * * *") go func() { for i := 0; i < 50000; i++ { - client.Publish("a-1", strconv.Itoa(i)) + client.Publish("a", strconv.Itoa(i)) time.Sleep(time.Second) } }() diff --git a/main.go b/main.go index 59f2222..efa63c3 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" "zhub/cli" - "zhub/zdb" + "zhub/zsub" ) func main() { @@ -30,7 +30,7 @@ func main() { } if server { - zdb.ServerStart(host, port) + zsub.ServerStart(host, port) } else { cli.ClientRun(host, port) } diff --git a/zdb/rcmd-exec.go b/zdb/rcmd-exec.go index 19c70ae..8e1c01d 100644 --- a/zdb/rcmd-exec.go +++ b/zdb/rcmd-exec.go @@ -1,4 +1,4 @@ -package zdb +package _zdb import ( "log" @@ -9,10 +9,10 @@ import ( "time" ) -func execCmd(rcmd []string, conn net.Conn) { +func ExecCmd(rcmd []string, conn net.Conn) { defer func() { if r := recover(); r != nil { - log.Println("execCmd Recovered:", r) + log.Println("ExecCmd Recovered:", r) } }() if len(rcmd) == 0 { @@ -58,7 +58,7 @@ func execCmd(rcmd []string, conn net.Conn) { case "daly": daly(rcmd, conn) case "timer": - Timer(rcmd, conn) + timer(rcmd, conn) default: conn.Write([]byte("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]\r\n")) return diff --git a/zdb/zdb-server.go b/zdb/zdb-server.go index 0bce4b6..d80357d 100644 --- a/zdb/zdb-server.go +++ b/zdb/zdb-server.go @@ -1,4 +1,4 @@ -package zdb +package _zdb import ( "bufio" @@ -16,7 +16,7 @@ var ( zsub = make(map[string][]*ConnContext) // topic -- connx[] retOk = []byte("+OK") retHelp = []byte( - "\n--- zdb help ---\n" + + "\n--- _zdb help ---\n" + "______ _____ _____ \n|___ / | _ \\ | _ \\ \n / / | | | | | |_| | \n / / | | | | | _ { \n / /__ | |_| | | |_| | \n/_____| |_____/ |_____/ \n" + "had supported command:\n" + "1. set:\n" + @@ -33,7 +33,7 @@ var ( " eg: incr a\n" + "7. decr:\n" + " eg: decr a\n" + - "--- zdb help ---\n") + "--- _zdb help ---\n") ) // 数据封装 @@ -56,7 +56,7 @@ func ServerStart(host string, port int) { log.Fatal(err) return } - log.Printf("zdb started listen on: %s:%d \n", host, port) + log.Printf("_zdb started listen on: %s:%d \n", host, port) // 启动消息监听处理 go func() { @@ -66,7 +66,7 @@ func ServerStart(host string, port int) { break } - execCmd(v.Rcmd, *&*v.Conn) + ExecCmd(v.Rcmd, *&*v.Conn) } }() @@ -108,7 +108,6 @@ func connHandler(conn net.Conn) { for { rcmd := make([]string, 0) line, _, err := reader.ReadLine() - // fmt.Println("line:", string(line)) todo 可使用第一行用于协议头 if err != nil { log.Println(err) return diff --git a/zdb/zdb_test.go b/zdb/zdb_test.go index 7681dc3..c05fdd3 100644 --- a/zdb/zdb_test.go +++ b/zdb/zdb_test.go @@ -1,7 +1,7 @@ -package zdb +package _zdb import "testing" -func TestName(t *testing.T) { +func TestService(t *testing.T) { ServerStart("127.0.0.1", 1216) } diff --git a/zdb/ztimer.go b/zdb/ztimer.go new file mode 100644 index 0000000..6c31d93 --- /dev/null +++ b/zdb/ztimer.go @@ -0,0 +1,62 @@ +package _zdb + +import ( + "fmt" + "github.com/robfig/cron" + "net" + "strings" + "time" +) + +var ( + zTimer = make(map[string]*ZTimer) +) + +type ZTimer struct { + conns []*net.Conn + expr string + topic string + cron *cron.Cron +} + +func timer(rcmd []string, conn net.Conn) { + ztimer := zTimer[rcmd[1]] + if ztimer == nil { + ztimer = &ZTimer{ + conns: []*net.Conn{}, + topic: rcmd[1], + } + zTimer[rcmd[1]] = ztimer + } + + _conns := make([]*net.Conn, 0) + for _, c := range ztimer.conns { + if *&conn == *c { + continue + } + _conns = append(_conns, c) + } + _conns = append(_conns, &conn) + ztimer.conns = _conns + + if !strings.EqualFold(ztimer.expr, rcmd[2]) { + ztimer.expr = rcmd[2] + if ztimer.cron != nil { + ztimer.cron.Stop() + } + ztimer.cron = func() *cron.Cron { + c := cron.New() + c.AddFunc(ztimer.expr, func() { + fmt.Println(time.Now().Second()) + for _, conn := range ztimer.conns { + Send(*conn, "timer", ztimer.topic) + } + }) + go c.Run() + return c + }() + } + + zTimer[ztimer.topic] = ztimer + fmt.Println("xx") +} diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go new file mode 100644 index 0000000..a4b691e --- /dev/null +++ b/zsub/msg-consumer.go @@ -0,0 +1,108 @@ +package zsub + +import ( + "log" + "net" + "strconv" + "strings" + "sync" + "time" +) + +func msgAccept(v Message) { + defer func() { + if r := recover(); r != nil { + log.Println("ExecCmd Recovered:", r) + } + }() + c := v.Conn + rcmd := v.Rcmd + + if len(rcmd) == 0 { + return + } + + log.Println("rcmd: " + strings.Join(rcmd, " ")) + + if len(rcmd) == 1 { + switch strings.ToLower(rcmd[0]) { + default: + // subscribe|unsubscribe|daly + if strings.Index(rcmd[0], "subscribe") == 0 || strings.Index(rcmd[0], "unsubscribe") == 0 || strings.Index(rcmd[0], "daly") == 0 { + rcmd = strings.Split(rcmd[0], " ") + } else { + send(c.conn, "-Error: not supported! (tips: send help)") + return + } + } + } + + cmd := rcmd[0] + switch cmd { + case "subscribe": + //subscribe x y z + for _, topic := range rcmd[1:] { + zsub.subscribe(c, topic) // todo: 批量一次订阅 + } + case "unsubscribe": + for _, topic := range rcmd[1:] { + zsub.unsubscribe(c, topic) + } + case "publish": + if len(rcmd) != 3 { + send(c.conn, "-Error: publish para number!") + } else { + zsub.publish(rcmd[1], rcmd[2]) + } + case "daly": + daly(rcmd, c) + case "timer": + // todo Timer(rcmd, conn) + default: + send(c.conn, "-Error: default not supported:["+strings.Join(rcmd, " ")+"]") + return + } +} + +// daly topic valye 100 +func daly(rcmd []string, c *ZConn) { + if len(rcmd) != 4 { + send(c.conn, "-Error: subscribe para number!") + return + } + + t, err := strconv.ParseInt(rcmd[3], 10, 64) + if err != nil { + send(c.conn, "-Error: "+strings.Join(rcmd, " ")) + return + } + + timer := time.NewTimer(time.Duration(t) * time.Millisecond) + select { + case <-timer.C: + zsub.publish(rcmd[1], rcmd[2]) + } +} + +var wlock = sync.Mutex{} + +// 发送消息 +func send(conn *net.Conn, vs ...string) error { + wlock.Lock() + defer wlock.Unlock() + + var bytes []byte + + if len(vs) == 1 { + bytes = []byte(vs[0] + "\r\n") + } else if len(vs) > 1 { + data := "*" + strconv.Itoa(len(vs)) + "\r\n" + for _, v := range vs { + data += "$" + strconv.Itoa(len(v)) + "\r\n" + data += v + "\r\n" + } + bytes = []byte(data) + } + _, err := (*conn).Write(bytes) + return err +} diff --git a/zsub/zdb.go b/zsub/zdb.go new file mode 100644 index 0000000..19d8b6f --- /dev/null +++ b/zsub/zdb.go @@ -0,0 +1,11 @@ +package zsub + +var ( + chanMessages = make(chan Message, 1000) //接收到的 所有消息数据 +) + +// 数据封装 +type Message struct { + Conn *ZConn + Rcmd []string +} diff --git a/zsub/zsub.go b/zsub/zsub.go index e8089d6..cdac409 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -1,7 +1,11 @@ package zsub import ( + "bufio" + "fmt" + "log" "net" + "strconv" "sync" ) @@ -12,13 +16,14 @@ var ( type ZSub struct { sync.Mutex topics map[string]*ZTopic + timers map[string]*ZTimer } - type ZConn struct { //ZConn conn *net.Conn groupid string topics []string + timers []string // 订阅、定时调度分别创建各自连接 } /* @@ -77,7 +82,7 @@ func (s ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} /* 发送主题消息 -1、写入主题消息列表(zdb) +1、写入主题消息列表(_zdb) 2、回复消息写入成功 3、推送主题消息 */ @@ -90,6 +95,95 @@ func (s ZSub) publish(topic string, message string) { } for _, zgroup := range ztopic.groups { - zgroup.chMsg <- message + zgroup.chMsg <- message // 不同主题消费独立进行 + } +} + +func (s ZSub) close(c *ZConn) { + // 订阅 + for _, topic := range c.topics { + s.unsubscribe(c, topic) + } + + // 延时 + + // timer conn close + for _, topic := range c.timers { // fixme: 数据逻辑交叉循环 + timer := s.timers[topic] + if timer != nil { + timer.close(c) + } + } +} + +// ================== ZHub 服务 ===================================== +func ServerStart(host string, port int) { + listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + log.Fatal(err) + return + } + log.Printf("_zdb started listen on: %s:%d \n", host, port) + + // 启动消息监听处理 + go func() { + for { + v, ok := <-chanMessages + if !ok { + break + } + + // 事件消费 + msgAccept(v) + } + }() + + for { + conn, err := listen.Accept() + if err != nil { + log.Println(err) + continue + } + fmt.Println("conn start: ", conn.RemoteAddr()) + + go zsub.acceptHandler(&ZConn{conn: &conn}) + } +} + +// 连接处理 +func (s ZSub) acceptHandler(c *ZConn) { + defer func() { + s.close(c) // 关闭连接 + }() + + reader := bufio.NewReader(*c.conn) + for { + rcmd := make([]string, 0) + line, _, err := reader.ReadLine() + if err != nil { + log.Println(err) + return + } + if len(line) == 0 { + continue + } + switch string(line[:1]) { + case "*": + n, _ := strconv.Atoi(string(line[1:])) + for i := 0; i < n; i++ { + reader.ReadLine() + v, _, _ := reader.ReadLine() + rcmd = append(rcmd, string(v)) + } + default: + rcmd = append(rcmd, string(line)) + } + + if len(rcmd) == 0 { + continue + } + + // 接收消息 zdb fixme: 细节暴露太多 + chanMessages <- Message{Conn: c, Rcmd: rcmd} } } diff --git a/zsub/zsub_test.go b/zsub/zsub_test.go index bcdad18..bb040ff 100644 --- a/zsub/zsub_test.go +++ b/zsub/zsub_test.go @@ -8,6 +8,7 @@ import ( func TestName(t *testing.T) { sub := ZSub{ topics: map[string]*ZTopic{}, + timers: map[string]*ZTimer{}, } sub.subscribe(&ZConn{ diff --git a/zsub/ztimer.go b/zsub/ztimer.go new file mode 100644 index 0000000..557d836 --- /dev/null +++ b/zsub/ztimer.go @@ -0,0 +1,62 @@ +package zsub + +import ( + "fmt" + "github.com/robfig/cron" + "strings" + "time" +) + +type ZTimer struct { + conns []*ZConn + expr string + topic string + cron *cron.Cron +} + +func (s ZSub) timer(rcmd []string, c *ZConn) { + timer := s.timers[rcmd[1]] + if timer == nil { + timer = &ZTimer{ + conns: []*ZConn{}, + topic: rcmd[1], + } + s.timers[rcmd[1]] = timer + } + + _conns := make([]*ZConn, 0) + for _, conn := range timer.conns { + if conn == c { + continue + } + _conns = append(_conns, c) + } + _conns = append(_conns, c) + timer.conns = _conns + + if !strings.EqualFold(timer.expr, rcmd[2]) { + timer.expr = rcmd[2] + if timer.cron != nil { + timer.cron.Stop() + } + timer.cron = func() *cron.Cron { + c := cron.New() + c.AddFunc(timer.expr, func() { + fmt.Println(time.Now().Second()) + for _, conn := range timer.conns { + send(conn.conn, "timer", timer.topic) + } + }) + go c.Run() + return c + }() + } + + s.timers[rcmd[1]] = timer + fmt.Println("xx") +} + +func (t ZTimer) close(c *ZConn) { + // todo timer zconn + +} diff --git a/zsub/ztopic.go b/zsub/ztopic.go index a1e6aa0..3043974 100644 --- a/zsub/ztopic.go +++ b/zsub/ztopic.go @@ -9,6 +9,6 @@ type ZTopic struct { //ZTopic chMsg chan string // 主题消息投递 } -func createZTopic() { +// 主题消息发送 -} +// diff --git a/ztimer/ztimer.go b/ztimer/ztimer.go deleted file mode 100644 index 8ab46b2..0000000 --- a/ztimer/ztimer.go +++ /dev/null @@ -1,3 +0,0 @@ -package ztimer - -