diff --git a/cli/zdb-client.go b/cli/client.go similarity index 82% rename from cli/zdb-client.go rename to cli/client.go index 51f344e..26dddc7 100644 --- a/cli/zdb-client.go +++ b/cli/client.go @@ -12,21 +12,21 @@ import ( "time" ) -var ( - reconnect = 0 - subFun = make(map[string]func(v string)) - timerFun = make(map[string]func()) - chSend = make(chan []string, 1000) - chReceive = make(chan []string, 1000) - timerReceive = make(chan []string, 1000) -) - type Client struct { - wlock sync.Mutex // 写锁 - rlock sync.Mutex // 读锁 - addr string // host:port - conn net.Conn // socket 连接对象 - createTime time.Time // 创建时间 + wlock sync.Mutex // write lock + rlock sync.Mutex // read lock + + addr string // host:port + conn net.Conn // socket conn + createTime time.Time // client create time + groupid string // client group id + + subFun map[string]func(v string) // subscribe topic and callback function + timerFun map[string]func() // subscribe timer amd callback function + + chSend chan []string // chan of send message + chReceive chan []string // chan of receive message + timerReceive chan []string // chan of timer } func Create(addr string, groupid string) (*Client, error) { @@ -40,7 +40,14 @@ func Create(addr string, groupid string) (*Client, error) { rlock: sync.Mutex{}, addr: addr, conn: conn, + groupid: groupid, createTime: time.Now(), + + subFun: make(map[string]func(v string)), + timerFun: make(map[string]func()), + chSend: make(chan []string, 100), + chReceive: make(chan []string, 100), + timerReceive: make(chan []string, 100), } conn.Write([]byte("groupid " + groupid + "\r\n")) @@ -57,8 +64,11 @@ func (c *Client) reconn() (err error) { continue } else if err == nil { c.conn = conn + conn.Write([]byte("groupid " + c.groupid + "\r\n")) go c.receive() - for topic, _ := range subFun { + + // 重新订阅 + for topic, _ := range c.subFun { c.subscribes(topic) } break @@ -72,15 +82,15 @@ func (c *Client) init() { go func() { for { select { - case vs := <-chReceive: - fun := subFun[vs[1]] + case vs := <-c.chReceive: + fun := c.subFun[vs[1]] if fun == nil { log.Println("topic received, nothing to do", vs[1], vs[2]) continue } fun(vs[2]) - case vs := <-timerReceive: - fun := timerFun[vs[1]] + case vs := <-c.timerReceive: + fun := c.timerFun[vs[1]] if fun == nil { log.Println("timer received, nothing to do", vs[1]) continue @@ -95,7 +105,7 @@ func (c *Client) init() { } func (c *Client) Subscribe(topic string, fun func(v string)) { - subFun[topic] = fun + c.subFun[topic] = fun c.subscribes(topic) } @@ -110,7 +120,7 @@ func (c *Client) ping() { // -------------------------------------- pub-sub -------------------------------------- /* -发送 主题消息 +send topic message : --- *3 $7 @@ -132,12 +142,12 @@ func (c *Client) Daly(topic string, message string, daly int) error { } func (c *Client) Timer(topic string, expr string, fun func()) { - timerFun[topic] = fun + c.timerFun[topic] = fun c.send("timer", topic, expr) } /* -// 订阅主题消息 +// subscribe topic --- subscribe x y z --- @@ -155,7 +165,11 @@ func (c *Client) subscribes(topics ...string) error { return nil } -// 发送 socket 消息 +/* +send socket message : +if len(vs) equal 1 will send message `vs[0] + "\r\n"` +else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...` +*/ func (c *Client) send(vs ...string) (err error) { //chSend <- vs c.wlock.Lock() @@ -222,11 +236,11 @@ func (c *Client) receive() { } if len(vs) == 3 && strings.EqualFold(vs[0], "message") { - chReceive <- vs + c.chReceive <- vs continue } if len(vs) == 2 && strings.EqualFold(vs[0], "timer") { - timerReceive <- vs + c.timerReceive <- vs continue } @@ -256,9 +270,6 @@ func (c *Client) set(key string, value interface{}) error { return nil } -/* - - */ func (c *Client) get(key string) string { return "" @@ -268,6 +279,8 @@ func (c *Client) get(key string) string { // ============================================================================== +var reconnect = 0 + // client 命令行程序 func ClientRun(host string, port int) { conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) diff --git a/cli/zdb-client_test.go b/cli/zdb-client_test.go deleted file mode 100644 index fc89430..0000000 --- a/cli/zdb-client_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package cli - -import ( - "testing" -) - -func TestClient(t *testing.T) { - -} diff --git a/cli_test.go b/cli_test.go index 9188edb..a9b10ef 100644 --- a/cli_test.go +++ b/cli_test.go @@ -2,7 +2,6 @@ package main import ( "log" - "strconv" "testing" "time" "zhub/cli" @@ -10,8 +9,7 @@ import ( func TestCli(t *testing.T) { //client, err := cli.Create("39.108.56.246:1216", "") - client, err := cli.Create("127.0.0.1:1216", "") - + client, err := cli.Create("127.0.0.1:1216", "topic-x") if err != nil { log.Fatal(err) } @@ -21,14 +19,44 @@ func TestCli(t *testing.T) { log.Println("收到主题 a 消息 " + v) }) - // - client.Timer("t", "* * * * * *") + // 定时调度 + client.Timer("t------------------x", "*/3 * * * * *", func() { + log.Println("收到 t------------------x 定时消息") + }) - go func() { - for i := 0; i < 50000; i++ { + /*go func() { + for i := 0; i < 100000; i++ { client.Publish("a", strconv.Itoa(i)) time.Sleep(time.Second) } + }()*/ + + client.Subscribe("a", func(v string) { + log.Println("收到主题 a 消息 " + v) + }) + client.Daly("a", "x", 3000) + + time.Sleep(time.Hour * 3) +} + +func TestTimer(t *testing.T) { + go func() { + client, _ := cli.Create("127.0.0.1:1216", "topic-x") + client.Timer("t", "*/3 * * * * *", func() { + log.Println("=======收到 t 定时消息") + }) + + client.Timer("t------------------x", "*/3 * * * * *", func() { + log.Println("收到 t------------------x 定时消息") + }) + }() + + time.Sleep(time.Second * 5) + go func() { + client, _ := cli.Create("127.0.0.1:1216", "topic-x") + client.Timer("t", "*/5 * * * * *", func() { + log.Println("-------收到 t 定时消息") + }) }() time.Sleep(time.Hour * 3) diff --git a/pkg.bat b/pkg.bat index ebe4992..543aceb 100644 --- a/pkg.bat +++ b/pkg.bat @@ -1,4 +1,4 @@ SET GOOS=linux SET GOARCH=amd64 -go build -o zdb.sh -ldflags "-s -w" ./main.go -upx -9 zdb.sh +go build -o zhub.sh -ldflags "-s -w" ./main.go +upx -9 zhub.sh diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index a4b691e..51bf3a9 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -27,11 +27,21 @@ func msgAccept(v Message) { if len(rcmd) == 1 { switch strings.ToLower(rcmd[0]) { default: - // subscribe|unsubscribe|daly - if strings.Index(rcmd[0], "subscribe") == 0 || strings.Index(rcmd[0], "unsubscribe") == 0 || strings.Index(rcmd[0], "daly") == 0 { + // str start with strs anyone + var startWithAny = func(str string, strs ...string) bool { + for _, str := range strs { + if strings.Index(rcmd[0], str) == 0 { + return true + } + } + return false + } + + arr := []string{"subscribe", "unsubscribe", "daly", "groupid"} + if startWithAny(rcmd[0], arr...) { rcmd = strings.Split(rcmd[0], " ") } else { - send(c.conn, "-Error: not supported! (tips: send help)") + send(c.conn, "-Error: not supported:"+rcmd[0]) return } } @@ -39,8 +49,10 @@ func msgAccept(v Message) { cmd := rcmd[0] switch cmd { + case "groupid": + c.groupid = rcmd[1] case "subscribe": - //subscribe x y z + // subscribe x y z for _, topic := range rcmd[1:] { zsub.subscribe(c, topic) // todo: 批量一次订阅 } @@ -57,7 +69,7 @@ func msgAccept(v Message) { case "daly": daly(rcmd, c) case "timer": - // todo Timer(rcmd, conn) + zsub.timer(rcmd, c) default: send(c.conn, "-Error: default not supported:["+strings.Join(rcmd, " ")+"]") return @@ -86,7 +98,7 @@ func daly(rcmd []string, c *ZConn) { var wlock = sync.Mutex{} -// 发送消息 +// send message func send(conn *net.Conn, vs ...string) error { wlock.Lock() defer wlock.Unlock() diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 037e9d3..41deee3 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -2,32 +2,27 @@ package zsub import "sync" -type ZGroup struct { //ZGroup +type ZGroup struct { // ZGroup sync.Mutex conns []*ZConn offset int chMsg chan string // 组消息即时投递 + ztopic *ZTopic // 所属topic } -func createZGroup(c *ZConn) *ZGroup { - zgroup := &ZGroup{ - conns: []*ZConn{}, - chMsg: make(chan string, 100), - } - - // 开启消息推送 +func (g *ZGroup) init() { go func() { for { - msg, ok := <-zgroup.chMsg + msg, ok := <-g.chMsg if !ok { break } - for _, c := range zgroup.conns { - (*c.conn).Write([]byte(msg)) - zgroup.offset++ + if len(g.conns) == 0 { + continue } + send(g.conns[0].conn, "message", g.ztopic.topic, msg) + g.offset++ } }() - return zgroup } diff --git a/zsub/zsub.go b/zsub/zsub.go index cdac409..9171858 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -6,11 +6,15 @@ import ( "log" "net" "strconv" + "strings" "sync" ) var ( - zsub ZSub + zsub ZSub = ZSub{ + topics: make(map[string]*ZTopic), + timers: make(map[string]*ZTimer), + } ) type ZSub struct { @@ -32,16 +36,26 @@ type ZConn struct { //ZConn 2、加入到对应组别;如果是第一次的消费组 offset从当前 mcount 开始 3、若有待消费消息启动消费 */ -func (s ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} +func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} ztopic := s.topics[topic] //ZTopic if ztopic == nil { - ztopic = &ZTopic{groups: map[string]*ZGroup{}} + ztopic = &ZTopic{ + groups: map[string]*ZGroup{}, + topic: topic, + chMsg: make(chan string, 100), + } + ztopic.init() s.topics[topic] = ztopic } zgroup := ztopic.groups[c.groupid] //ZGroup if zgroup == nil { - zgroup = &ZGroup{conns: []*ZConn{}} + zgroup = &ZGroup{ + conns: []*ZConn{}, + ztopic: ztopic, + chMsg: make(chan string, 1000), + } + zgroup.init() ztopic.groups[c.groupid] = zgroup } @@ -54,12 +68,23 @@ func (s ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} } _conns = append(_conns, c) zgroup.conns = _conns + + // 这是 ZConn + _topics := c.topics + for _, _topic := range c.topics { + if strings.EqualFold(_topic, topic) { + continue + } + _topics = append(_topics, _topic) + } + _topics = append(_topics, topic) + c.topics = _topics } /* 取消订阅: */ -func (s ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} +func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} ztopic := s.topics[topic] //ZTopic if ztopic == nil { return @@ -86,20 +111,18 @@ func (s ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} 2、回复消息写入成功 3、推送主题消息 */ -func (s ZSub) publish(topic string, message string) { +func (s *ZSub) publish(topic string, msg string) { s.Lock() defer s.Unlock() ztopic := s.topics[topic] //ZTopic if ztopic == nil { return } - - for _, zgroup := range ztopic.groups { - zgroup.chMsg <- message // 不同主题消费独立进行 - } + ztopic.chMsg <- msg + ztopic.mcount++ } -func (s ZSub) close(c *ZConn) { +func (s *ZSub) close(c *ZConn) { // 订阅 for _, topic := range c.topics { s.unsubscribe(c, topic) @@ -114,6 +137,7 @@ func (s ZSub) close(c *ZConn) { timer.close(c) } } + (*c.conn).Close() } // ================== ZHub 服务 ===================================== @@ -146,14 +170,18 @@ func ServerStart(host string, port int) { } fmt.Println("conn start: ", conn.RemoteAddr()) - go zsub.acceptHandler(&ZConn{conn: &conn}) + go zsub.acceptHandler(&ZConn{ + conn: &conn, + topics: []string{}, + timers: []string{}, + }) } } // 连接处理 -func (s ZSub) acceptHandler(c *ZConn) { +func (s *ZSub) acceptHandler(c *ZConn) { defer func() { - s.close(c) // 关闭连接 + s.close(c) // close ZConn }() reader := bufio.NewReader(*c.conn) diff --git a/zsub/ztimer.go b/zsub/ztimer.go index 557d836..dea629e 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/robfig/cron" "strings" - "time" ) type ZTimer struct { @@ -14,7 +13,7 @@ type ZTimer struct { cron *cron.Cron } -func (s ZSub) timer(rcmd []string, c *ZConn) { +func (s *ZSub) timer(rcmd []string, c *ZConn) { timer := s.timers[rcmd[1]] if timer == nil { timer = &ZTimer{ @@ -29,7 +28,7 @@ func (s ZSub) timer(rcmd []string, c *ZConn) { if conn == c { continue } - _conns = append(_conns, c) + _conns = append(_conns, conn) } _conns = append(_conns, c) timer.conns = _conns @@ -42,7 +41,7 @@ func (s ZSub) timer(rcmd []string, c *ZConn) { timer.cron = func() *cron.Cron { c := cron.New() c.AddFunc(timer.expr, func() { - fmt.Println(time.Now().Second()) + //fmt.Println(time.Now().Second()) for _, conn := range timer.conns { send(conn.conn, "timer", timer.topic) } @@ -56,7 +55,11 @@ func (s ZSub) timer(rcmd []string, c *ZConn) { fmt.Println("xx") } -func (t ZTimer) close(c *ZConn) { - // todo timer zconn - +func (t *ZTimer) close(c *ZConn) { + for i, conn := range t.conns { + if conn.conn == c.conn { + t.conns = append(t.conns[:i], t.conns[i+1:]...) + } + } + t.conns = append(t.conns, c) } diff --git a/zsub/ztopic.go b/zsub/ztopic.go index 3043974..9add967 100644 --- a/zsub/ztopic.go +++ b/zsub/ztopic.go @@ -6,9 +6,24 @@ type ZTopic struct { //ZTopic sync.Mutex groups map[string]*ZGroup mcount int + topic string // 主题名称 chMsg chan string // 主题消息投递 } // 主题消息发送 +func (t *ZTopic) init() { + go func() { + for { + msg, ok := <-t.chMsg + if !ok { + break + } + + for _, group := range t.groups { + group.chMsg <- msg + } + } + }() +} //