修改:zhub_client
This commit is contained in:
135
cmd/client.go
135
cmd/client.go
@@ -18,7 +18,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
type ZHubClient struct {
|
||||
wlock sync.Mutex // write lock
|
||||
rlock sync.Mutex // read lock
|
||||
|
||||
@@ -46,37 +46,40 @@ type Lock struct {
|
||||
// duration int // lock duration
|
||||
}
|
||||
|
||||
func Create(appname, addr, groupid, auth string) (*Client, error) {
|
||||
conn, err := net.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
return &Client{}, err
|
||||
}
|
||||
|
||||
client := Client{
|
||||
wlock: sync.Mutex{},
|
||||
rlock: sync.Mutex{},
|
||||
appname: appname,
|
||||
addr: addr,
|
||||
conn: conn,
|
||||
groupid: groupid,
|
||||
createTime: time.Now(),
|
||||
|
||||
subFun: make(map[string]func(v string)),
|
||||
timerFun: make(map[string]func()),
|
||||
chSend: make(chan []string, 100),
|
||||
chReceive: make(chan []string, 100),
|
||||
timerReceive: make(chan []string, 100),
|
||||
lockFlag: make(map[string]*Lock),
|
||||
auth: auth,
|
||||
}
|
||||
|
||||
client.send("auth", auth)
|
||||
client.send("groupid " + groupid)
|
||||
client.init()
|
||||
return &client, err
|
||||
func (c *ZHubClient) Initx(appname, addr, groupid, auth string) error {
|
||||
c.appname = appname
|
||||
c.addr = addr
|
||||
c.groupid = groupid
|
||||
c.auth = auth
|
||||
return c.Init()
|
||||
}
|
||||
|
||||
func (c *Client) reconn() (err error) {
|
||||
// Init 创建一个客户端
|
||||
func (c *ZHubClient) Init( /*appname, addr, groupid, auth string*/ ) error {
|
||||
conn, err := net.Dial("tcp", c.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.conn = conn
|
||||
c.wlock = sync.Mutex{}
|
||||
c.rlock = sync.Mutex{}
|
||||
c.createTime = time.Now()
|
||||
|
||||
c.subFun = make(map[string]func(v string))
|
||||
c.timerFun = make(map[string]func())
|
||||
c.chSend = make(chan []string, 100)
|
||||
c.chReceive = make(chan []string, 100)
|
||||
c.timerReceive = make(chan []string, 100)
|
||||
c.lockFlag = make(map[string]*Lock)
|
||||
|
||||
c.send("auth", c.auth)
|
||||
c.send("groupid " + c.groupid)
|
||||
c.init()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *ZHubClient) reconn() (err error) {
|
||||
for n := 1; n < 10; n++ {
|
||||
conn, err := net.Dial("tcp", c.addr)
|
||||
if err != nil {
|
||||
@@ -102,7 +105,7 @@ func (c *Client) reconn() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Client) init() {
|
||||
func (c *ZHubClient) init() {
|
||||
// 消费 topic 消息
|
||||
go func() {
|
||||
for {
|
||||
@@ -130,7 +133,7 @@ func (c *Client) init() {
|
||||
}
|
||||
|
||||
// Subscribe subscribe topic
|
||||
func (c *Client) Subscribe(topic string, fun func(v string)) {
|
||||
func (c *ZHubClient) Subscribe(topic string, fun func(v string)) {
|
||||
c.send("subscribe " + topic)
|
||||
if fun != nil {
|
||||
c.wlock.Lock()
|
||||
@@ -139,7 +142,7 @@ func (c *Client) Subscribe(topic string, fun func(v string)) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Unsubscribe(topic string) {
|
||||
func (c *ZHubClient) Unsubscribe(topic string) {
|
||||
c.send("unsubscribe " + topic)
|
||||
delete(c.subFun, topic)
|
||||
}
|
||||
@@ -149,32 +152,32 @@ func (c *Client) Unsubscribe(topic string) {
|
||||
ping
|
||||
---
|
||||
*/
|
||||
func (c *Client) ping() {
|
||||
func (c *ZHubClient) ping() {
|
||||
c.send("ping")
|
||||
}
|
||||
|
||||
// Publish -------------------------------------- pub-sub --------------------------------------
|
||||
func (c *Client) Publish(topic string, message string) error {
|
||||
func (c *ZHubClient) Publish(topic string, message string) error {
|
||||
return c.send("publish", topic, message)
|
||||
}
|
||||
|
||||
func (c *Client) Broadcast(topic string, message string) error {
|
||||
func (c *ZHubClient) Broadcast(topic string, message string) error {
|
||||
return c.send("broadcast", topic, message)
|
||||
}
|
||||
|
||||
func (c *Client) Delay(topic string, message string, delay int) error {
|
||||
func (c *ZHubClient) Delay(topic string, message string, delay int) error {
|
||||
return c.send("delay", topic, message, strconv.Itoa(delay))
|
||||
}
|
||||
|
||||
/*
|
||||
Timer
|
||||
|
||||
func (c *Client) Timer(topic string, expr string, fun func()) {
|
||||
func (c *ZHubClient) Timer(topic string, expr string, fun func()) {
|
||||
c.timerFun[topic] = fun
|
||||
c.send("timer", topic, expr, "x")
|
||||
}
|
||||
*/
|
||||
func (c *Client) Timer(topic string, fun func()) {
|
||||
func (c *ZHubClient) Timer(topic string, fun func()) {
|
||||
if fun != nil {
|
||||
c.timerFun[topic] = fun
|
||||
}
|
||||
@@ -182,7 +185,7 @@ func (c *Client) Timer(topic string, fun func()) {
|
||||
}
|
||||
|
||||
// Cmd send cmd
|
||||
func (c *Client) Cmd(cmd ...string) {
|
||||
func (c *ZHubClient) Cmd(cmd ...string) {
|
||||
if len(cmd) == 1 {
|
||||
c.send("cmd", cmd[0])
|
||||
} else if len(cmd) > 1 {
|
||||
@@ -193,7 +196,7 @@ func (c *Client) Cmd(cmd ...string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Close() {
|
||||
func (c *ZHubClient) Close() {
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
@@ -207,7 +210,7 @@ func TryLock(key string, duration int) {
|
||||
}
|
||||
|
||||
// Lock Key
|
||||
func (c *Client) Lock(key string, duration int) Lock {
|
||||
func (c *ZHubClient) Lock(key string, duration int) Lock {
|
||||
uuid := uuid.New()
|
||||
c.send("uuid", key, uuid, strconv.Itoa(duration))
|
||||
|
||||
@@ -230,7 +233,7 @@ func (c *Client) Lock(key string, duration int) Lock {
|
||||
return Lock{Key: key, Value: uuid}
|
||||
}
|
||||
|
||||
func (c *Client) Unlock(l Lock) {
|
||||
func (c *ZHubClient) Unlock(l Lock) {
|
||||
c.send("unlock", l.Key, l.Value)
|
||||
delete(c.lockFlag, l.Value)
|
||||
}
|
||||
@@ -239,7 +242,7 @@ func (c *Client) Unlock(l Lock) {
|
||||
var rpcMap = make(map[string]*Rpc)
|
||||
var rpcLock = sync.RWMutex{}
|
||||
|
||||
func (c *Client) rpcInit() {
|
||||
func (c *ZHubClient) rpcInit() {
|
||||
|
||||
// 添加 appname 主题订阅处理
|
||||
c.Subscribe(c.appname, func(v string) {
|
||||
@@ -286,7 +289,7 @@ func (r Rpc) backTopic() string {
|
||||
return strings.Split(r.Ruk, "::")[0]
|
||||
}
|
||||
|
||||
func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
|
||||
func (c ZHubClient) Rpc(topic string, message string, back func(res RpcResult)) {
|
||||
rpc := Rpc{
|
||||
Ruk: c.appname + "::" + uuid.New(),
|
||||
Topic: topic,
|
||||
@@ -320,7 +323,7 @@ func (c Client) Rpc(topic string, message string, back func(res RpcResult)) {
|
||||
}
|
||||
|
||||
// RpcSubscribe rpc subscribe
|
||||
func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
|
||||
func (c ZHubClient) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
|
||||
c.Subscribe(topic, func(v string) {
|
||||
rpc := Rpc{}
|
||||
err := json.Unmarshal([]byte(v), &rpc)
|
||||
@@ -333,7 +336,7 @@ func (c Client) RpcSubscribe(topic string, fun func(Rpc Rpc) RpcResult) {
|
||||
result.Ruk = rpc.Ruk
|
||||
|
||||
res, _ := json.Marshal(result)
|
||||
c.Publish(rpc.backTopic(), string(res[:]))
|
||||
c.Publish(rpc.backTopic(), string(res))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -344,7 +347,7 @@ send socket message :
|
||||
if len(vs) equal 1 will send message `vs[0] + "\r\n"`
|
||||
else if len(vs) gt 1 will send message `* + len(vs)+ "\r\n" +"$"+ len(vs[n])+ "\r\n" + vs[n] + "\r\n" ...`
|
||||
*/
|
||||
func (c *Client) send(vs ...string) (err error) {
|
||||
func (c *ZHubClient) send(vs ...string) (err error) {
|
||||
//chSend <- vs
|
||||
c.wlock.Lock()
|
||||
defer c.wlock.Unlock()
|
||||
@@ -362,13 +365,19 @@ a:
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
time.Sleep(time.Second * 3)
|
||||
// check conn reconnect
|
||||
{
|
||||
c.wlock.Unlock()
|
||||
c.reconn()
|
||||
c.wlock.Lock()
|
||||
}
|
||||
goto a
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) receive() {
|
||||
func (c *ZHubClient) receive() {
|
||||
r := bufio.NewReader(c.conn)
|
||||
for {
|
||||
v, _, err := r.ReadLine()
|
||||
@@ -384,6 +393,7 @@ func (c *Client) receive() {
|
||||
if string(v) == "+ping" {
|
||||
c.send("+pong")
|
||||
}
|
||||
log.Println("receive:", string(v))
|
||||
case '-':
|
||||
log.Println("error:", string(v))
|
||||
case '*':
|
||||
@@ -395,7 +405,8 @@ func (c *Client) receive() {
|
||||
continue
|
||||
}
|
||||
var vs []string
|
||||
for i := 0; i < n; i++ {
|
||||
//for i := 0; i < n; i++ {
|
||||
for len(vs) < n {
|
||||
line, _, err := r.ReadLine()
|
||||
if err != nil || line == nil {
|
||||
continue
|
||||
@@ -415,16 +426,20 @@ func (c *Client) receive() {
|
||||
}
|
||||
vs = append(vs, string(buf))
|
||||
}
|
||||
if len(vs) == 3 && vs[0] == "message" && vs[1] == "lock" {
|
||||
go func() {
|
||||
log.Println("lock:" + vs[2])
|
||||
c.wlock.Lock()
|
||||
defer c.wlock.Unlock()
|
||||
if c.lockFlag[vs[2]] == nil {
|
||||
return
|
||||
}
|
||||
c.lockFlag[vs[2]].flagChan <- 0
|
||||
}()
|
||||
if len(vs) == 3 && vs[0] == "message" {
|
||||
if vs[1] == "lock" {
|
||||
go func() {
|
||||
log.Println("lock:" + vs[2])
|
||||
c.wlock.Lock()
|
||||
defer c.wlock.Unlock()
|
||||
if c.lockFlag[vs[2]] == nil {
|
||||
return
|
||||
}
|
||||
c.lockFlag[vs[2]].flagChan <- 0
|
||||
}()
|
||||
} else {
|
||||
c.chReceive <- vs
|
||||
}
|
||||
continue
|
||||
}
|
||||
if len(vs) == 2 && vs[0] == "timer" {
|
||||
|
||||
Reference in New Issue
Block a user