修改:主题取消订阅无效 bug

git-svn-id: svn://47.119.165.148/zhub@98 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-02-03 08:01:57 +00:00
parent e532252ab5
commit ad3f7686ae
5 changed files with 32 additions and 13 deletions

View File

@@ -119,6 +119,11 @@ func (c *Client) Subscribe(topic string, fun func(v string)) {
} }
} }
func (c *Client) Unsubscribe(topic string) {
c.send("unsubscribe " + topic)
delete(c.subFun, topic)
}
/* /*
--- ---
ping ping

View File

@@ -1,7 +1,10 @@
SET GOOS=linux SET GOOS=linux
SET GOARCH=amd64 SET GOARCH=amd64
go build -o zhub.sh -ldflags "-s -w" ./main.go go build -o zhub.sh -ldflags "-s -w" ./app.go
upx -9 zhub.sh upx -9 zhub.sh
scp zhub.sh zhost: scp zhub.sh xhost:/opt/zhub
scp zhub.sh zhost:/opt/zhub
scp zhub.sh qhost:/opt/zhub
scp zhub.sh nhost:/opt/zhub
del zhub.sh del zhub.sh

View File

@@ -106,7 +106,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
if timer != nil { if timer != nil {
if t == -1 { if t == -1 {
timer.Stop() timer.Stop()
s.delays[rcmd[1]+"-"+rcmd[2]] = nil delete(s.delays, rcmd[1]+"-"+rcmd[2])
return return
} }
timer.Reset(time.Duration(t) * time.Millisecond) timer.Reset(time.Duration(t) * time.Millisecond)
@@ -117,7 +117,7 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
select { select {
case <-timer.C: case <-timer.C:
zsub.publish(rcmd[1], rcmd[2]) zsub.publish(rcmd[1], rcmd[2])
s.delays[rcmd[1]+"-"+rcmd[2]] = nil delete(s.delays, rcmd[1]+"-"+rcmd[2])
} }
}() }()
} }

View File

@@ -31,6 +31,8 @@ func (g *ZGroup) appendTo(c *ZConn) {
atomic.AddInt32(&g.offset, 1) atomic.AddInt32(&g.offset, 1)
case <-c.stoped: case <-c.stoped:
return return
case <-c.substoped[g.ztopic.topic]:
return
} }
} }
}() }()

View File

@@ -27,19 +27,21 @@ type ZSub struct {
type ZConn struct { //ZConn type ZConn struct { //ZConn
sync.Mutex sync.Mutex
conn *net.Conn conn *net.Conn
groupid string groupid string
topics []string topics []string
timers []string // 订阅、定时调度分别创建各自连接 timers []string // 订阅、定时调度分别创建各自连接
stoped chan int // 关闭信号量 stoped chan int // 关闭信号量
substoped map[string]chan int // 关闭信号量
} }
func NewZConn(conn *net.Conn) *ZConn { func NewZConn(conn *net.Conn) *ZConn {
return &ZConn{ return &ZConn{
conn: conn, conn: conn,
topics: []string{}, topics: []string{},
timers: []string{}, timers: []string{},
stoped: make(chan int, 0), stoped: make(chan int, 0),
substoped: make(map[string]chan int),
} }
} }
@@ -74,6 +76,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{}
} }
//zgroup.conns = c.appendTo(zgroup.conns) //zgroup.conns = c.appendTo(zgroup.conns)
c.substoped[topic] = make(chan int, 0)
zgroup.appendTo(c) zgroup.appendTo(c)
for i, item := range c.topics { for i, item := range c.topics {
@@ -90,6 +93,7 @@ func (s *ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{}
func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} func (s *ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{}
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
close(c.substoped[topic])
ztopic := s.topics[topic] //ZTopic ztopic := s.topics[topic] //ZTopic
if ztopic == nil { if ztopic == nil {
return return
@@ -226,6 +230,11 @@ func ServerStart(addr string) {
// 连接处理 // 连接处理
func (s *ZSub) acceptHandler(c *ZConn) { func (s *ZSub) acceptHandler(c *ZConn) {
defer func() {
if r := recover(); r != nil {
log.Println("acceptHandler Recovered:", r)
}
}()
defer func() { defer func() {
s.close(c) // close ZConn s.close(c) // close ZConn
}() }()