diff --git a/zsub/zsub.go b/zsub/zsub.go index e2e49a4..5d83647 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -2,6 +2,7 @@ package zsub import ( "bufio" + "fmt" "log" "net" "os" @@ -369,8 +370,15 @@ func (s *ZSub) Publish(topic, msg string) { if ztopic == nil { return } - ztopic.chMsg <- msg ztopic.mcount++ + + // topic chan overload check + if len(ztopic.chMsg) == cap(ztopic.chMsg) { + log.Println(fmt.Sprintf("ztopic no cap: [%s %s]", topic, msg)) + return + } + + ztopic.chMsg <- msg } /* diff --git a/zsub/ztopic.go b/zsub/ztopic.go index c53c748..9b41eb4 100644 --- a/zsub/ztopic.go +++ b/zsub/ztopic.go @@ -1,6 +1,10 @@ package zsub -import "sync" +import ( + "fmt" + "log" + "sync" +) type ZTopic struct { //ZTopic sync.Mutex @@ -19,7 +23,12 @@ func (t *ZTopic) init() { break } - for _, group := range t.groups { + for name, group := range t.groups { + // zgroup chan overload check + if len(group.chMsg) == cap(group.chMsg) { + log.Println(fmt.Sprintf("zgroup no cap: [%s.%s %s]", name, t.topic, msg)) + continue + } group.chMsg <- msg } }