git-svn-id: svn://47.119.165.148/zhub@74 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-01-19 01:41:52 +00:00
parent 82155ca56e
commit 03a7118598
4 changed files with 12 additions and 15 deletions

View File

@@ -2,10 +2,8 @@ package zsub
import ( import (
"log" "log"
"net"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"zhub/conf" "zhub/conf"
) )
@@ -44,7 +42,7 @@ func msgAccept(v Message) {
if startWithAny(rcmd[0], arr...) { if startWithAny(rcmd[0], arr...) {
rcmd = strings.Split(rcmd[0], " ") rcmd = strings.Split(rcmd[0], " ")
} else { } else {
send(c.conn, "-Error: not supported:"+rcmd[0]) c.send("-Error: not supported:" + rcmd[0])
return return
} }
} }
@@ -65,7 +63,7 @@ func msgAccept(v Message) {
} }
case "publish": case "publish":
if len(rcmd) != 3 { if len(rcmd) != 3 {
send(c.conn, "-Error: publish para number!") c.send("-Error: publish para number!")
} else { } else {
zsub.publish(rcmd[1], rcmd[2]) zsub.publish(rcmd[1], rcmd[2])
} }
@@ -82,7 +80,7 @@ func msgAccept(v Message) {
zsub.reloadTimerConfig() zsub.reloadTimerConfig()
} }
default: default:
send(c.conn, "-Error: default not supported:["+strings.Join(rcmd, " ")+"]") c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
return return
} }
} }
@@ -90,13 +88,13 @@ func msgAccept(v Message) {
// daly topic value 100 -> publish topic value // daly topic value 100 -> publish topic value
func daly(rcmd []string, c *ZConn) { func daly(rcmd []string, c *ZConn) {
if len(rcmd) != 4 { if len(rcmd) != 4 {
send(c.conn, "-Error: subscribe para number!") c.send("-Error: subscribe para number!")
return return
} }
t, err := strconv.ParseInt(rcmd[3], 10, 64) t, err := strconv.ParseInt(rcmd[3], 10, 64)
if err != nil { if err != nil {
send(c.conn, "-Error: "+strings.Join(rcmd, " ")) c.send("-Error: " + strings.Join(rcmd, " "))
return return
} }
@@ -107,12 +105,10 @@ func daly(rcmd []string, c *ZConn) {
} }
} }
var wlock = sync.Mutex{}
// send message // send message
func send(conn *net.Conn, vs ...string) error { func (c *ZConn) send(vs ...string) error {
wlock.Lock() c.Lock()
defer wlock.Unlock() defer c.Unlock()
var bytes []byte var bytes []byte
@@ -126,6 +122,6 @@ func send(conn *net.Conn, vs ...string) error {
} }
bytes = []byte(data) bytes = []byte(data)
} }
_, err := (*conn).Write(bytes) _, err := (*c.conn).Write(bytes)
return err return err
} }

View File

@@ -21,7 +21,7 @@ func (g *ZGroup) init() {
if len(g.conns) == 0 { if len(g.conns) == 0 {
continue continue
} }
send(g.conns[0].conn, "message", g.ztopic.topic, msg) g.conns[0].send("message", g.ztopic.topic, msg)
g.offset++ g.offset++
} }
}() }()

View File

@@ -23,6 +23,7 @@ type ZSub struct {
} }
type ZConn struct { //ZConn type ZConn struct { //ZConn
sync.Mutex
conn *net.Conn conn *net.Conn
groupid string groupid string
topics []string topics []string

View File

@@ -54,7 +54,7 @@ func (s *ZSub) timer(rcmd []string, c *ZConn) {
var timerFun = func() { var timerFun = func() {
for _, conn := range timer.conns { for _, conn := range timer.conns {
err := send(conn.conn, "timer", timer.topic) err := conn.send("timer", timer.topic)
if timer.single && err == nil { if timer.single && err == nil {
break break
} }