diff --git a/cli_test.go b/cli_test.go
deleted file mode 100644
index 359173d..0000000
--- a/cli_test.go
+++ /dev/null
@@ -1,157 +0,0 @@
-package main
-
-import (
- "fmt"
- "log"
- "strconv"
- "testing"
- "time"
- "zhub/cli"
-)
-
-var (
- //addr = "47.111.150.118:6066"
- addr = "127.0.0.1:1216"
- //addr = "122.112.180.156:6066"
- //addr = "39.108.56.246:1216"
-)
-
-func TestCli(t *testing.T) {
- //client, err := cli.Create("39.108.56.246:7070", "")
- client, err := cli.Create(addr, "xx")
- //client, err := cli.Create(addr, "topic-x")
- if err != nil {
- log.Fatal(err)
- }
-
- // 订阅主题 消息
- client.Subscribe("ax", func(v string) {
- log.Println("收到主题 ax 消息 " + v)
- })
-
- // 定时调度
- client.Timer("a", func() {
- log.Println("收到 a 定时消息")
- })
-
- client.Subscribe("a", func(v string) {
- log.Println("收到主题 a 消息 " + v)
- })
- client.Delay("a", "x", 3000)
-
- time.Sleep(time.Hour * 3)
-}
-
-func TestTimer(t *testing.T) {
- go func() {
- client, _ := cli.Create(addr, "topic-1")
-
- client.Subscribe("ax1", func(v string) {
- log.Println("topic-1-ax: " + v)
- })
- }()
- go func() {
- client, _ := cli.Create(addr, "topic-1")
-
- client.Subscribe("ax1", func(v string) {
- log.Println("topic-2-ax: " + v)
- })
- }()
-
- go func() {
- client, _ := cli.Create(addr, "topic-1")
-
- client.Subscribe("ax1", func(v string) {
- log.Println("topic-3-ax: " + v)
- })
- }()
-
- /*go func() {
- client, _ := cli.Create(addr, "topic-1")
-
- client.Subscribe("ax", func(v string) {
- log.Println("topic-4-ax: " + v)
- })
- }()
- go func() {
- client, _ := cli.Create(addr, "topic-1")
-
- client.Subscribe("ax", func(v string) {
- log.Println("topic-5-ax: " + v)
- })
- }()*/
-
- /*go func() {
- client, _ := cli.Create(addr, "topic-2")
- client.Timer("a", func() {
- log.Println("client-2 收到 a 的定时消息")
- })
- }()
-
- go func() {
- client, _ := cli.Create(addr, "topic-3")
- client.Timer("c", func() {
- log.Println("client-2 收到 c 的定时消息")
- })
-
- client.Timer("b", func() {
- log.Println("client-2 收到 b 的定时消息")
- })
- client.Timer("LOAD-LIVE-ROOM-UNBANNED", func() {
- log.Println("client-2 收到 LOAD-LIVE-ROOM-UNBANNED 的定时消息")
- })
- client.Timer("VIP-EXP-EXPIRE", func() {
- log.Println("client-2 收到 VIP-EXP-EXPIRE 的定时消息")
- })
- }()*/
-
- time.Sleep(time.Hour * 3)
-}
-
-func TestSendCmd(t *testing.T) {
- client, err := cli.Create(addr, "group-admin")
- if err != nil {
- log.Println(err)
- }
-
- //client.Cmd("reload-timer")
- client.Cmd("shutdown")
-}
-
-func TestPublish(t *testing.T) {
- client, err := cli.Create(addr, "")
- if err != nil {
- log.Println(err)
- }
- for i := 0; i < 10000; i++ {
- client.Publish("ax1", strconv.Itoa(i))
- }
-
- time.Sleep(time.Second)
-}
-
-func TestLock(t *testing.T) {
- client, _ := cli.Create(addr, "xx")
-
- client.Subscribe("lock", func(v string) {
-
- })
-
- var fun = func(x string) {
- log.Println("lock", time.Now().UnixNano()/1e6)
- lock := client.Lock("a", 30)
- defer client.Unlock(lock)
- //client.Lock("a", 5)
-
- for i := 0; i < 5; i++ {
- time.Sleep(time.Second * 1)
- fmt.Println(x + ":" + strconv.Itoa(i+1))
- }
- }
-
- go fun("x")
- go fun("y")
- go fun("z")
-
- time.Sleep(time.Second * 30 * 10)
-}
diff --git a/cli/client.go b/cmd/client.go
similarity index 99%
rename from cli/client.go
rename to cmd/client.go
index 6397fa3..5923c99 100644
--- a/cli/client.go
+++ b/cmd/client.go
@@ -1,4 +1,4 @@
-package cli
+package cmd
import (
"bufio"
diff --git a/app.go b/main.go
similarity index 76%
rename from app.go
rename to main.go
index 22db86b..b8893b0 100644
--- a/app.go
+++ b/main.go
@@ -5,9 +5,7 @@ import (
"os"
"strings"
"time"
- "zhub/cli"
- "zhub/conf"
- "zhub/monitor"
+ "zhub/cmd"
"zhub/zsub"
)
@@ -25,13 +23,13 @@ func main() {
confPath = arg[3:]
}
}
- conf.Load(confPath)
+ zsub.LoadConf(confPath)
if len(addr) == 0 {
- addr = conf.GetStr("service.zhub.servers", "127.0.0.1:1216")
+ addr = zsub.GetStr("service.zhub.servers", "127.0.0.1:1216")
}
if len(os.Args) == 3 && strings.EqualFold(os.Args[1], "-r") {
- if cli, err := cli.Create(addr, "group-admin"); err != nil {
+ if cli, err := cmd.Create(addr, "group-admin"); err != nil {
log.Println(err)
} else {
switch os.Args[2] {
@@ -47,10 +45,10 @@ func main() {
}
if server {
- go monitor.StartHttp()
+ go zsub.StartHttp()
zsub.ServerStart(addr) // 服务进程启动
} else {
- cli.ClientRun(addr)
+ cmd.ClientRun(addr)
}
}
diff --git a/public/index.html b/public/index.html
index 3978828..4c2078e 100644
--- a/public/index.html
+++ b/public/index.html
@@ -2,9 +2,120 @@
- Title
+ ZHub
+
+
+
-welcome zhub!
+welcome zhub!
+
+
+
Topic: 698
+
Timer: 65
+
Conns: 45
+
Lock: 12/1122
+
Delay: 12/3000
+
+
+
+
+
+
+
+
+
+ Issues
+
+
+ Topic
+
+
+ Timer
+
+
+ Conns
+
+
+ Lock
+
+
+ Delay
+
+
+ Releases
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/conf/config.go b/zsub/config.go
similarity index 97%
rename from conf/config.go
rename to zsub/config.go
index 6219105..326defe 100644
--- a/conf/config.go
+++ b/zsub/config.go
@@ -1,4 +1,4 @@
-package conf
+package zsub
import (
"bufio"
@@ -15,7 +15,7 @@ var (
DataDir = ""
)
-func Load(path string) {
+func LoadConf(path string) {
f, err := os.Open(path)
if err != nil {
log.Panicln(err)
diff --git a/monitor/monitor.go b/zsub/monitor.go
similarity index 85%
rename from monitor/monitor.go
rename to zsub/monitor.go
index 0ac502a..1856c51 100644
--- a/monitor/monitor.go
+++ b/zsub/monitor.go
@@ -1,13 +1,17 @@
-package monitor
+package zsub
import (
"encoding/json"
"net/http"
"os"
"path"
- "zhub/zsub"
)
+func init() {
+ // 1.日志文件 定期分割归档
+
+}
+
func StartHttp() {
dir, _ := os.Getwd()
webDir := path.Join(dir, "/public")
@@ -24,22 +28,23 @@ func StartHttp() {
func publish(w http.ResponseWriter, r *http.Request) {
topic := r.FormValue("topic")
value := r.FormValue("value")
- zsub.ZSubx().Publish(topic, value)
+ zsub.Publish(topic, value)
renderJson(w, "+ok")
}
+// retimer 重载定时调度
func retimer(w http.ResponseWriter, r *http.Request) {
- zsub.ZSubx().ReloadTimer()
+ zsub.ReloadTimer()
renderJson(w, "+reload timer ok")
}
func cleanup(w http.ResponseWriter, r *http.Request) {
- zsub.ZSubx().Clearup()
+ zsub.Clearup()
renderJson(w, "+OK")
}
func info(w http.ResponseWriter, r *http.Request) {
- info := zsub.Info()
+ info := Info()
renderJson(w, info)
}
diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go
index f955388..d5ee320 100644
--- a/zsub/msg-consumer.go
+++ b/zsub/msg-consumer.go
@@ -6,7 +6,6 @@ import (
"strconv"
"strings"
"time"
- "zhub/conf"
)
var funChan = make(chan func(), 1000)
@@ -30,7 +29,7 @@ func msgAccept(v Message) {
return
}
- if conf.LogDebug {
+ if LogDebug {
log.Println("[", v.Conn.sn, "] rcmd: "+strings.Join(rcmd, " "))
}
diff --git a/zsub/zdb.go b/zsub/zdb.go
index 479fc0b..6b9ac1c 100644
--- a/zsub/zdb.go
+++ b/zsub/zdb.go
@@ -10,7 +10,6 @@ import (
"strings"
"sync/atomic"
"time"
- "zhub/conf"
)
var (
@@ -50,11 +49,11 @@ func (s *ZSub) dataStorage() {
s.delayup = false
}()
- err := os.Remove(conf.DataDir + "/delay.z")
+ err := os.Remove(DataDir + "/delay.z")
if err != nil {
log.Println(err)
}
- file, err := os.OpenFile(conf.DataDir+"/delay.z", os.O_CREATE|os.O_WRONLY, os.ModeAppend)
+ file, err := os.OpenFile(DataDir+"/delay.z", os.O_CREATE|os.O_WRONLY, os.ModeAppend)
if err != nil {
fmt.Println(err)
}
@@ -75,7 +74,7 @@ func (s *ZSub) dataStorage() {
// ========================== lock save ===========================
func() {
- err := os.Remove(conf.DataDir + "/lock.z")
+ err := os.Remove(DataDir + "/lock.z")
if err != nil {
log.Println(err)
}
@@ -86,12 +85,12 @@ func (s *ZSub) dataStorage() {
break // 只记录获得锁的记录
}
}
- Append(str, conf.DataDir+"/lock.z")
+ Append(str, DataDir+"/lock.z")
}()
}
func (s *ZSub) loadDelay() {
- f, err := os.Open(conf.DataDir + "/delay.z")
+ f, err := os.Open(DataDir + "/delay.z")
if err != nil {
return
}
@@ -126,7 +125,7 @@ func (s *ZSub) loadDelay() {
}
func (s *ZSub) loadLock() {
- f, err := os.Open(conf.DataDir + "/lock.z")
+ f, err := os.Open(DataDir + "/lock.z")
if err != nil {
return
}
@@ -172,12 +171,12 @@ var (
)
func init() {
- conf.Load("app.conf")
+ LoadConf("app.conf")
_db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
- conf.GetStr("ztimer.db.user", "root"),
- conf.GetStr("ztimer.db.pwd", "123456"),
- conf.GetStr("ztimer.db.addr", "127.0.0.1:3306"),
- conf.GetStr("ztimer.db.database", "zhub"),
+ GetStr("ztimer.db.user", "root"),
+ GetStr("ztimer.db.pwd", "123456"),
+ GetStr("ztimer.db.addr", "127.0.0.1:3306"),
+ GetStr("ztimer.db.database", "zhub"),
))
if err != nil {
log.Println(err)
diff --git a/zsub/zsub.go b/zsub/zsub.go
index 2b9f2dc..57e0541 100644
--- a/zsub/zsub.go
+++ b/zsub/zsub.go
@@ -12,7 +12,6 @@ import (
"sync/atomic"
"time"
"unicode/utf8"
- "zhub/conf"
)
var (
@@ -251,7 +250,7 @@ ServerStart
2、init server
*/
func ServerStart(addr string) {
- conf.GetStr("data.dir", "data")
+ GetStr("data.dir", "data")
go func() {
for {
@@ -523,7 +522,3 @@ func (s *ZSub) noSubscribe(topic string) bool {
}
return true
}
-
-func ZSubx() *ZSub {
- return zsub
-}
diff --git a/zsub/ztimer.go b/zsub/ztimer.go
index 8559e70..43f3ee9 100644
--- a/zsub/ztimer.go
+++ b/zsub/ztimer.go
@@ -13,7 +13,6 @@ import (
"strings"
"text/template"
"time"
- "zhub/conf"
)
type ZTimer struct {
@@ -167,10 +166,10 @@ func executeShell(command string) (string, error, string) {
func (s *ZSub) ReloadTimer() {
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
- conf.GetStr("ztimer.db.user", "root"),
- conf.GetStr("ztimer.db.pwd", "123456"),
- conf.GetStr("ztimer.db.addr", "127.0.0.1:3306"),
- conf.GetStr("ztimer.db.database", "zhub"),
+ GetStr("ztimer.db.user", "root"),
+ GetStr("ztimer.db.pwd", "123456"),
+ GetStr("ztimer.db.addr", "127.0.0.1:3306"),
+ GetStr("ztimer.db.database", "zhub"),
))
if err != nil {