diff --git a/x_test.go b/x_test.go index 2616cc8..839a042 100644 --- a/x_test.go +++ b/x_test.go @@ -1,74 +1,25 @@ package main import ( - "encoding/json" - "fmt" "log" "strconv" - "strings" - "sync" "testing" "time" "zhub/cli" ) -type LogInfo2 struct { - RemoteAddr string `json:"remote_addr"` // IP - Time string `json:"time"` // 请求时间 - Status string `json:"status"` // 请求状态 - BodyBytesSent string `json:"body_bytes_sent"` // 返回内容字节 - Host string `json:"host"` // 请求域名 - HttpUserAgent string `json:"http_user_agent"` // 客户端信息 - CostTime string `json:"upstream_response_time"` // 耗时 - - Request string `json:"request"` // - HttpMethod string `json:"http_method"` // 请求类型 - Uri string `json:"uri"` // uri - ProtocolVersion string `json:"protocol_version"` // 请求 - RequestTime string `json:"request_time"` // 时间戳 - HttpCookie string `json:"http_cookie"` -} - func TestName(t *testing.T) { //client, err := cli.Create("39.108.56.246:1216", "") client, err := cli.Create("127.0.0.1:1216", "") if err != nil { log.Fatal(err) } - mutex := sync.Mutex{} - n := 0 - //client.Init() - client.Subscribe("pro-nginx-log", func(v string) { - mutex.Lock() - defer mutex.Unlock() - - if strings.Index(v, "api-oss.woaihaoyouxi.com") > -1 { - return - } - if strings.Index(v, "kibana.woaihaoyouxi.com") > -1 { - return - } - - /*if strings.Index(v, "ef737b680be2cf7868cca99101fa7e66") == -1 { - return - }*/ - - n++ - info, err := logParse(v) - if err != nil { - log.Println("json parse error", v) - return - } - //fmt.Println(strconv.Itoa(n), "接收到主题 pro-nginx-log 消息", v) - t, err := strconv.ParseInt(info.RequestTime, 10, 0) - fmt.Println(strconv.Itoa(n), time.Unix(t, 0).Format("2006-01-02 15:04:05"), info.Status, info.CostTime, info.Uri, info.HttpCookie) - }) client.Subscribe("a-1", func(v string) { log.Println(v) }) - client.Timer("t", "*/3 * * * * *") + client.Timer("t", "* * * * * *") go func() { for i := 0; i < 50000; i++ { @@ -77,59 +28,5 @@ func TestName(t *testing.T) { } }() - //log.Println("send") - //client.Daly("x", "abx", 1000 * 10) time.Sleep(time.Hour * 3) } - -func TestX(t *testing.T) { - strs := [...]string{"1", "2", "3", "4"} - - strss := strs[0:2] - - fmt.Println(strss) -} - -func logParse(str string) (LogInfo2, error) { - defer func() { - if r := recover(); r != nil { - log.Println("nginx.logParse error:", r, str) - } - }() - - var info LogInfo2 - err := json.Unmarshal([]byte(str), &info) - if err != nil { - log.Println("111", err, str) - return LogInfo2{}, err - } - - if !strings.EqualFold(info.Request, "") { - arr := strings.Split(info.Request, " ") - if len(arr) == 3 { - info.HttpMethod = arr[0] - info.Uri = arr[1] - info.ProtocolVersion = arr[2] - } - } - if !strings.EqualFold(info.Request, "") { - arr := strings.Split(info.Request, " ") - if len(arr) == 3 { - info.HttpMethod = arr[0] - info.Uri = arr[1] - info.ProtocolVersion = arr[2] - } - } - - if !strings.EqualFold(info.Time, "") { - t, err := time.Parse(time.RFC3339, info.Time) - if err != nil { - log.Println("127", err, str) - return LogInfo2{}, err - } else { - info.RequestTime = strconv.FormatInt(t.Unix(), 10) - } - } - - return info, nil -} diff --git a/zdb/rcmd-exec.go b/zdb/rcmd-exec.go index 58b9e3c..19c70ae 100644 --- a/zdb/rcmd-exec.go +++ b/zdb/rcmd-exec.go @@ -1,8 +1,6 @@ package zdb import ( - "fmt" - "github.com/robfig/cron" "log" "net" "strconv" @@ -60,55 +58,13 @@ func execCmd(rcmd []string, conn net.Conn) { case "daly": daly(rcmd, conn) case "timer": - timer(rcmd, conn) + Timer(rcmd, conn) default: conn.Write([]byte("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]\r\n")) return } } -func timer(rcmd []string, conn net.Conn) { - ztimer := zTimer[rcmd[1]] - if ztimer == nil { - ztimer = &ZTimer{ - conns: []*net.Conn{}, - topic: rcmd[1], - } - zTimer[rcmd[1]] = ztimer - } - - _conns := make([]*net.Conn, 0) - for _, c := range ztimer.conns { - if *&conn == *c { - continue - } - _conns = append(_conns, c) - } - _conns = append(_conns, &conn) - ztimer.conns = _conns - - if !strings.EqualFold(ztimer.expr, rcmd[2]) { - ztimer.expr = rcmd[2] - if ztimer.cron != nil { - ztimer.cron.Stop() - } - ztimer.cron = func() *cron.Cron { - c := cron.New() - c.AddFunc(ztimer.expr, func() { - fmt.Println(time.Now().Second()) - for _, conn := range ztimer.conns { - send(*conn, "timer", ztimer.topic) - } - }) - go c.Run() - return c - }() - } - - zTimer[ztimer.topic] = ztimer - fmt.Println("xx") -} - // daly topic valye 100 func daly(rcmd []string, conn net.Conn) { if len(rcmd) != 4 { @@ -229,7 +185,7 @@ func publish(rcmd []string, conn net.Conn) { msgs := []string{"message", topic, v} for _, c := range subs { - send(*c.conn, msgs...) + Send(*c.conn, msgs...) /*_conn.Write([]byte("*3\r\n")) for _, msg := range msgs { _conn.Write([]byte("$" + strconv.Itoa(len(msg)) + "\r\n")) @@ -240,7 +196,7 @@ func publish(rcmd []string, conn net.Conn) { var wlock = sync.Mutex{} -func send(conn net.Conn, vs ...string) (err error) { +func Send(conn net.Conn, vs ...string) (err error) { //chSend <- vs wlock.Lock() defer wlock.Unlock() diff --git a/zdb/zdb-server.go b/zdb/zdb-server.go index 8b25aef..0bce4b6 100644 --- a/zdb/zdb-server.go +++ b/zdb/zdb-server.go @@ -3,7 +3,6 @@ package zdb import ( "bufio" "fmt" - "github.com/robfig/cron" "log" "net" "strconv" @@ -16,7 +15,6 @@ var ( zkv = make(map[string]string) zsub = make(map[string][]*ConnContext) // topic -- connx[] retOk = []byte("+OK") - zTimer = make(map[string]*ZTimer) retHelp = []byte( "\n--- zdb help ---\n" + "______ _____ _____ \n|___ / | _ \\ | _ \\ \n / / | | | | | |_| | \n / / | | | | | _ { \n / /__ | |_| | | |_| | \n/_____| |_____/ |_____/ \n" + @@ -49,16 +47,9 @@ type ConnContext struct { createTime time.Time } -type ZTimer struct { - conns []*net.Conn - expr string - topic string - cron *cron.Cron -} - // ====================================================================== -// zdb 服务启动 +// ZHub 服务启动 func ServerStart(host string, port int) { listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil {