新增:连接健康检查

git-svn-id: svn://47.119.165.148/zhub@123 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-08-05 08:07:06 +00:00
parent ddbab4b725
commit 05e5e41314
3 changed files with 81 additions and 2 deletions

View File

@@ -4,6 +4,9 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/go-basic/uuid" "github.com/go-basic/uuid"
"unicode/utf8"
//"github.com/go-basic/uuid"
"log" "log"
"net" "net"
"os" "os"
@@ -257,7 +260,7 @@ a:
} else if len(vs) > 1 { } else if len(vs) > 1 {
data := "*" + strconv.Itoa(len(vs)) + "\r\n" data := "*" + strconv.Itoa(len(vs)) + "\r\n"
for _, v := range vs { for _, v := range vs {
data += "$" + strconv.Itoa(len(v)) + "\r\n" data += "$" + strconv.Itoa(utf8.RuneCountInString(v)) + "\r\n"
data += v + "\r\n" data += v + "\r\n"
} }
_, err = c.conn.Write([]byte(data)) _, err = c.conn.Write([]byte(data))
@@ -341,6 +344,9 @@ func (c *Client) receive() {
continue continue
case "+": // +pong, +xxx case "+": // +pong, +xxx
if strings.EqualFold("+ping", string(v)) {
// c.send("+pong")
}
case "-": case "-":
fmt.Println("error:", string(v)) fmt.Println("error:", string(v))
case ":": case ":":

View File

@@ -4,6 +4,7 @@ import (
"log" "log"
"strconv" "strconv"
"strings" "strings"
"time"
"zhub/conf" "zhub/conf"
) )
@@ -22,6 +23,12 @@ func msgAccept(v Message) {
return return
} }
// ping reply
if strings.EqualFold("+pong", v.Rcmd[0]) {
v.Conn.pong = time.Now().Unix()
return
}
if conf.LogDebug { if conf.LogDebug {
log.Println("rcmd: " + strings.Join(rcmd, " ")) log.Println("rcmd: " + strings.Join(rcmd, " "))
} }

View File

@@ -19,15 +19,56 @@ var (
timers: make(map[string]*ZTimer), timers: make(map[string]*ZTimer),
delays: make(map[string]*ZDelay), delays: make(map[string]*ZDelay),
locks: make(map[string][]*Lock), locks: make(map[string][]*Lock),
conns: make([]*ZConn, 0),
} }
) )
func init() {
// conn health check: T=10s, close>29s
go func() {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for range ticker.C {
funChan <- func() {
defer func() {
if r := recover(); r != nil {
log.Println("conn health check Recovered:", r)
}
}()
conns := make([]*ZConn, 0) // 需要关闭的连接
for _, c := range zsub.conns {
if c.ping > 0 && c.ping-c.pong > 19 {
conns = c.appendTo(conns)
continue
}
c.ping = time.Now().Unix()
if c.pong == 0 {
c.pong = c.ping
}
c.send("+ping")
}
// close
for _, c := range conns {
log.Println("========================================= conn ping close:", (*c.conn).RemoteAddr(), "[", c.groupid, "] =========================================")
c.close()
}
}
}
}()
}
type ZSub struct { type ZSub struct {
sync.RWMutex sync.RWMutex
topics map[string]*ZTopic topics map[string]*ZTopic
timers map[string]*ZTimer timers map[string]*ZTimer
delays map[string]*ZDelay delays map[string]*ZDelay
locks map[string][]*Lock locks map[string][]*Lock
conns []*ZConn
} }
type ZConn struct { //ZConn type ZConn struct { //ZConn
@@ -38,6 +79,8 @@ type ZConn struct { //ZConn
timers []string // 订阅、定时调度分别创建各自连接 timers []string // 订阅、定时调度分别创建各自连接
stoped chan int // 关闭信号量 stoped chan int // 关闭信号量
substoped map[string]chan int // 关闭信号量 substoped map[string]chan int // 关闭信号量
ping int64 // 最后心跳时间
pong int64 // 最后心跳回复时间
} }
type Lock struct { type Lock struct {
@@ -181,6 +224,18 @@ func (c *ZConn) appendTo(arr []*ZConn) []*ZConn {
return append(arr, c) return append(arr, c)
} }
func (c *ZConn) removeTo(arr []*ZConn) []*ZConn {
if arr == nil {
arr = make([]*ZConn, 0)
}
for i, item := range arr {
if item == c {
arr = append(arr[:i], arr[i+1:]...)
}
}
return arr
}
// ServerStart ================== ZHub server ===================================== // ServerStart ================== ZHub server =====================================
/* /*
1、初始化服务 1、初始化服务
@@ -232,9 +287,20 @@ func (s *ZSub) acceptHandler(c *ZConn) {
} }
}() }()
defer func() { defer func() {
c.close() // close ZConn // conn remove to conns
funChan <- func() {
zsub.conns = c.removeTo(zsub.conns)
}
// close ZConn
c.close()
}() }()
// conn add to conns
funChan <- func() {
zsub.conns = c.appendTo(zsub.conns)
}
reader := bufio.NewReader(*c.conn) reader := bufio.NewReader(*c.conn)
for { for {
rcmd := make([]string, 0) rcmd := make([]string, 0)