diff --git a/cli/client.go b/cli/client.go index a49e44e..7e841f8 100644 --- a/cli/client.go +++ b/cli/client.go @@ -23,6 +23,7 @@ type Client struct { subFun map[string]func(v string) // subscribe topic and callback function timerFun map[string]func() // subscribe timer amd callback function + dalyFun map[string]func() chSend chan []string // chan of send message chReceive chan []string // chan of receive message @@ -45,6 +46,7 @@ func Create(addr string, groupid string) (*Client, error) { subFun: make(map[string]func(v string)), timerFun: make(map[string]func()), + dalyFun: make(map[string]func()), chSend: make(chan []string, 100), chReceive: make(chan []string, 100), timerReceive: make(chan []string, 100), @@ -139,8 +141,9 @@ func (c *Client) Publish(topic string, message string) error { return nil } -func (c *Client) Daly(topic string, message string, daly int) error { - c.send("daly", topic, message, strconv.Itoa(daly)) +func (c *Client) Daly(topic string, daly int, fun func()) error { + c.send("daly", topic, strconv.Itoa(daly)) + c.dalyFun[topic] = fun return nil } @@ -167,6 +170,10 @@ func (c *Client) Cmd(cmd string) { c.send("cmd", cmd) } +func (c *Client) Close() { + c.conn.Close() +} + /* // subscribe topic --- @@ -264,6 +271,11 @@ func (c *Client) receive() { c.timerReceive <- vs continue } + if len(vs) == 2 && strings.EqualFold(vs[0], "daly") { + c.dalyFun[vs[1]]() + delete(c.dalyFun, vs[1]) + continue + } continue case "+": // +pong, +xxx diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 9864edf..5d3542d 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -84,14 +84,14 @@ func msgAccept(v Message) { } } -// daly topic valye 100 +// daly topic 100 func daly(rcmd []string, c *ZConn) { - if len(rcmd) != 4 { + if len(rcmd) != 3 { send(c.conn, "-Error: subscribe para number!") return } - t, err := strconv.ParseInt(rcmd[3], 10, 64) + t, err := strconv.ParseInt(rcmd[2], 10, 64) if err != nil { send(c.conn, "-Error: "+strings.Join(rcmd, " ")) return @@ -100,7 +100,8 @@ func daly(rcmd []string, c *ZConn) { timer := time.NewTimer(time.Duration(t) * time.Millisecond) select { case <-timer.C: - zsub.publish(rcmd[1], rcmd[2]) + send(c.conn, "daly", rcmd[1]) + // zsub.publish(rcmd[1], rcmd[2]) } } diff --git a/zsub/zsub.go b/zsub/zsub.go index 262de90..edf0c67 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -41,12 +41,14 @@ type ZConn struct { //ZConn 3、若有待消费消息启动消费 */ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} + s.Lock() + defer s.Unlock() ztopic := s.topics[topic] //ZTopic if ztopic == nil { ztopic = &ZTopic{ groups: map[string]*ZGroup{}, topic: topic, - chMsg: make(chan string, 100), + chMsg: make(chan string, 10000), } ztopic.init() s.topics[topic] = ztopic @@ -67,7 +69,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} for i, item := range c.topics { if strings.EqualFold(item, topic) { - c.topics = append(c.topics[:i], c.topics[:i+1]...) + c.topics = append(c.topics[:i], c.topics[i+1:]...) } } c.topics = append(c.topics, topic) @@ -91,7 +93,7 @@ func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} for i, item := range zgroup.conns { if item == c { - zgroup.conns = append(zgroup.conns[:i], zgroup.conns[:i+1]...) + zgroup.conns = append(zgroup.conns[:i], zgroup.conns[i+1:]...) } } } @@ -121,10 +123,18 @@ func (s *ZSub) close(c *ZConn) { // daly // timer conn close + s.Lock() + defer s.Unlock() for _, topic := range c.timers { // fixme: 数据逻辑交叉循环 timer := s.timers[topic] - if timer != nil { - timer.close(c) + if timer == nil { + continue + } + + for i, item := range timer.conns { + if item == c { + timer.conns = append(timer.conns[:i], timer.conns[i+1:]...) + } } } (*c.conn).Close() @@ -133,7 +143,7 @@ func (s *ZSub) close(c *ZConn) { func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { for i, item := range arr { if item == c { - arr = append(arr[:i], arr[:i+1]...) + arr = append(arr[:i], arr[i+1:]...) } } return append(arr, c) diff --git a/zsub/ztimer.go b/zsub/ztimer.go index 9f52b7e..a3559dd 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -97,17 +97,15 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) { timer.single = strings.EqualFold("a", rcmd[3]) //timer.configSave() } - - s.timers[rcmd[1]] = timer } -func (t *ZTimer) close(c *ZConn) { +/*func (t *ZTimer) close(c *ZConn) { for i, item := range t.conns { if item.conn == c.conn { t.conns = append(t.conns[:i], t.conns[i+1:]...) } } -} +}*/ func (t *ZTimer) configSave() { tpl, err := template.New("").Parse(`