From 320a94af2d423dfde24df4c6fc321b086a69beca Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Sun, 10 Jan 2021 18:02:12 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Azconn.appendTo=20?= =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=8C=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E9=A3=8E=E6=A0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@65 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cli/client.go | 8 +++++++ cli_test.go | 9 ++++---- zsub/zsub.go | 59 +++++++++++++++++++++++--------------------------- zsub/ztimer.go | 27 ++++++++++------------- 4 files changed, 51 insertions(+), 52 deletions(-) diff --git a/cli/client.go b/cli/client.go index 26dddc7..fc51d93 100644 --- a/cli/client.go +++ b/cli/client.go @@ -71,6 +71,9 @@ func (c *Client) reconn() (err error) { for topic, _ := range c.subFun { c.subscribes(topic) } + for topic, _ := range c.timerFun { + c.timer(topic) + } break } } @@ -146,6 +149,11 @@ func (c *Client) Timer(topic string, expr string, fun func()) { c.send("timer", topic, expr) } +// todo: save client timer‘s info +func (c *Client) timer(topic string) { + c.send("timer", topic) +} + /* // subscribe topic --- diff --git a/cli_test.go b/cli_test.go index a9b10ef..c74f7ad 100644 --- a/cli_test.go +++ b/cli_test.go @@ -2,14 +2,15 @@ package main import ( "log" + "strconv" "testing" "time" "zhub/cli" ) func TestCli(t *testing.T) { - //client, err := cli.Create("39.108.56.246:1216", "") - client, err := cli.Create("127.0.0.1:1216", "topic-x") + client, err := cli.Create("39.108.56.246:7070", "") + //client, err := cli.Create("127.0.0.1:1216", "topic-x") if err != nil { log.Fatal(err) } @@ -24,12 +25,12 @@ func TestCli(t *testing.T) { log.Println("收到 t------------------x 定时消息") }) - /*go func() { + go func() { for i := 0; i < 100000; i++ { client.Publish("a", strconv.Itoa(i)) time.Sleep(time.Second) } - }()*/ + }() client.Subscribe("a", func(v string) { log.Println("收到主题 a 消息 " + v) diff --git a/zsub/zsub.go b/zsub/zsub.go index 9171858..ac73fb3 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -11,7 +11,7 @@ import ( ) var ( - zsub ZSub = ZSub{ + zsub = ZSub{ topics: make(map[string]*ZTopic), timers: make(map[string]*ZTimer), } @@ -59,32 +59,22 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} ztopic.groups[c.groupid] = zgroup } - _conns := make([]*ZConn, 0) - for _, conn := range zgroup.conns { - if conn == c { - continue - } - _conns = append(_conns, conn) - } - _conns = append(_conns, c) - zgroup.conns = _conns + zgroup.conns = c.appendTo(zgroup.conns) - // 这是 ZConn - _topics := c.topics - for _, _topic := range c.topics { - if strings.EqualFold(_topic, topic) { - continue + for i, item := range c.topics { + if strings.EqualFold(item, topic) { + c.topics = append(c.topics[:i], c.topics[:i+1]...) } - _topics = append(_topics, _topic) } - _topics = append(_topics, topic) - c.topics = _topics + c.topics = append(c.topics, topic) } /* 取消订阅: */ func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} + s.Lock() + defer s.Unlock() ztopic := s.topics[topic] //ZTopic if ztopic == nil { return @@ -95,21 +85,17 @@ func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} return } - _conns := make([]*ZConn, 0) - for _, conn := range zgroup.conns { - if conn == c { - continue + for i, item := range zgroup.conns { + if item == c { + zgroup.conns = append(zgroup.conns[:i], zgroup.conns[:i+1]...) } - _conns = append(_conns, c) } - zgroup.conns = _conns } /* -发送主题消息 -1、写入主题消息列表(_zdb) -2、回复消息写入成功 -3、推送主题消息 +accept topic message +1、send message to topic's chan +2、feedback send success to sender, and sending message to topic's subscripts */ func (s *ZSub) publish(topic string, msg string) { s.Lock() @@ -123,12 +109,12 @@ func (s *ZSub) publish(topic string, msg string) { } func (s *ZSub) close(c *ZConn) { - // 订阅 + // sub for _, topic := range c.topics { s.unsubscribe(c, topic) } - // 延时 + // daly // timer conn close for _, topic := range c.timers { // fixme: 数据逻辑交叉循环 @@ -140,14 +126,23 @@ func (s *ZSub) close(c *ZConn) { (*c.conn).Close() } -// ================== ZHub 服务 ===================================== +func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { + for i, item := range arr { + if item == c { + arr = append(arr[:i], arr[:i+1]...) + } + } + return append(arr, c) +} + +// ================== ZHub server ===================================== func ServerStart(host string, port int) { listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil { log.Fatal(err) return } - log.Printf("_zdb started listen on: %s:%d \n", host, port) + log.Printf("zhub started listen on: %s:%d \n", host, port) // 启动消息监听处理 go func() { diff --git a/zsub/ztimer.go b/zsub/ztimer.go index dea629e..05b8a95 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -1,7 +1,6 @@ package zsub import ( - "fmt" "github.com/robfig/cron" "strings" ) @@ -13,7 +12,13 @@ type ZTimer struct { cron *cron.Cron } +/* +1、["timer", topic, expr] +2、["timer", topic] +*/ func (s *ZSub) timer(rcmd []string, c *ZConn) { + s.Lock() + defer s.Unlock() timer := s.timers[rcmd[1]] if timer == nil { timer = &ZTimer{ @@ -22,18 +27,10 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { } s.timers[rcmd[1]] = timer } + timer.conns = c.appendTo(timer.conns) - _conns := make([]*ZConn, 0) - for _, conn := range timer.conns { - if conn == c { - continue - } - _conns = append(_conns, conn) - } - _conns = append(_conns, c) - timer.conns = _conns - - if !strings.EqualFold(timer.expr, rcmd[2]) { + // todo: when timer.expr changed send message to all the timer‘s subscribe + if len(rcmd) == 3 && !strings.EqualFold(timer.expr, rcmd[2]) { timer.expr = rcmd[2] if timer.cron != nil { timer.cron.Stop() @@ -52,14 +49,12 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { } s.timers[rcmd[1]] = timer - fmt.Println("xx") } func (t *ZTimer) close(c *ZConn) { - for i, conn := range t.conns { - if conn.conn == c.conn { + for i, item := range t.conns { + if item.conn == c.conn { t.conns = append(t.conns[:i], t.conns[i+1:]...) } } - t.conns = append(t.conns, c) }