From 7ca71eb838f64ebdf21c2ed9bfc681bfc63c9fde Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Thu, 4 Nov 2021 10:57:50 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E5=B7=A5=E7=A8=8B=E5=8C=85=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@132 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- cli_test.go | 157 ----------------------------------- {cli => cmd}/client.go | 2 +- app.go => main.go | 14 ++-- public/index.html | 115 ++++++++++++++++++++++++- {conf => zsub}/config.go | 4 +- {monitor => zsub}/monitor.go | 17 ++-- zsub/msg-consumer.go | 3 +- zsub/zdb.go | 23 +++-- zsub/zsub.go | 7 +- zsub/ztimer.go | 9 +- 10 files changed, 150 insertions(+), 201 deletions(-) delete mode 100644 cli_test.go rename {cli => cmd}/client.go (99%) rename app.go => main.go (76%) rename {conf => zsub}/config.go (97%) rename {monitor => zsub}/monitor.go (85%) diff --git a/cli_test.go b/cli_test.go deleted file mode 100644 index 359173d..0000000 --- a/cli_test.go +++ /dev/null @@ -1,157 +0,0 @@ -package main - -import ( - "fmt" - "log" - "strconv" - "testing" - "time" - "zhub/cli" -) - -var ( - //addr = "47.111.150.118:6066" - addr = "127.0.0.1:1216" - //addr = "122.112.180.156:6066" - //addr = "39.108.56.246:1216" -) - -func TestCli(t *testing.T) { - //client, err := cli.Create("39.108.56.246:7070", "") - client, err := cli.Create(addr, "xx") - //client, err := cli.Create(addr, "topic-x") - if err != nil { - log.Fatal(err) - } - - // 订阅主题 消息 - client.Subscribe("ax", func(v string) { - log.Println("收到主题 ax 消息 " + v) - }) - - // 定时调度 - client.Timer("a", func() { - log.Println("收到 a 定时消息") - }) - - client.Subscribe("a", func(v string) { - log.Println("收到主题 a 消息 " + v) - }) - client.Delay("a", "x", 3000) - - time.Sleep(time.Hour * 3) -} - -func TestTimer(t *testing.T) { - go func() { - client, _ := cli.Create(addr, "topic-1") - - client.Subscribe("ax1", func(v string) { - log.Println("topic-1-ax: " + v) - }) - }() - go func() { - client, _ := cli.Create(addr, "topic-1") - - client.Subscribe("ax1", func(v string) { - log.Println("topic-2-ax: " + v) - }) - }() - - go func() { - client, _ := cli.Create(addr, "topic-1") - - client.Subscribe("ax1", func(v string) { - log.Println("topic-3-ax: " + v) - }) - }() - - /*go func() { - client, _ := cli.Create(addr, "topic-1") - - client.Subscribe("ax", func(v string) { - log.Println("topic-4-ax: " + v) - }) - }() - go func() { - client, _ := cli.Create(addr, "topic-1") - - client.Subscribe("ax", func(v string) { - log.Println("topic-5-ax: " + v) - }) - }()*/ - - /*go func() { - client, _ := cli.Create(addr, "topic-2") - client.Timer("a", func() { - log.Println("client-2 收到 a 的定时消息") - }) - }() - - go func() { - client, _ := cli.Create(addr, "topic-3") - client.Timer("c", func() { - log.Println("client-2 收到 c 的定时消息") - }) - - client.Timer("b", func() { - log.Println("client-2 收到 b 的定时消息") - }) - client.Timer("LOAD-LIVE-ROOM-UNBANNED", func() { - log.Println("client-2 收到 LOAD-LIVE-ROOM-UNBANNED 的定时消息") - }) - client.Timer("VIP-EXP-EXPIRE", func() { - log.Println("client-2 收到 VIP-EXP-EXPIRE 的定时消息") - }) - }()*/ - - time.Sleep(time.Hour * 3) -} - -func TestSendCmd(t *testing.T) { - client, err := cli.Create(addr, "group-admin") - if err != nil { - log.Println(err) - } - - //client.Cmd("reload-timer") - client.Cmd("shutdown") -} - -func TestPublish(t *testing.T) { - client, err := cli.Create(addr, "") - if err != nil { - log.Println(err) - } - for i := 0; i < 10000; i++ { - client.Publish("ax1", strconv.Itoa(i)) - } - - time.Sleep(time.Second) -} - -func TestLock(t *testing.T) { - client, _ := cli.Create(addr, "xx") - - client.Subscribe("lock", func(v string) { - - }) - - var fun = func(x string) { - log.Println("lock", time.Now().UnixNano()/1e6) - lock := client.Lock("a", 30) - defer client.Unlock(lock) - //client.Lock("a", 5) - - for i := 0; i < 5; i++ { - time.Sleep(time.Second * 1) - fmt.Println(x + ":" + strconv.Itoa(i+1)) - } - } - - go fun("x") - go fun("y") - go fun("z") - - time.Sleep(time.Second * 30 * 10) -} diff --git a/cli/client.go b/cmd/client.go similarity index 99% rename from cli/client.go rename to cmd/client.go index 6397fa3..5923c99 100644 --- a/cli/client.go +++ b/cmd/client.go @@ -1,4 +1,4 @@ -package cli +package cmd import ( "bufio" diff --git a/app.go b/main.go similarity index 76% rename from app.go rename to main.go index 22db86b..b8893b0 100644 --- a/app.go +++ b/main.go @@ -5,9 +5,7 @@ import ( "os" "strings" "time" - "zhub/cli" - "zhub/conf" - "zhub/monitor" + "zhub/cmd" "zhub/zsub" ) @@ -25,13 +23,13 @@ func main() { confPath = arg[3:] } } - conf.Load(confPath) + zsub.LoadConf(confPath) if len(addr) == 0 { - addr = conf.GetStr("service.zhub.servers", "127.0.0.1:1216") + addr = zsub.GetStr("service.zhub.servers", "127.0.0.1:1216") } if len(os.Args) == 3 && strings.EqualFold(os.Args[1], "-r") { - if cli, err := cli.Create(addr, "group-admin"); err != nil { + if cli, err := cmd.Create(addr, "group-admin"); err != nil { log.Println(err) } else { switch os.Args[2] { @@ -47,10 +45,10 @@ func main() { } if server { - go monitor.StartHttp() + go zsub.StartHttp() zsub.ServerStart(addr) // 服务进程启动 } else { - cli.ClientRun(addr) + cmd.ClientRun(addr) } } diff --git a/public/index.html b/public/index.html index 3978828..4c2078e 100644 --- a/public/index.html +++ b/public/index.html @@ -2,9 +2,120 @@ - Title + ZHub + + + -

welcome zhub!

+

welcome zhub!

+ +
+ Topic: 698 + Timer: 65 + Conns: 45 + Lock: 12/1122 + Delay: 12/3000 + +
+
+ +
+ + +
+
+ Issues +
+
+ Topic +
+
+ Timer +
+
+ Conns +
+
+ Lock +
+
+ Delay +
+
+ Releases +
+
+
+ + +
+
+ +
+ +
+ ZHub +
+ Dev-Kit | + JKit | + Meta-Kit | + Red-timer | + Redbbs | + JFly +
+
+ © TCCN 京ICP备16065761号 +
+
+
+ + + \ No newline at end of file diff --git a/conf/config.go b/zsub/config.go similarity index 97% rename from conf/config.go rename to zsub/config.go index 6219105..326defe 100644 --- a/conf/config.go +++ b/zsub/config.go @@ -1,4 +1,4 @@ -package conf +package zsub import ( "bufio" @@ -15,7 +15,7 @@ var ( DataDir = "" ) -func Load(path string) { +func LoadConf(path string) { f, err := os.Open(path) if err != nil { log.Panicln(err) diff --git a/monitor/monitor.go b/zsub/monitor.go similarity index 85% rename from monitor/monitor.go rename to zsub/monitor.go index 0ac502a..1856c51 100644 --- a/monitor/monitor.go +++ b/zsub/monitor.go @@ -1,13 +1,17 @@ -package monitor +package zsub import ( "encoding/json" "net/http" "os" "path" - "zhub/zsub" ) +func init() { + // 1.日志文件 定期分割归档 + +} + func StartHttp() { dir, _ := os.Getwd() webDir := path.Join(dir, "/public") @@ -24,22 +28,23 @@ func StartHttp() { func publish(w http.ResponseWriter, r *http.Request) { topic := r.FormValue("topic") value := r.FormValue("value") - zsub.ZSubx().Publish(topic, value) + zsub.Publish(topic, value) renderJson(w, "+ok") } +// retimer 重载定时调度 func retimer(w http.ResponseWriter, r *http.Request) { - zsub.ZSubx().ReloadTimer() + zsub.ReloadTimer() renderJson(w, "+reload timer ok") } func cleanup(w http.ResponseWriter, r *http.Request) { - zsub.ZSubx().Clearup() + zsub.Clearup() renderJson(w, "+OK") } func info(w http.ResponseWriter, r *http.Request) { - info := zsub.Info() + info := Info() renderJson(w, info) } diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index f955388..d5ee320 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" "time" - "zhub/conf" ) var funChan = make(chan func(), 1000) @@ -30,7 +29,7 @@ func msgAccept(v Message) { return } - if conf.LogDebug { + if LogDebug { log.Println("[", v.Conn.sn, "] rcmd: "+strings.Join(rcmd, " ")) } diff --git a/zsub/zdb.go b/zsub/zdb.go index 479fc0b..6b9ac1c 100644 --- a/zsub/zdb.go +++ b/zsub/zdb.go @@ -10,7 +10,6 @@ import ( "strings" "sync/atomic" "time" - "zhub/conf" ) var ( @@ -50,11 +49,11 @@ func (s *ZSub) dataStorage() { s.delayup = false }() - err := os.Remove(conf.DataDir + "/delay.z") + err := os.Remove(DataDir + "/delay.z") if err != nil { log.Println(err) } - file, err := os.OpenFile(conf.DataDir+"/delay.z", os.O_CREATE|os.O_WRONLY, os.ModeAppend) + file, err := os.OpenFile(DataDir+"/delay.z", os.O_CREATE|os.O_WRONLY, os.ModeAppend) if err != nil { fmt.Println(err) } @@ -75,7 +74,7 @@ func (s *ZSub) dataStorage() { // ========================== lock save =========================== func() { - err := os.Remove(conf.DataDir + "/lock.z") + err := os.Remove(DataDir + "/lock.z") if err != nil { log.Println(err) } @@ -86,12 +85,12 @@ func (s *ZSub) dataStorage() { break // 只记录获得锁的记录 } } - Append(str, conf.DataDir+"/lock.z") + Append(str, DataDir+"/lock.z") }() } func (s *ZSub) loadDelay() { - f, err := os.Open(conf.DataDir + "/delay.z") + f, err := os.Open(DataDir + "/delay.z") if err != nil { return } @@ -126,7 +125,7 @@ func (s *ZSub) loadDelay() { } func (s *ZSub) loadLock() { - f, err := os.Open(conf.DataDir + "/lock.z") + f, err := os.Open(DataDir + "/lock.z") if err != nil { return } @@ -172,12 +171,12 @@ var ( ) func init() { - conf.Load("app.conf") + LoadConf("app.conf") _db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", - conf.GetStr("ztimer.db.user", "root"), - conf.GetStr("ztimer.db.pwd", "123456"), - conf.GetStr("ztimer.db.addr", "127.0.0.1:3306"), - conf.GetStr("ztimer.db.database", "zhub"), + GetStr("ztimer.db.user", "root"), + GetStr("ztimer.db.pwd", "123456"), + GetStr("ztimer.db.addr", "127.0.0.1:3306"), + GetStr("ztimer.db.database", "zhub"), )) if err != nil { log.Println(err) diff --git a/zsub/zsub.go b/zsub/zsub.go index 2b9f2dc..57e0541 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -12,7 +12,6 @@ import ( "sync/atomic" "time" "unicode/utf8" - "zhub/conf" ) var ( @@ -251,7 +250,7 @@ ServerStart 2、init server */ func ServerStart(addr string) { - conf.GetStr("data.dir", "data") + GetStr("data.dir", "data") go func() { for { @@ -523,7 +522,3 @@ func (s *ZSub) noSubscribe(topic string) bool { } return true } - -func ZSubx() *ZSub { - return zsub -} diff --git a/zsub/ztimer.go b/zsub/ztimer.go index 8559e70..43f3ee9 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -13,7 +13,6 @@ import ( "strings" "text/template" "time" - "zhub/conf" ) type ZTimer struct { @@ -167,10 +166,10 @@ func executeShell(command string) (string, error, string) { func (s *ZSub) ReloadTimer() { db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", - conf.GetStr("ztimer.db.user", "root"), - conf.GetStr("ztimer.db.pwd", "123456"), - conf.GetStr("ztimer.db.addr", "127.0.0.1:3306"), - conf.GetStr("ztimer.db.database", "zhub"), + GetStr("ztimer.db.user", "root"), + GetStr("ztimer.db.pwd", "123456"), + GetStr("ztimer.db.addr", "127.0.0.1:3306"), + GetStr("ztimer.db.database", "zhub"), )) if err != nil {