新增:zhub-cli.rpc 实现

git-svn-id: svn://47.119.165.148/zhub@141 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-11-17 06:11:58 +00:00
parent 7ca71eb838
commit 0ec4363aa8
2 changed files with 117 additions and 2 deletions

View File

@@ -2,6 +2,7 @@ package cmd
import ( import (
"bufio" "bufio"
"encoding/json"
"fmt" "fmt"
"github.com/go-basic/uuid" "github.com/go-basic/uuid"
"unicode/utf8" "unicode/utf8"
@@ -20,6 +21,7 @@ type Client struct {
wlock sync.Mutex // write lock wlock sync.Mutex // write lock
rlock sync.Mutex // read lock rlock sync.Mutex // read lock
appname string // local appname
addr string // host:port addr string // host:port
conn net.Conn // socket conn conn net.Conn // socket conn
createTime time.Time // client create time createTime time.Time // client create time
@@ -42,7 +44,7 @@ type Lock struct {
// duration int // lock duration // duration int // lock duration
} }
func Create(addr string, groupid string) (*Client, error) { func Create(appname string, addr string, groupid string) (*Client, error) {
conn, err := net.Dial("tcp", addr) conn, err := net.Dial("tcp", addr)
if err != nil { if err != nil {
return &Client{}, err return &Client{}, err
@@ -51,6 +53,7 @@ func Create(addr string, groupid string) (*Client, error) {
client := Client{ client := Client{
wlock: sync.Mutex{}, wlock: sync.Mutex{},
rlock: sync.Mutex{}, rlock: sync.Mutex{},
appname: appname,
addr: addr, addr: addr,
conn: conn, conn: conn,
groupid: groupid, groupid: groupid,
@@ -116,6 +119,7 @@ func (c *Client) init() {
} }
} }
}() }()
c.rpcInit()
go c.receive() go c.receive()
} }
@@ -232,6 +236,117 @@ func (c *Client) Unlock(l Lock) {
delete(c.lockFlag, l.Uuid) delete(c.lockFlag, l.Uuid)
} }
// -------------------------------------- rpc --------------------------------------
var rpcMap = make(map[string]*Rpc)
var rpcLock = sync.RWMutex{}
func (c *Client) rpcInit() {
// 添加 appname 主题订阅处理
c.Subscribe(c.appname, func(v string) {
log.Println("rpc back:", v)
rpcLock.Lock()
defer rpcLock.Unlock()
result := RpcResult{}
err := json.Unmarshal([]byte(v), &result)
if err != nil {
// 返回失败处理
log.Println("rpc result parse error:", err)
return
}
rpc := rpcMap[result.Ruk]
if rpc == nil {
return // 本地已无 rpc 请求等待,如:超时结束
}
rpc.RpcResult = result
close(rpc.Ch) // 发送事件
delete(rpcMap, result.Ruk)
})
}
type Rpc struct {
Ruk string `json:"ruk"`
Topic string `json:"topic"`
Value string `json:"value"`
Ch chan int `json:"-"` //请求返回标记
RpcResult RpcResult `json:"-"`
}
type RpcResult struct {
Ruk string `json:"ruk"`
Retcode int `json:"retcode"`
Retinfo string `json:"retinfo"`
Result string `json:"result"`
}
func (r Rpc) backTopic() string {
return strings.Split(r.Ruk, "::")[0]
}
func (r Rpc) Retcode() int {
return r.RpcResult.Retcode
}
func (r Rpc) Retinfo() string {
return r.RpcResult.Retinfo
}
func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
rpc := Rpc{
Ruk: c.appname + "::" + uuid.New(),
Topic: topic,
Value: message,
Ch: make(chan int, 0),
}
bytes, err := json.Marshal(&rpc)
if err != nil {
log.Println("rpc marshal error:", err)
}
log.Println("rpc call:", string(bytes[:]))
c.Publish(topic, string(bytes[:]))
rpcLock.Lock()
rpcMap[rpc.Ruk] = &rpc
rpcLock.Unlock()
select {
case <-rpc.Ch:
// ch 事件rpc 返回)
case <-time.After(time.Second * 15):
// rpc 超时
x, _ := json.Marshal(rpc)
log.Println("rpc timeout:", x)
rpc.RpcResult = RpcResult{
Retcode: 505,
Retinfo: "请求超时",
}
}
back(rpc.RpcResult)
}
// rpc subscribe
func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
c.Subscribe(topic, func(v string) {
rpc := Rpc{}
err := json.Unmarshal([]byte(v), &rpc)
if err != nil {
return
}
result := fun(rpc)
result.Ruk = rpc.Ruk
res, _ := json.Marshal(result)
c.Publish(rpc.backTopic(), string(res[:]))
})
}
// --------------------------------------------------------------------------------
/*func (c *Client) subscribes(topics ...string) error { /*func (c *Client) subscribes(topics ...string) error {
if len(topics) == 0 { if len(topics) == 0 {
return nil return nil

View File

@@ -29,7 +29,7 @@ func main() {
} }
if len(os.Args) == 3 && strings.EqualFold(os.Args[1], "-r") { if len(os.Args) == 3 && strings.EqualFold(os.Args[1], "-r") {
if cli, err := cmd.Create(addr, "group-admin"); err != nil { if cli, err := cmd.Create("zhub-local", addr, "group-admin"); err != nil {
log.Println(err) log.Println(err)
} else { } else {
switch os.Args[2] { switch os.Args[2] {