From b5e55c9869a9c1bfa8ee943e9fc78d862d3f6258 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Thu, 2 Sep 2021 17:10:21 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A1=E3=80=81topic=20?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=88=B0DB=202=E3=80=81http=20=E5=8F=91?= =?UTF-8?q?=E5=B8=83=E4=BA=8B=E4=BB=B6=E6=94=AF=E6=8C=81=203=E3=80=81?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=BB=B6=E6=97=B6=E4=BA=8B=E4=BB=B6=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E5=BC=82=E5=B8=B8=E5=B4=A9=E6=BA=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: svn://47.119.165.148/zhub@127 e63fbceb-bcc3-4977-ac22-735b83d8d0f4 --- monitor/monitor.go | 8 +++ zsub/msg-consumer.go | 5 +- zsub/zdb.go | 141 ++++++++++++++++++++++++++++++++++--------- zsub/zsub.go | 28 +++++---- zsub/ztimer.go | 9 ++- 5 files changed, 148 insertions(+), 43 deletions(-) diff --git a/monitor/monitor.go b/monitor/monitor.go index 5d3fbdc..0ac502a 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -16,10 +16,18 @@ func StartHttp() { http.HandleFunc("/info", info) http.HandleFunc("/cleanup", cleanup) http.HandleFunc("/retimer", retimer) + http.HandleFunc("/topic/publish", publish) http.ListenAndServe(":1217", nil) } +func publish(w http.ResponseWriter, r *http.Request) { + topic := r.FormValue("topic") + value := r.FormValue("value") + zsub.ZSubx().Publish(topic, value) + renderJson(w, "+ok") +} + func retimer(w http.ResponseWriter, r *http.Request) { zsub.ZSubx().ReloadTimer() renderJson(w, "+reload timer ok") diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index c0d53f7..414cd1b 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -65,7 +65,10 @@ func msgAccept(v Message) { if len(rcmd) != 3 { c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") } else { - zsub.publish(rcmd[1], rcmd[2]) + if len(topicChan) < cap(topicChan) { + topicChan <- rcmd + } + zsub.Publish(rcmd[1], rcmd[2]) } return default: diff --git a/zsub/zdb.go b/zsub/zdb.go index 2e43c59..479fc0b 100644 --- a/zsub/zdb.go +++ b/zsub/zdb.go @@ -2,26 +2,28 @@ package zsub import ( "bufio" + "database/sql" "fmt" "log" "os" "strconv" "strings" + "sync/atomic" "time" "zhub/conf" ) var ( -// hubChan = make(chan Message, 1000) //接收到的 所有消息数据 + topicChan = make(chan []string, 1000) //接收到的 所有消息数据 ) -// 数据封装 +// Message 数据封装 type Message struct { Conn *ZConn Rcmd []string } -// 文件追加内容 +// Append file append func Append(str string, fileName string) { file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModeAppend) if err != nil { @@ -39,31 +41,53 @@ func Append(str string, fileName string) { func (s *ZSub) dataStorage() { s.Lock() defer s.Unlock() - // delay save - err := os.Remove(conf.DataDir + "/delay.z") - if err != nil { - log.Println(err) - } - - var str string - for _, delay := range s.delays { - str += fmt.Sprintf("%s %s %s\n", delay.topic, delay.value, strconv.FormatInt(delay.exectime.Unix(), 10)) - } - Append(str, conf.DataDir+"/delay.z") - - // lock save - err = os.Remove(conf.DataDir + "/lock.z") - if err != nil { - log.Println(err) - } - str = "" - for _, locks := range s.locks { - for _, lock := range locks { - str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start) - break // 只记录获得锁的记录 + // ========================== delay save =========================== + func() { + if !s.delayup { + return } - } - Append(str, conf.DataDir+"/lock.z") + defer func() { + s.delayup = false + }() + + err := os.Remove(conf.DataDir + "/delay.z") + if err != nil { + log.Println(err) + } + file, err := os.OpenFile(conf.DataDir+"/delay.z", os.O_CREATE|os.O_WRONLY, os.ModeAppend) + if err != nil { + fmt.Println(err) + } + defer file.Close() + writer := bufio.NewWriter(file) + delays2 := s.delays + + for _, delay := range delays2 { + writer.WriteString(delay.topic) + writer.WriteString(" ") + writer.WriteString(delay.value) + writer.WriteString(" ") + writer.WriteString(strconv.FormatInt(delay.exectime.Unix(), 10)) + writer.WriteString("\n") + } + writer.Flush() + }() + + // ========================== lock save =========================== + func() { + err := os.Remove(conf.DataDir + "/lock.z") + if err != nil { + log.Println(err) + } + str := "" + for _, locks := range s.locks { + for _, lock := range locks { + str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start) + break // 只记录获得锁的记录 + } + } + Append(str, conf.DataDir+"/lock.z") + }() } func (s *ZSub) loadDelay() { @@ -140,3 +164,66 @@ func (s *ZSub) loadLock() { }) } } + +// -------------------------------------- +var ( + db *sql.DB + seq int64 = 50000 +) + +func init() { + conf.Load("app.conf") + _db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", + conf.GetStr("ztimer.db.user", "root"), + conf.GetStr("ztimer.db.pwd", "123456"), + conf.GetStr("ztimer.db.addr", "127.0.0.1:3306"), + conf.GetStr("ztimer.db.database", "zhub"), + )) + if err != nil { + log.Println(err) + return + } + + db = _db + + // 批量写入数据库,等待超时5秒,如有数据写入数据 + go func() { + defer func() { + if r := recover(); r != nil { + log.Println("MsgToDb Recovered:", r) + } + }() + + var flagcount = 0 + var _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" + for { + select { + case msg := <-topicChan: + var topic, value = msg[1], msg[2] + var t = time.Now().UnixNano() / 1e6 + _sql += fmt.Sprintf("('%s','%s','%s',%d),\n", + strconv.FormatInt(t, 36)+"-"+strconv.FormatInt(atomic.AddInt64(&seq, 1), 36), topic, value, t) + flagcount++ + case <-time.After(time.Second * 5): // 等待5秒 + if flagcount > 0 { + flagcount = 100 + } + } + + if flagcount != 100 { + continue + } + + _sql = _sql[:len(_sql)-2] + _sql += ";" + + _, err = db.Exec(_sql) + if err != nil { + log.Println(err) + } + + _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" + flagcount = 0 + } + }() +} diff --git a/zsub/zsub.go b/zsub/zsub.go index 7e841a4..e2e49a4 100644 --- a/zsub/zsub.go +++ b/zsub/zsub.go @@ -28,7 +28,7 @@ var ( func init() { // conn health check: T=10s, close>29s go func() { - ticker := time.NewTicker(time.Second * 10) + ticker := time.NewTicker(time.Second * 20) defer ticker.Stop() for range ticker.C { @@ -60,17 +60,20 @@ func init() { } } + + zsub.dataStorage() } }() } type ZSub struct { sync.RWMutex - topics map[string]*ZTopic - timers map[string]*ZTimer - delays map[string]*ZDelay - locks map[string][]*Lock - conns []*ZConn + topics map[string]*ZTopic + timers map[string]*ZTimer + delays map[string]*ZDelay + locks map[string][]*Lock + conns []*ZConn + delayup bool } type ZConn struct { //ZConn @@ -242,8 +245,9 @@ func (c *ZConn) removeTo(arr []*ZConn) []*ZConn { // ServerStart ================== ZHub server ===================================== /* -1、初始化服务 -2、启动服务监听 +ServerStart +1、load history data +2、init server */ func ServerStart(addr string) { conf.GetStr("data.dir", "data") @@ -354,11 +358,11 @@ func (s *ZSub) acceptHandler(c *ZConn) { } /* -accept stop message -1、send message to stop's chan -2、feedback send success to sender, and sending message to stop's subscripts +Publish topic message +1、send message to topic's chan +2、feedback send success to sender, and sending message to topic's subscripts */ -func (s *ZSub) publish(topic, msg string) { +func (s *ZSub) Publish(topic, msg string) { s.RLock() defer s.RUnlock() ztopic := s.topics[topic] //ZTopic diff --git a/zsub/ztimer.go b/zsub/ztimer.go index fbc7c38..8559e70 100644 --- a/zsub/ztimer.go +++ b/zsub/ztimer.go @@ -207,7 +207,8 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { s.Lock() defer func() { s.Unlock() - s.dataStorage() + // s.dataStorage() + s.delayup = true }() if len(rcmd) != 4 { c.send("-Error: subscribe para number!") @@ -242,8 +243,10 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) { go func() { select { case <-delay.timer.C: - zsub.publish(rcmd[1], rcmd[2]) - delete(s.delays, rcmd[1]+"-"+rcmd[2]) + zsub.Publish(rcmd[1], rcmd[2]) + funChan <- func() { + delete(s.delays, rcmd[1]+"-"+rcmd[2]) + } } }() }