修复:多次重连后消息订阅失败 bug

git-svn-id: svn://47.119.165.148/zhub@72 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-01-16 18:03:19 +00:00
parent a99f2398af
commit 9b3c155f85
4 changed files with 37 additions and 16 deletions

View File

@@ -23,6 +23,7 @@ type Client struct {
subFun map[string]func(v string) // subscribe topic and callback function
timerFun map[string]func() // subscribe timer amd callback function
dalyFun map[string]func()
chSend chan []string // chan of send message
chReceive chan []string // chan of receive message
@@ -45,6 +46,7 @@ func Create(addr string, groupid string) (*Client, error) {
subFun: make(map[string]func(v string)),
timerFun: make(map[string]func()),
dalyFun: make(map[string]func()),
chSend: make(chan []string, 100),
chReceive: make(chan []string, 100),
timerReceive: make(chan []string, 100),
@@ -139,8 +141,9 @@ func (c *Client) Publish(topic string, message string) error {
return nil
}
func (c *Client) Daly(topic string, message string, daly int) error {
c.send("daly", topic, message, strconv.Itoa(daly))
func (c *Client) Daly(topic string, daly int, fun func()) error {
c.send("daly", topic, strconv.Itoa(daly))
c.dalyFun[topic] = fun
return nil
}
@@ -167,6 +170,10 @@ func (c *Client) Cmd(cmd string) {
c.send("cmd", cmd)
}
func (c *Client) Close() {
c.conn.Close()
}
/*
// subscribe topic
---
@@ -264,6 +271,11 @@ func (c *Client) receive() {
c.timerReceive <- vs
continue
}
if len(vs) == 2 && strings.EqualFold(vs[0], "daly") {
c.dalyFun[vs[1]]()
delete(c.dalyFun, vs[1])
continue
}
continue
case "+": // +pong, +xxx

View File

@@ -84,14 +84,14 @@ func msgAccept(v Message) {
}
}
// daly topic valye 100
// daly topic 100
func daly(rcmd []string, c *ZConn) {
if len(rcmd) != 4 {
if len(rcmd) != 3 {
send(c.conn, "-Error: subscribe para number!")
return
}
t, err := strconv.ParseInt(rcmd[3], 10, 64)
t, err := strconv.ParseInt(rcmd[2], 10, 64)
if err != nil {
send(c.conn, "-Error: "+strings.Join(rcmd, " "))
return
@@ -100,7 +100,8 @@ func daly(rcmd []string, c *ZConn) {
timer := time.NewTimer(time.Duration(t) * time.Millisecond)
select {
case <-timer.C:
zsub.publish(rcmd[1], rcmd[2])
send(c.conn, "daly", rcmd[1])
// zsub.publish(rcmd[1], rcmd[2])
}
}

View File

@@ -41,12 +41,14 @@ type ZConn struct { //ZConn
3、若有待消费消息启动消费
*/
func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{}
s.Lock()
defer s.Unlock()
ztopic := s.topics[topic] //ZTopic
if ztopic == nil {
ztopic = &ZTopic{
groups: map[string]*ZGroup{},
topic: topic,
chMsg: make(chan string, 100),
chMsg: make(chan string, 10000),
}
ztopic.init()
s.topics[topic] = ztopic
@@ -67,7 +69,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{}
for i, item := range c.topics {
if strings.EqualFold(item, topic) {
c.topics = append(c.topics[:i], c.topics[:i+1]...)
c.topics = append(c.topics[:i], c.topics[i+1:]...)
}
}
c.topics = append(c.topics, topic)
@@ -91,7 +93,7 @@ func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{}
for i, item := range zgroup.conns {
if item == c {
zgroup.conns = append(zgroup.conns[:i], zgroup.conns[:i+1]...)
zgroup.conns = append(zgroup.conns[:i], zgroup.conns[i+1:]...)
}
}
}
@@ -121,10 +123,18 @@ func (s *ZSub) close(c *ZConn) {
// daly
// timer conn close
s.Lock()
defer s.Unlock()
for _, topic := range c.timers { // fixme: 数据逻辑交叉循环
timer := s.timers[topic]
if timer != nil {
timer.close(c)
if timer == nil {
continue
}
for i, item := range timer.conns {
if item == c {
timer.conns = append(timer.conns[:i], timer.conns[i+1:]...)
}
}
}
(*c.conn).Close()
@@ -133,7 +143,7 @@ func (s *ZSub) close(c *ZConn) {
func (c *ZConn) appendTo(arr []*ZConn) []*ZConn {
for i, item := range arr {
if item == c {
arr = append(arr[:i], arr[:i+1]...)
arr = append(arr[:i], arr[i+1:]...)
}
}
return append(arr, c)

View File

@@ -97,17 +97,15 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
timer.single = strings.EqualFold("a", rcmd[3])
//timer.configSave()
}
s.timers[rcmd[1]] = timer
}
func (t *ZTimer) close(c *ZConn) {
/*func (t *ZTimer) close(c *ZConn) {
for i, item := range t.conns {
if item.conn == c.conn {
t.conns = append(t.conns[:i], t.conns[i+1:]...)
}
}
}
}*/
func (t *ZTimer) configSave() {
tpl, err := template.New("").Parse(`