diff --git a/cli/client.go b/cli/client.go index e245e69..38f0302 100644 --- a/cli/client.go +++ b/cli/client.go @@ -119,6 +119,11 @@ func (c *Client) Subscribe(topic string, fun func(v string)) { } } +func (c *Client) Unsubscribe(topic string) { + c.send("unsubscribe " + topic) + delete(c.subFun, topic) +} + /* --- ping diff --git a/pkg.bat b/pkg.bat index f3d8cc3..6d4882c 100644 --- a/pkg.bat +++ b/pkg.bat @@ -1,7 +1,10 @@ SET GOOS=linux SET GOARCH=amd64 -go build -o zhub.sh -ldflags "-s -w" ./main.go +go build -o zhub.sh -ldflags "-s -w" ./app.go upx -9 zhub.sh -scp zhub.sh zhost: +scp zhub.sh xhost:/opt/zhub +scp zhub.sh zhost:/opt/zhub +scp zhub.sh qhost:/opt/zhub +scp zhub.sh nhost:/opt/zhub del zhub.sh diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index e82384e..89d5331 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -106,7 +106,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { if timer != nil { if t == -1 { timer.Stop() - s.delays[rcmd[1]+"-"+rcmd[2]] = nil + delete(s.delays, rcmd[1]+"-"+rcmd[2]) return } timer.Reset(time.Duration(t) * time.Millisecond) @@ -117,7 +117,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { select { case <-timer.C: zsub.publish(rcmd[1], rcmd[2]) - s.delays[rcmd[1]+"-"+rcmd[2]] = nil + delete(s.delays, rcmd[1]+"-"+rcmd[2]) } }() } diff --git a/zsub/zgroup.go b/zsub/zgroup.go index 1a348f9..7e6e701 100644 --- a/zsub/zgroup.go +++ b/zsub/zgroup.go @@ -31,6 +31,8 @@ func (g *ZGroup) appendTo(c *ZConn) { atomic.AddInt32(&g.offset, 1) case <-c.stoped: return + case <-c.substoped[g.ztopic.topic]: + return } } }() diff --git a/zsub/zsub.go b/zsub/zsub.go index c34833c..eb2e362 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -27,19 +27,21 @@ type ZSub struct { type ZConn struct { //ZConn sync.Mutex - conn *net.Conn - groupid string - topics []string - timers []string // 订阅、定时调度分别创建各自连接 - stoped chan int // 关闭信号量 + conn *net.Conn + groupid string + topics []string + timers []string // 订阅、定时调度分别创建各自连接 + stoped chan int // 关闭信号量 + substoped map[string]chan int // 关闭信号量 } func NewZConn(conn *net.Conn) *ZConn { return &ZConn{ - conn: conn, - topics: []string{}, - timers: []string{}, - stoped: make(chan int, 0), + conn: conn, + topics: []string{}, + timers: []string{}, + stoped: make(chan int, 0), + substoped: make(map[string]chan int), } } @@ -74,6 +76,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} } //zgroup.conns = c.appendTo(zgroup.conns) + c.substoped[topic] = make(chan int, 0) zgroup.appendTo(c) for i, item := range c.topics { @@ -90,6 +93,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} s.Lock() defer s.Unlock() + close(c.substoped[topic]) ztopic := s.topics[topic] //ZTopic if ztopic == nil { return @@ -226,6 +230,11 @@ func ServerStart(addr string) { // 连接处理 func (s *ZSub) acceptHandler(c *ZConn) { + defer func() { + if r := recover(); r != nil { + log.Println("acceptHandler Recovered:", r) + } + }() defer func() { s.close(c) // close ZConn }()