From 0ec4363aa8f0af11cbc3d0dfceccb2d54dc28b91 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Wed, 17 Nov 2021 06:11:58 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Azhub-cli.rpc=20?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@141 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cmd/client.go | 117 +++++++++++++++++++++++++++++++++++++++++++++++++- main.go | 2 +- 2 files changed, 117 insertions(+), 2 deletions(-) diff --git a/cmd/client.go b/cmd/client.go index 5923c99..0c692b8 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -2,6 +2,7 @@ package cmd import ( "bufio" + "encoding/json" "fmt" "github.com/go-basic/uuid" "unicode/utf8" @@ -20,6 +21,7 @@ type Client struct { wlock sync.Mutex // write lock rlock sync.Mutex // read lock + appname string // local appname addr string // host:port conn net.Conn // socket conn createTime time.Time // client create time @@ -42,7 +44,7 @@ type Lock struct { // duration int // lock duration } -func Create(addr string, groupid string) (*Client, error) { +func Create(appname string, addr string, groupid string) (*Client, error) { conn, err := net.Dial("tcp", addr) if err != nil { return &Client{}, err @@ -51,6 +53,7 @@ func Create(addr string, groupid string) (*Client, error) { client := Client{ wlock: sync.Mutex{}, rlock: sync.Mutex{}, + appname: appname, addr: addr, conn: conn, groupid: groupid, @@ -116,6 +119,7 @@ func (c *Client) init() { } } }() + c.rpcInit() go c.receive() } @@ -232,6 +236,117 @@ func (c *Client) Unlock(l Lock) { delete(c.lockFlag, l.Uuid) } +// -------------------------------------- rpc -------------------------------------- +var rpcMap = make(map[string]*Rpc) +var rpcLock = sync.RWMutex{} + +func (c *Client) rpcInit() { + + // 添加 appname 主题订阅处理 + c.Subscribe(c.appname, func(v string) { + log.Println("rpc back:", v) + rpcLock.Lock() + defer rpcLock.Unlock() + + result := RpcResult{} + err := json.Unmarshal([]byte(v), &result) + if err != nil { + // 返回失败处理 + log.Println("rpc result parse error:", err) + return + } + + rpc := rpcMap[result.Ruk] + if rpc == nil { + return // 本地已无 rpc 请求等待,如:超时结束 + } + + rpc.RpcResult = result + close(rpc.Ch) // 发送事件 + delete(rpcMap, result.Ruk) + }) +} + +type Rpc struct { + Ruk string `json:"ruk"` + Topic string `json:"topic"` + Value string `json:"value"` + + Ch chan int `json:"-"` //请求返回标记 + RpcResult RpcResult `json:"-"` +} + +type RpcResult struct { + Ruk string `json:"ruk"` + Retcode int `json:"retcode"` + Retinfo string `json:"retinfo"` + Result string `json:"result"` +} + +func (r Rpc) backTopic() string { + return strings.Split(r.Ruk, "::")[0] +} + +func (r Rpc) Retcode() int { + return r.RpcResult.Retcode +} +func (r Rpc) Retinfo() string { + return r.RpcResult.Retinfo +} + +func (c Client) Rpc(topic string, message string, back func(res RpcResult)) { + rpc := Rpc{ + Ruk: c.appname + "::" + uuid.New(), + Topic: topic, + Value: message, + Ch: make(chan int, 0), + } + bytes, err := json.Marshal(&rpc) + if err != nil { + log.Println("rpc marshal error:", err) + } + log.Println("rpc call:", string(bytes[:])) + c.Publish(topic, string(bytes[:])) + + rpcLock.Lock() + rpcMap[rpc.Ruk] = &rpc + rpcLock.Unlock() + + select { + case <-rpc.Ch: + // ch 事件(rpc 返回) + case <-time.After(time.Second * 15): + // rpc 超时 + x, _ := json.Marshal(rpc) + log.Println("rpc timeout:", x) + rpc.RpcResult = RpcResult{ + Retcode: 505, + Retinfo: "请求超时", + } + } + back(rpc.RpcResult) +} + +// rpc subscribe +func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) { + c.Subscribe(topic, func(v string) { + rpc := Rpc{} + err := json.Unmarshal([]byte(v), &rpc) + if err != nil { + + return + } + + result := fun(rpc) + result.Ruk = rpc.Ruk + + res, _ := json.Marshal(result) + c.Publish(rpc.backTopic(), string(res[:])) + }) +} + +// -------------------------------------------------------------------------------- + /*func (c *Client) subscribes(topics ...string) error { if len(topics) == 0 { return nil diff --git a/main.go b/main.go index b8893b0..c55ff5c 100644 --- a/main.go +++ b/main.go @@ -29,7 +29,7 @@ func main() { } if len(os.Args) == 3 && strings.EqualFold(os.Args[1], "-r") { - if cli, err := cmd.Create(addr, "group-admin"); err != nil { + if cli, err := cmd.Create("zhub-local", addr, "group-admin"); err != nil { log.Println(err) } else { switch os.Args[2] {