diff --git a/cli/client.go b/cli/client.go index 91855b4..100bb0b 100644 --- a/cli/client.go +++ b/cli/client.go @@ -4,6 +4,9 @@ import ( "bufio" "fmt" "github.com/go-basic/uuid" + "unicode/utf8" + + //"github.com/go-basic/uuid" "log" "net" "os" @@ -257,7 +260,7 @@ a: } else if len(vs) > 1 { data := "*" + strconv.Itoa(len(vs)) + "\r\n" for _, v := range vs { - data += "$" + strconv.Itoa(len(v)) + "\r\n" + data += "$" + strconv.Itoa(utf8.RuneCountInString(v)) + "\r\n" data += v + "\r\n" } _, err = c.conn.Write([]byte(data)) @@ -341,6 +344,9 @@ func (c *Client) receive() { continue case "+": // +pong, +xxx + if strings.EqualFold("+ping", string(v)) { + // c.send("+pong") + } case "-": fmt.Println("error:", string(v)) case ":": diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 91ac3d7..6256ace 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -4,6 +4,7 @@ import ( "log" "strconv" "strings" + "time" "zhub/conf" ) @@ -22,6 +23,12 @@ func msgAccept(v Message) { return } + // ping reply + if strings.EqualFold("+pong", v.Rcmd[0]) { + v.Conn.pong = time.Now().Unix() + return + } + if conf.LogDebug { log.Println("rcmd: " + strings.Join(rcmd, " ")) } diff --git a/zsub/zsub.go b/zsub/zsub.go index 77d2620..372198f 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -19,15 +19,56 @@ var ( timers: make(map[string]*ZTimer), delays: make(map[string]*ZDelay), locks: make(map[string][]*Lock), + conns: make([]*ZConn, 0), } ) +func init() { + // conn health check: T=10s, close>29s + go func() { + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + + for range ticker.C { + funChan <- func() { + defer func() { + if r := recover(); r != nil { + log.Println("conn health check Recovered:", r) + } + }() + conns := make([]*ZConn, 0) // 需要关闭的连接 + for _, c := range zsub.conns { + if c.ping > 0 && c.ping-c.pong > 19 { + conns = c.appendTo(conns) + continue + } + + c.ping = time.Now().Unix() + if c.pong == 0 { + c.pong = c.ping + } + + c.send("+ping") + } + + // close + for _, c := range conns { + log.Println("========================================= conn ping close:", (*c.conn).RemoteAddr(), "[", c.groupid, "] =========================================") + c.close() + } + + } + } + }() +} + type ZSub struct { sync.RWMutex topics map[string]*ZTopic timers map[string]*ZTimer delays map[string]*ZDelay locks map[string][]*Lock + conns []*ZConn } type ZConn struct { //ZConn @@ -38,6 +79,8 @@ type ZConn struct { //ZConn timers []string // 订阅、定时调度分别创建各自连接 stoped chan int // 关闭信号量 substoped map[string]chan int // 关闭信号量 + ping int64 // 最后心跳时间 + pong int64 // 最后心跳回复时间 } type Lock struct { @@ -181,6 +224,18 @@ func (c *ZConn) appendTo(arr []*ZConn) []*ZConn { return append(arr, c) } +func (c *ZConn) removeTo(arr []*ZConn) []*ZConn { + if arr == nil { + arr = make([]*ZConn, 0) + } + for i, item := range arr { + if item == c { + arr = append(arr[:i], arr[i+1:]...) + } + } + return arr +} + // ServerStart ================== ZHub server ===================================== /* 1、初始化服务 @@ -232,9 +287,20 @@ func (s *ZSub) acceptHandler(c *ZConn) { } }() defer func() { - c.close() // close ZConn + // conn remove to conns + funChan <- func() { + zsub.conns = c.removeTo(zsub.conns) + } + + // close ZConn + c.close() }() + // conn add to conns + funChan <- func() { + zsub.conns = c.appendTo(zsub.conns) + } + reader := bufio.NewReader(*c.conn) for { rcmd := make([]string, 0)