From 15a73136cc3b607a07292a2b68d7cb1eec2cefc7 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Thu, 12 Aug 2021 03:31:03 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A1.=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E8=BF=9E=E6=8E=A5=E7=BC=96=E5=8F=B7=202.?= =?UTF-8?q?=E6=9F=A5=E7=9C=8B=E6=9C=8D=E5=8A=A1=E4=BF=A1=E6=81=AF=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@124 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cli/client.go | 2 +- monitor/monitor.go | 4 ++-- zsub/msg-consumer.go | 2 +- zsub/zsub.go | 35 +++++++++++++++++++++++++++++------ 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/cli/client.go b/cli/client.go index 100bb0b..6397fa3 100644 --- a/cli/client.go +++ b/cli/client.go @@ -345,7 +345,7 @@ func (c *Client) receive() { continue case "+": // +pong, +xxx if strings.EqualFold("+ping", string(v)) { - // c.send("+pong") + c.send("+pong") } case "-": fmt.Println("error:", string(v)) diff --git a/monitor/monitor.go b/monitor/monitor.go index ecb5d4d..5d3fbdc 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -31,8 +31,8 @@ func cleanup(w http.ResponseWriter, r *http.Request) { } func info(w http.ResponseWriter, r *http.Request) { - topics := zsub.Info() - renderJson(w, topics) + info := zsub.Info() + renderJson(w, info) } func renderJson(w http.ResponseWriter, d interface{}) { diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 6256ace..c0d53f7 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -30,7 +30,7 @@ func msgAccept(v Message) { } if conf.LogDebug { - log.Println("rcmd: " + strings.Join(rcmd, " ")) + log.Println("[", v.Conn.sn, "] rcmd: "+strings.Join(rcmd, " ")) } if len(rcmd) == 1 { diff --git a/zsub/zsub.go b/zsub/zsub.go index 372198f..7e841a4 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "unicode/utf8" "zhub/conf" @@ -21,6 +22,7 @@ var ( locks: make(map[string][]*Lock), conns: make([]*ZConn, 0), } + SN int32 = 1000 ) func init() { @@ -73,6 +75,7 @@ type ZSub struct { type ZConn struct { //ZConn sync.Mutex + sn int32 // 连接编号 conn *net.Conn groupid string topics []string @@ -94,6 +97,7 @@ type Lock struct { func NewZConn(conn *net.Conn) *ZConn { return &ZConn{ + sn: atomic.AddInt32(&SN, 1), // 连接编号 conn: conn, topics: []string{}, timers: []string{}, @@ -272,9 +276,9 @@ func ServerStart(addr string) { log.Println(err) continue } - log.Println("conn start: ", conn.RemoteAddr()) - zConn := NewZConn(&conn) + + log.Println("conn start:", conn.RemoteAddr(), "[", zConn.sn, "]") go zsub.acceptHandler(zConn) } } @@ -285,6 +289,7 @@ func (s *ZSub) acceptHandler(c *ZConn) { if r := recover(); r != nil { log.Println("acceptHandler Recovered:", r) } + log.Println("conn closed:", (*c.conn).RemoteAddr(), "[", c.sn, "]") }() defer func() { // conn remove to conns @@ -442,8 +447,8 @@ func (s *ZSub) shutdown() { } func Info() map[string]interface{} { - m := map[string]interface{}{} - + // topics + topics := map[string]interface{}{} for s, topic := range zsub.topics { // {groups:[{name:xxx,size:xx}]} arr := make([]map[string]interface{}, 0) @@ -456,10 +461,28 @@ func Info() map[string]interface{} { "mcount": topic.mcount, }) } - m[s] = arr + topics[s] = arr } - return m + // conns + conns := make([]interface{}, 0) + for _, c := range zsub.conns { + m := make(map[string]interface{}, 0) + m["remoteaddr"] = (*c.conn).RemoteAddr() + m["groupid"] = c.groupid + m["topics"] = c.topics + m["timers"] = c.timers + conns = append(conns, m) + } + + info := map[string]interface{}{ + "topics": topics, + "topicsize": len(topics), + "timersize": len(zsub.timers), + "conns": conns, + "connsize": len(zsub.conns), + } + return info } func (s *ZSub) Clearup() {