diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 414cd1b..f955388 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -1,6 +1,7 @@ package zsub import ( + "encoding/json" "log" "strconv" "strings" @@ -61,6 +62,27 @@ func msgAccept(v Message) { case "groupid": c.groupid = rcmd[1] return + case "rpc": + // if rpc and no sub back error + if zsub.noSubscribe(rcmd[1]) { + rpcBody := make(map[string]string) + json.Unmarshal([]byte(rcmd[2]), &rpcBody) + log.Println("rpc no subscribe: ", rcmd[1]) + + ruk := rpcBody["ruk"] + zsub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") + return + } + + if len(rcmd) != 3 { + c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") + } else { + if len(topicChan) < cap(topicChan) { + topicChan <- rcmd + } + zsub.Publish(rcmd[1], rcmd[2]) + } + return case "publish": if len(rcmd) != 3 { c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") diff --git a/zsub/zsub.go b/zsub/zsub.go index 5d83647..2b9f2dc 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -510,6 +510,20 @@ func (s *ZSub) Clearup() { } } +func (s *ZSub) noSubscribe(topic string) bool { + zTopic := s.topics[topic] + if zTopic == nil || len(zTopic.groups) == 0 { + return true + } + + for _, g := range zTopic.groups { + if len(g.conns) > 0 { + return false + } + } + return true +} + func ZSubx() *ZSub { return zsub }