commit c0e9fa0c6bf19599ed9074abd2f58b8cb398f4dc Author: lxy <237809796@qq.com> Date: Fri Jan 8 08:19:58 2021 +0000 . git-svn-id: svn://47.119.165.148/zhub@58 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 diff --git a/cli/zdb-client.go b/cli/zdb-client.go new file mode 100644 index 0000000..0cac8d2 --- /dev/null +++ b/cli/zdb-client.go @@ -0,0 +1,385 @@ +package cli + +import ( + "bufio" + "fmt" + "log" + "net" + "os" + "strconv" + "strings" + "sync" + "time" +) + +var ( + reconnect = 0 + subEvent = make(map[string]func(v string)) + chReceive = make(chan []string, 1000) + chSend = make(chan []string, 1000) + timerReceive = make(chan []string, 1000) +) + +type Client struct { + wlock sync.Mutex // 写锁 + rlock sync.Mutex // 读锁 + addr string // host:port + conn net.Conn // socket 连接对象 + createTime time.Time // 创建时间 +} + +func Create(addr string, groupid string) (*Client, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return &Client{}, err + } + + client := Client{ + wlock: sync.Mutex{}, + rlock: sync.Mutex{}, + addr: addr, + conn: conn, + createTime: time.Now(), + } + + conn.Write([]byte("groupid " + groupid + "\r\n")) + client.init() + return &client, err +} + +func (c *Client) reconn() (err error) { + for n := 1; n < 10; n++ { + conn, err := net.Dial("tcp", c.addr) + if err != nil { + log.Println("reconn", err) + time.Sleep(time.Second * 3) + continue + } else if err == nil { + c.conn = conn + go c.receive() + for topic, _ := range subEvent { + c.subscribes(topic) + } + break + } + } + return +} + +func (c *Client) init() { + // 消费 topic 消息 + go func() { + for { + select { + case vs := <-chReceive: + fun := subEvent[vs[1]] + if fun == nil { + log.Println("topic received, nothing to do", vs[1], vs[2]) + continue + } + fun(vs[2]) + case vs := <-timerReceive: + log.Println("收到 timer 消息 ", vs[1]) + } + + } + + /*for { + vs, ok := <-chReceive + if !ok { + break + } + + fun := subEvent[vs[1]] + if fun == nil { + log.Println("topic received, nothing to do", vs[1], vs[2]) + continue + } + fun(vs[2]) + }*/ + }() + + go c.receive() +} + +func (c *Client) Subscribe(topic string, fun func(v string)) { + subEvent[topic] = fun + c.subscribes(topic) +} + +/* +--- +ping +--- +*/ +func (c *Client) ping() { + c.send("ping") +} + +// -------------------------------------- pub-sub -------------------------------------- +/* +发送 主题消息 +--- +*3 +$7 +message +$8 +my-topic +$24 +{username:xx,mobile:xxx} +--- +*/ +func (c *Client) Publish(topic string, message string) error { + c.send("publish", topic, message) + return nil +} + +func (c *Client) Daly(topic string, message string, daly int) error { + c.send("daly", topic, message, strconv.Itoa(daly)) + return nil +} + +func (c *Client) Timer(topic string, expr string) { + c.send("timer", topic, expr) +} + +/* +// 订阅主题消息 +--- +subscribe x y z +--- +*/ +func (c *Client) subscribes(topics ...string) error { + if len(topics) == 0 { + return nil + } + + messages := "subscribe" + for _, topic := range topics { + messages += " " + topic + } + c.send(messages) + return nil +} + +// 发送 socket 消息 +func (c *Client) send(vs ...string) (err error) { + //chSend <- vs + c.wlock.Lock() + defer c.wlock.Unlock() +a: + if len(vs) == 1 { + _, err = c.conn.Write([]byte(vs[0] + "\r\n")) + } else if len(vs) > 1 { + data := "*" + strconv.Itoa(len(vs)) + "\r\n" + for _, v := range vs { + data += "$" + strconv.Itoa(len(v)) + "\r\n" + data += v + "\r\n" + } + _, err = c.conn.Write([]byte(data)) + } + if err != nil { + log.Println(err) + time.Sleep(time.Second * 3) + goto a + } + + return err +} + +func (c *Client) receive() { + c.rlock.Lock() + defer c.rlock.Unlock() + + r := bufio.NewReader(c.conn) + for { + v, _, err := r.ReadLine() + if err != nil { + log.Println("receive error and reconn: ", err) + if err = c.reconn(); err == nil { + r = bufio.NewReader(c.conn) + } else { + + } + time.Sleep(time.Second * 3) + continue + } else if len(v) == 0 { + log.Println("receive empty") + continue + } + + switch string(v[0:1]) { + case "*": // 订阅消息 + // 数据行数 + vlen, err := strconv.Atoi(string(v[1:])) + if err != nil { + log.Println("receive parse len error: ", err, string(v)) + continue + } + + // 读取完整数据 + vs := make([]string, 0) + for i := 0; i < vlen; i++ { + r.ReadLine() // $x + v, _, err = r.ReadLine() + if err != nil { + log.Println("receive parse v error: ", err) + } + vs = append(vs, string(v)) + } + + if len(vs) == 3 && strings.EqualFold(vs[0], "message") { + chReceive <- vs + continue + } + if len(vs) == 2 && strings.EqualFold(vs[0], "timer") { + timerReceive <- vs + continue + } + + continue + case "+": // +pong, +xxx + case "-": + fmt.Println("error:", string(v)) + case ":": + + } + } + +} + +// -------------------------------------- k-v -------------------------------------- +/* +*3 +$3 +set +$n +x +$m +xx +*/ +func (c *Client) set(key string, value interface{}) error { + + return nil +} + +/* + + */ +func (c *Client) get(key string) string { + + return "" +} + +// -------------------------------------- hm -------------------------------------- + +// ============================================================================== + +// client 命令行程序 +func ClientRun(host string, port int) { + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + + for { + if err != nil { + log.Println(err) + time.Sleep(time.Second * 3) + conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + continue + } + + fmt.Println(fmt.Sprintf("had connected server: %s:%d", host, port)) + break + } + + defer func() { + if reconnect == 1 { + conn.Close() + ClientRun(host, port) + } + }() + + go clientRead(conn) + + for { + inReader := bufio.NewReader(os.Stdin) + line, err := inReader.ReadString('\n') + if err != nil { + fmt.Println(err) + return + } else if reconnect == 1 { + return + } + + line = strings.Trim(line, "\r\n") + line = strings.Trim(line, "\n") + line = strings.Trim(line, " ") + + if strings.EqualFold(line, "") { + continue + } else if strings.EqualFold(line, ":exit") { + fmt.Println("exit!") + return + } + + //fmt.Println("发送数据:" + line) + + line = strings.ReplaceAll(line, " ", "") + parr := strings.Split(line, " ") + conn.Write([]byte("*" + strconv.Itoa(len(parr)) + "\r\n")) + for i := range parr { + conn.Write([]byte("$" + strconv.Itoa(len(parr[i])) + "\r\n")) + conn.Write([]byte(parr[i] + "\r\n")) + } + } +} + +func clientRead(conn net.Conn) { + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovered:", r) + } + reconnect = 1 + }() + + reader := bufio.NewReader(conn) + for { + rcmd := make([]string, 0) + line, _, err := reader.ReadLine() + if err != nil { + log.Println("connection error: ", err) + return + } else if len(line) == 0 { + continue + } + + switch string(line[:1]) { + case "*": + n, _ := strconv.Atoi(string(line[1:])) + for i := 0; i < n; i++ { + reader.ReadLine() + v, _, _ := reader.ReadLine() + rcmd = append(rcmd, string(v)) + } + case "+": + rcmd = append(rcmd, string(line)) + case "-": + rcmd = append(rcmd, string(line)) + case ":": + rcmd = append(rcmd, string(line)) + case "h": + if strings.EqualFold(string(line), "help-start") { + for { + v, _, _ := reader.ReadLine() + if strings.EqualFold(string(v), "help-end") { + break + } + rcmd = append(rcmd, string(v)+"\r\n") + } + } + default: + rcmd = append(rcmd, string(line)) + } + + fmt.Println(">", strings.Join(rcmd, " ")) + } +} diff --git a/cli/zdb-client_test.go b/cli/zdb-client_test.go new file mode 100644 index 0000000..aea9340 --- /dev/null +++ b/cli/zdb-client_test.go @@ -0,0 +1,12 @@ +package cli + +import ( + "testing" + +) + +func TestClient(t *testing.T) { + + + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..04a8cdb --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module zhub + +go 1.15 + +require github.com/robfig/cron v1.2.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b5b4795 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= diff --git a/main.go b/main.go new file mode 100644 index 0000000..59f2222 --- /dev/null +++ b/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "os" + "strconv" + "strings" + "zhub/cli" + "zhub/zdb" +) + +func main() { + server := true + host := "127.0.0.1" + port := 1216 + + for _, arg := range os.Args[1:] { + if strings.EqualFold(arg, "cli") { + server = false + } else if strings.Index(arg, "-h=") == 0 { + host = arg[3:] + } else if strings.Index(arg, "-p=") == 0 { + p, err := strconv.Atoi(arg[3:]) + if err != nil { + fmt.Println("-Error para: -p=[number]") + os.Exit(0) + } + port = p + } + } + + if server { + zdb.ServerStart(host, port) + } else { + cli.ClientRun(host, port) + } + +} diff --git a/pkg.bat b/pkg.bat new file mode 100644 index 0000000..ebe4992 --- /dev/null +++ b/pkg.bat @@ -0,0 +1,4 @@ +SET GOOS=linux +SET GOARCH=amd64 +go build -o zdb.sh -ldflags "-s -w" ./main.go +upx -9 zdb.sh diff --git a/upx.exe b/upx.exe new file mode 100644 index 0000000..436082b Binary files /dev/null and b/upx.exe differ diff --git a/x_test.go b/x_test.go new file mode 100644 index 0000000..2616cc8 --- /dev/null +++ b/x_test.go @@ -0,0 +1,135 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "strconv" + "strings" + "sync" + "testing" + "time" + "zhub/cli" +) + +type LogInfo2 struct { + RemoteAddr string `json:"remote_addr"` // IP + Time string `json:"time"` // 请求时间 + Status string `json:"status"` // 请求状态 + BodyBytesSent string `json:"body_bytes_sent"` // 返回内容字节 + Host string `json:"host"` // 请求域名 + HttpUserAgent string `json:"http_user_agent"` // 客户端信息 + CostTime string `json:"upstream_response_time"` // 耗时 + + Request string `json:"request"` // + HttpMethod string `json:"http_method"` // 请求类型 + Uri string `json:"uri"` // uri + ProtocolVersion string `json:"protocol_version"` // 请求 + RequestTime string `json:"request_time"` // 时间戳 + HttpCookie string `json:"http_cookie"` +} + +func TestName(t *testing.T) { + //client, err := cli.Create("39.108.56.246:1216", "") + client, err := cli.Create("127.0.0.1:1216", "") + if err != nil { + log.Fatal(err) + } + mutex := sync.Mutex{} + n := 0 + //client.Init() + client.Subscribe("pro-nginx-log", func(v string) { + mutex.Lock() + defer mutex.Unlock() + + if strings.Index(v, "api-oss.woaihaoyouxi.com") > -1 { + return + } + if strings.Index(v, "kibana.woaihaoyouxi.com") > -1 { + return + } + + /*if strings.Index(v, "ef737b680be2cf7868cca99101fa7e66") == -1 { + return + }*/ + + n++ + info, err := logParse(v) + if err != nil { + log.Println("json parse error", v) + return + } + //fmt.Println(strconv.Itoa(n), "接收到主题 pro-nginx-log 消息", v) + t, err := strconv.ParseInt(info.RequestTime, 10, 0) + fmt.Println(strconv.Itoa(n), time.Unix(t, 0).Format("2006-01-02 15:04:05"), info.Status, info.CostTime, info.Uri, info.HttpCookie) + }) + + client.Subscribe("a-1", func(v string) { + log.Println(v) + }) + + client.Timer("t", "*/3 * * * * *") + + go func() { + for i := 0; i < 50000; i++ { + client.Publish("a-1", strconv.Itoa(i)) + time.Sleep(time.Second) + } + }() + + //log.Println("send") + //client.Daly("x", "abx", 1000 * 10) + time.Sleep(time.Hour * 3) +} + +func TestX(t *testing.T) { + strs := [...]string{"1", "2", "3", "4"} + + strss := strs[0:2] + + fmt.Println(strss) +} + +func logParse(str string) (LogInfo2, error) { + defer func() { + if r := recover(); r != nil { + log.Println("nginx.logParse error:", r, str) + } + }() + + var info LogInfo2 + err := json.Unmarshal([]byte(str), &info) + if err != nil { + log.Println("111", err, str) + return LogInfo2{}, err + } + + if !strings.EqualFold(info.Request, "") { + arr := strings.Split(info.Request, " ") + if len(arr) == 3 { + info.HttpMethod = arr[0] + info.Uri = arr[1] + info.ProtocolVersion = arr[2] + } + } + if !strings.EqualFold(info.Request, "") { + arr := strings.Split(info.Request, " ") + if len(arr) == 3 { + info.HttpMethod = arr[0] + info.Uri = arr[1] + info.ProtocolVersion = arr[2] + } + } + + if !strings.EqualFold(info.Time, "") { + t, err := time.Parse(time.RFC3339, info.Time) + if err != nil { + log.Println("127", err, str) + return LogInfo2{}, err + } else { + info.RequestTime = strconv.FormatInt(t.Unix(), 10) + } + } + + return info, nil +} diff --git a/zdb/rcmd-exec.go b/zdb/rcmd-exec.go new file mode 100644 index 0000000..58b9e3c --- /dev/null +++ b/zdb/rcmd-exec.go @@ -0,0 +1,260 @@ +package zdb + +import ( + "fmt" + "github.com/robfig/cron" + "log" + "net" + "strconv" + "strings" + "sync" + "time" +) + +func execCmd(rcmd []string, conn net.Conn) { + defer func() { + if r := recover(); r != nil { + log.Println("execCmd Recovered:", r) + } + }() + if len(rcmd) == 0 { + return + } + + log.Println("rcmd: " + strings.Join(rcmd, " ")) + + if len(rcmd) == 1 { + switch strings.ToLower(rcmd[0]) { + case "help": + conn.Write([]byte("help-start\r\n")) + conn.Write(retHelp) + conn.Write([]byte("help-end\r\n")) + return + default: + // subscribe|unsubscribe|daly + if strings.Index(rcmd[0], "subscribe") == 0 || strings.Index(rcmd[0], "unsubscribe") == 0 || strings.Index(rcmd[0], "daly") == 0 { + rcmd = strings.Split(rcmd[0], " ") + } else { + conn.Write([]byte("-Error: not supported! (tips: send help)\r\n")) + return + } + } + } + + cmd := rcmd[0] + switch cmd { + case "decr": + decr(rcmd, conn) + case "incr": + incr(rcmd, conn) + case "get": + get(rcmd, conn) + case "set": + set(rcmd, conn) + case "subscribe": + subscribe(rcmd, conn) + case "unsubscribe": + unsubscribe(rcmd, conn) + case "publish": + publish(rcmd, conn) + case "daly": + daly(rcmd, conn) + case "timer": + timer(rcmd, conn) + default: + conn.Write([]byte("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]\r\n")) + return + } +} + +func timer(rcmd []string, conn net.Conn) { + ztimer := zTimer[rcmd[1]] + if ztimer == nil { + ztimer = &ZTimer{ + conns: []*net.Conn{}, + topic: rcmd[1], + } + zTimer[rcmd[1]] = ztimer + } + + _conns := make([]*net.Conn, 0) + for _, c := range ztimer.conns { + if *&conn == *c { + continue + } + _conns = append(_conns, c) + } + _conns = append(_conns, &conn) + ztimer.conns = _conns + + if !strings.EqualFold(ztimer.expr, rcmd[2]) { + ztimer.expr = rcmd[2] + if ztimer.cron != nil { + ztimer.cron.Stop() + } + ztimer.cron = func() *cron.Cron { + c := cron.New() + c.AddFunc(ztimer.expr, func() { + fmt.Println(time.Now().Second()) + for _, conn := range ztimer.conns { + send(*conn, "timer", ztimer.topic) + } + }) + go c.Run() + return c + }() + } + + zTimer[ztimer.topic] = ztimer + fmt.Println("xx") +} + +// daly topic valye 100 +func daly(rcmd []string, conn net.Conn) { + if len(rcmd) != 4 { + conn.Write([]byte("-Error: subscribe para number!\r\n")) + return + } + + t, err := strconv.ParseInt(rcmd[3], 10, 64) + if err != nil { + conn.Write([]byte("-Error: " + strings.Join(rcmd, " ") + "\r\n")) + return + } + + timer := time.NewTimer(time.Duration(t) * time.Millisecond) + select { + case <-timer.C: + // daly => publish + publish(rcmd[0:3], conn) + } +} + +func decr(rcmd []string, conn net.Conn) { + k := rcmd[1] + v := zkv[k] + if strings.EqualFold(v, "") { + v = "0" + } + _v, err := strconv.Atoi(v) + if err != nil { + conn.Write([]byte("-Error: " + err.Error() + "\r\n")) + } + + v = strconv.Itoa(_v - 1) + zkv[k] = v + conn.Write([]byte(v + "\r\n")) +} + +func incr(rcmd []string, conn net.Conn) { + k := rcmd[1] + v := zkv[k] + if strings.EqualFold(v, "") { + v = "0" + } + _v, err := strconv.Atoi(v) + if err != nil { + conn.Write([]byte("- Error: " + err.Error() + "\r\n")) + } + + v = strconv.Itoa(_v + 1) + zkv[k] = v + conn.Write([]byte(v + "\r\n")) +} + +func get(rcmd []string, conn net.Conn) { + k := rcmd[1] + v := zkv[k] + conn.Write([]byte(v + "\r\n")) +} + +func set(rcmd []string, conn net.Conn) { + if len(rcmd) != 3 { + conn.Write([]byte("-Error: set para number!\r\n")) + return + } + zkv[rcmd[1]] = rcmd[2] + conn.Write([]byte("+OK\r\n")) +} + +func subscribe(rcmd []string, conn net.Conn) { + if len(rcmd) < 2 { + conn.Write([]byte("-Error: subscribe para number!\r\n")) + return + } + + for _, topic := range rcmd[1:] { + conns := zsub[topic] + if conns == nil { + conns = make([]*ConnContext, 0) + } + + zsub[topic] = append(conns, &ConnContext{conn: &conn}) + } +} +func unsubscribe(rcmd []string, conn net.Conn) { + if len(rcmd) < 2 { + conn.Write([]byte("-Error: unsubscribe para number!")) + return + } + + for _, topic := range rcmd[1:] { + conns := zsub[topic] + if conns == nil || len(conns) == 0 { + return + } + _conns := make([]*ConnContext, 0) + for _, c := range conns { + if *c.conn == *&conn { + continue + } + _conns = append(_conns, c) + } + zsub[topic] = _conns + } +} +func publish(rcmd []string, conn net.Conn) { + if len(rcmd) < 3 { + conn.Write([]byte("-Error: publish para number!\r\n")) + return + } + + topic := rcmd[1] + v := rcmd[2] + + subs := zsub[topic] + if subs == nil || len(subs) == 0 { + return + } + + msgs := []string{"message", topic, v} + for _, c := range subs { + send(*c.conn, msgs...) + /*_conn.Write([]byte("*3\r\n")) + for _, msg := range msgs { + _conn.Write([]byte("$" + strconv.Itoa(len(msg)) + "\r\n")) + _conn.Write([]byte(msg + "\r\n")) + }*/ + } +} + +var wlock = sync.Mutex{} + +func send(conn net.Conn, vs ...string) (err error) { + //chSend <- vs + wlock.Lock() + defer wlock.Unlock() + + if len(vs) == 1 { + _, err = conn.Write([]byte(vs[0] + "\r\n")) + } else if len(vs) > 1 { + data := "*" + strconv.Itoa(len(vs)) + "\r\n" + for _, v := range vs { + data += "$" + strconv.Itoa(len(v)) + "\r\n" + data += v + "\r\n" + } + _, err = conn.Write([]byte(data)) + } + + return err +} diff --git a/zdb/zdb-server.go b/zdb/zdb-server.go new file mode 100644 index 0000000..8b25aef --- /dev/null +++ b/zdb/zdb-server.go @@ -0,0 +1,146 @@ +package zdb + +import ( + "bufio" + "fmt" + "github.com/robfig/cron" + "log" + "net" + "strconv" + "time" +) + +// 消息命令处理 chan +var ( + chmsg = make(chan Message, 10000) + zkv = make(map[string]string) + zsub = make(map[string][]*ConnContext) // topic -- connx[] + retOk = []byte("+OK") + zTimer = make(map[string]*ZTimer) + retHelp = []byte( + "\n--- zdb help ---\n" + + "______ _____ _____ \n|___ / | _ \\ | _ \\ \n / / | | | | | |_| | \n / / | | | | | _ { \n / /__ | |_| | | |_| | \n/_____| |_____/ |_____/ \n" + + "had supported command:\n" + + "1. set:\n" + + " eg: set a 1\n" + + "2. get:\n" + + " eg: get a\n" + + "3. subscribe:\n" + + " eg: subscribe x y z\n" + + "4. unsubscribe:\n" + + " eg: unsubscribe x1 y1 z1\n" + + "5. publish:\n" + + " eg: publish x 123\n" + + "6. incr:\n" + + " eg: incr a\n" + + "7. decr:\n" + + " eg: decr a\n" + + "--- zdb help ---\n") +) + +// 数据封装 +type Message struct { + Conn *net.Conn + Rcmd []string +} +type ConnContext struct { + conn *net.Conn + groupId string + createTime time.Time +} + +type ZTimer struct { + conns []*net.Conn + expr string + topic string + cron *cron.Cron +} + +// ====================================================================== + +// zdb 服务启动 +func ServerStart(host string, port int) { + listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + log.Fatal(err) + return + } + log.Printf("zdb started listen on: %s:%d \n", host, port) + + // 启动消息监听处理 + go func() { + for { + v, ok := <-chmsg + if !ok { + break + } + + execCmd(v.Rcmd, *&*v.Conn) + } + }() + + for { + conn, err := listen.Accept() + if err != nil { + log.Println(err) + continue + } + fmt.Println("conn start: ", conn.RemoteAddr()) + + go connHandler(conn) + } +} + +// 连接处理 +func connHandler(conn net.Conn) { + defer func() { + for topic, connx := range zsub { + _conns := make([]*ConnContext, 0) + for t := range connx { + if *connx[t].conn == *&conn { + continue + } + _conns = append(_conns, connx[t]) + } + zsub[topic] = _conns + } + conn.Close() + if r := recover(); r != nil { + log.Println("connHandler Recovered:", r) + } + + fmt.Println("conn end: ", conn.RemoteAddr()) + }() + + reader := bufio.NewReader(conn) + + for { + rcmd := make([]string, 0) + line, _, err := reader.ReadLine() + // fmt.Println("line:", string(line)) todo 可使用第一行用于协议头 + if err != nil { + log.Println(err) + return + } + if len(line) == 0 { + continue + } + switch string(line[:1]) { + case "*": + n, _ := strconv.Atoi(string(line[1:])) + for i := 0; i < n; i++ { + reader.ReadLine() + v, _, _ := reader.ReadLine() + rcmd = append(rcmd, string(v)) + } + default: + rcmd = append(rcmd, string(line)) + } + + if len(rcmd) == 0 { + continue + } + + chmsg <- Message{Conn: &conn, Rcmd: rcmd} + } +} diff --git a/zdb/zdb_test.go b/zdb/zdb_test.go new file mode 100644 index 0000000..7681dc3 --- /dev/null +++ b/zdb/zdb_test.go @@ -0,0 +1,7 @@ +package zdb + +import "testing" + +func TestName(t *testing.T) { + ServerStart("127.0.0.1", 1216) +} diff --git a/zsub/zgroup.go b/zsub/zgroup.go new file mode 100644 index 0000000..037e9d3 --- /dev/null +++ b/zsub/zgroup.go @@ -0,0 +1,33 @@ +package zsub + +import "sync" + +type ZGroup struct { //ZGroup + sync.Mutex + conns []*ZConn + offset int + chMsg chan string // 组消息即时投递 +} + +func createZGroup(c *ZConn) *ZGroup { + zgroup := &ZGroup{ + conns: []*ZConn{}, + chMsg: make(chan string, 100), + } + + // 开启消息推送 + go func() { + for { + msg, ok := <-zgroup.chMsg + if !ok { + break + } + + for _, c := range zgroup.conns { + (*c.conn).Write([]byte(msg)) + zgroup.offset++ + } + } + }() + return zgroup +} diff --git a/zsub/zsub.go b/zsub/zsub.go new file mode 100644 index 0000000..e8089d6 --- /dev/null +++ b/zsub/zsub.go @@ -0,0 +1,95 @@ +package zsub + +import ( + "net" + "sync" +) + +var ( + zsub ZSub +) + +type ZSub struct { + sync.Mutex + topics map[string]*ZTopic +} + + +type ZConn struct { //ZConn + conn *net.Conn + groupid string + topics []string +} + +/* +新增订阅: +1、找到对应主题信息 +2、加入到对应组别;如果是第一次的消费组 offset从当前 mcount 开始 +3、若有待消费消息启动消费 +*/ +func (s ZSub) subscribe(c *ZConn, topic string) { // 新增订阅 zconn{} + ztopic := s.topics[topic] //ZTopic + if ztopic == nil { + ztopic = &ZTopic{groups: map[string]*ZGroup{}} + s.topics[topic] = ztopic + } + + zgroup := ztopic.groups[c.groupid] //ZGroup + if zgroup == nil { + zgroup = &ZGroup{conns: []*ZConn{}} + ztopic.groups[c.groupid] = zgroup + } + + _conns := make([]*ZConn, 0) + for _, conn := range zgroup.conns { + if conn == c { + continue + } + _conns = append(_conns, conn) + } + _conns = append(_conns, c) + zgroup.conns = _conns +} + +/* +取消订阅: +*/ +func (s ZSub) unsubscribe(c *ZConn, topic string) { // 取消订阅 zconn{} + ztopic := s.topics[topic] //ZTopic + if ztopic == nil { + return + } + + zgroup := ztopic.groups[c.groupid] //ZGroup + if zgroup == nil { + return + } + + _conns := make([]*ZConn, 0) + for _, conn := range zgroup.conns { + if conn == c { + continue + } + _conns = append(_conns, c) + } + zgroup.conns = _conns +} + +/* +发送主题消息 +1、写入主题消息列表(zdb) +2、回复消息写入成功 +3、推送主题消息 +*/ +func (s ZSub) publish(topic string, message string) { + s.Lock() + defer s.Unlock() + ztopic := s.topics[topic] //ZTopic + if ztopic == nil { + return + } + + for _, zgroup := range ztopic.groups { + zgroup.chMsg <- message + } +} diff --git a/zsub/zsub_test.go b/zsub/zsub_test.go new file mode 100644 index 0000000..bcdad18 --- /dev/null +++ b/zsub/zsub_test.go @@ -0,0 +1,36 @@ +package zsub + +import ( + "fmt" + "testing" +) + +func TestName(t *testing.T) { + sub := ZSub{ + topics: map[string]*ZTopic{}, + } + + sub.subscribe(&ZConn{ + groupid: "a", + }, "ab") + + sub.subscribe(&ZConn{ + groupid: "b", + }, "ab") + + // ----------------- + + sub.subscribe(&ZConn{ + groupid: "b", + }, "abx") + + conn := ZConn{ + groupid: "a", + } + + sub.subscribe(&conn, "abx") + + sub.unsubscribe(&conn, "abx") + + fmt.Println(1) +} diff --git a/zsub/ztopic.go b/zsub/ztopic.go new file mode 100644 index 0000000..a1e6aa0 --- /dev/null +++ b/zsub/ztopic.go @@ -0,0 +1,14 @@ +package zsub + +import "sync" + +type ZTopic struct { //ZTopic + sync.Mutex + groups map[string]*ZGroup + mcount int + chMsg chan string // 主题消息投递 +} + +func createZTopic() { + +} diff --git a/ztimer/ztimer.go b/ztimer/ztimer.go new file mode 100644 index 0000000..8ab46b2 --- /dev/null +++ b/ztimer/ztimer.go @@ -0,0 +1,3 @@ +package ztimer + +