From a2542696843934773700983d187601388775a175 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Mon, 18 Oct 2021 08:02:59 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Arpc=20=E6=97=A0?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=B3=A8=E5=86=8C=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@130 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- zsub/msg-consumer.go | 22 ++++++++++++++++++++++ zsub/zsub.go | 14 ++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 414cd1b..f955388 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -1,6 +1,7 @@ package zsub import ( + "encoding/json" "log" "strconv" "strings" @@ -61,6 +62,27 @@ func msgAccept(v Message) { case "groupid": c.groupid = rcmd[1] return + case "rpc": + // if rpc and no sub back error + if zsub.noSubscribe(rcmd[1]) { + rpcBody := make(map[string]string) + json.Unmarshal([]byte(rcmd[2]), &rpcBody) + log.Println("rpc no subscribe: ", rcmd[1]) + + ruk := rpcBody["ruk"] + zsub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}") + return + } + + if len(rcmd) != 3 { + c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") + } else { + if len(topicChan) < cap(topicChan) { + topicChan <- rcmd + } + zsub.Publish(rcmd[1], rcmd[2]) + } + return case "publish": if len(rcmd) != 3 { c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") diff --git a/zsub/zsub.go b/zsub/zsub.go index 5d83647..2b9f2dc 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -510,6 +510,20 @@ func (s *ZSub) Clearup() { } } +func (s *ZSub) noSubscribe(topic string) bool { + zTopic := s.topics[topic] + if zTopic == nil || len(zTopic.groups) == 0 { + return true + } + + for _, g := range zTopic.groups { + if len(g.conns) > 0 { + return false + } + } + return true +} + func ZSubx() *ZSub { return zsub }