diff --git a/zsub/zsub.go b/zsub/zsub.go index 9f7b400..77d2620 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "time" + "unicode/utf8" "zhub/conf" ) @@ -134,7 +135,7 @@ func (c *ZConn) send(vs ...string) error { } else if len(vs) > 1 { data := "*" + strconv.Itoa(len(vs)) + "\r\n" for _, v := range vs { - data += "$" + strconv.Itoa(len(v)) + "\r\n" + data += "$" + strconv.Itoa(utf8.RuneCountInString(v)) + "\r\n" data += v + "\r\n" } bytes = []byte(data) @@ -249,7 +250,11 @@ func (s *ZSub) acceptHandler(c *ZConn) { case "*": n, _ := strconv.Atoi(string(line[1:])) for i := 0; i < n; i++ { - reader.ReadLine() + line, _, _ := reader.ReadLine() + clen := 0 + if strings.EqualFold("$", string(line[:1])) { + clen, _ = strconv.Atoi(string(line[1:])) + } var vx = "" a: if v, prefix, _ := reader.ReadLine(); prefix { @@ -258,6 +263,11 @@ func (s *ZSub) acceptHandler(c *ZConn) { } else { vx += string(v) } + if clen > utf8.RuneCountInString(vx) { + vx += "\r\n" + goto a + } + rcmd = append(rcmd, vx) } default: diff --git a/zsub/ztimer.go b/zsub/ztimer.go index 8a6cf6b..fbc7c38 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -229,6 +229,9 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { } delay.timer.Reset(time.Duration(t) * time.Millisecond) } else { + if t < 0 { + return + } delay := &ZDelay{ topic: rcmd[1], value: rcmd[2],