新增:1、topic 记录到DB 2、http 发布事件支持 3、修复延时事件并发异常崩溃

git-svn-id: svn://47.119.165.148/zhub@127 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
This commit is contained in:
lxy
2021-09-02 17:10:21 +00:00
parent 15a73136cc
commit b5e55c9869
5 changed files with 148 additions and 43 deletions

View File

@@ -16,10 +16,18 @@ func StartHttp() {
http.HandleFunc("/info", info) http.HandleFunc("/info", info)
http.HandleFunc("/cleanup", cleanup) http.HandleFunc("/cleanup", cleanup)
http.HandleFunc("/retimer", retimer) http.HandleFunc("/retimer", retimer)
http.HandleFunc("/topic/publish", publish)
http.ListenAndServe(":1217", nil) http.ListenAndServe(":1217", nil)
} }
func publish(w http.ResponseWriter, r *http.Request) {
topic := r.FormValue("topic")
value := r.FormValue("value")
zsub.ZSubx().Publish(topic, value)
renderJson(w, "+ok")
}
func retimer(w http.ResponseWriter, r *http.Request) { func retimer(w http.ResponseWriter, r *http.Request) {
zsub.ZSubx().ReloadTimer() zsub.ZSubx().ReloadTimer()
renderJson(w, "+reload timer ok") renderJson(w, "+reload timer ok")

View File

@@ -65,7 +65,10 @@ func msgAccept(v Message) {
if len(rcmd) != 3 { if len(rcmd) != 3 {
c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]")
} else { } else {
zsub.publish(rcmd[1], rcmd[2]) if len(topicChan) < cap(topicChan) {
topicChan <- rcmd
}
zsub.Publish(rcmd[1], rcmd[2])
} }
return return
default: default:

View File

@@ -2,26 +2,28 @@ package zsub
import ( import (
"bufio" "bufio"
"database/sql"
"fmt" "fmt"
"log" "log"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
"zhub/conf" "zhub/conf"
) )
var ( var (
// hubChan = make(chan Message, 1000) //接收到的 所有消息数据 topicChan = make(chan []string, 1000) //接收到的 所有消息数据
) )
// 数据封装 // Message 数据封装
type Message struct { type Message struct {
Conn *ZConn Conn *ZConn
Rcmd []string Rcmd []string
} }
// 文件追加内容 // Append file append
func Append(str string, fileName string) { func Append(str string, fileName string) {
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModeAppend) file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModeAppend)
if err != nil { if err != nil {
@@ -39,31 +41,53 @@ func Append(str string, fileName string) {
func (s *ZSub) dataStorage() { func (s *ZSub) dataStorage() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
// delay save // ========================== delay save ===========================
err := os.Remove(conf.DataDir + "/delay.z") func() {
if err != nil { if !s.delayup {
log.Println(err) return
}
var str string
for _, delay := range s.delays {
str += fmt.Sprintf("%s %s %s\n", delay.topic, delay.value, strconv.FormatInt(delay.exectime.Unix(), 10))
}
Append(str, conf.DataDir+"/delay.z")
// lock save
err = os.Remove(conf.DataDir + "/lock.z")
if err != nil {
log.Println(err)
}
str = ""
for _, locks := range s.locks {
for _, lock := range locks {
str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start)
break // 只记录获得锁的记录
} }
} defer func() {
Append(str, conf.DataDir+"/lock.z") s.delayup = false
}()
err := os.Remove(conf.DataDir + "/delay.z")
if err != nil {
log.Println(err)
}
file, err := os.OpenFile(conf.DataDir+"/delay.z", os.O_CREATE|os.O_WRONLY, os.ModeAppend)
if err != nil {
fmt.Println(err)
}
defer file.Close()
writer := bufio.NewWriter(file)
delays2 := s.delays
for _, delay := range delays2 {
writer.WriteString(delay.topic)
writer.WriteString(" ")
writer.WriteString(delay.value)
writer.WriteString(" ")
writer.WriteString(strconv.FormatInt(delay.exectime.Unix(), 10))
writer.WriteString("\n")
}
writer.Flush()
}()
// ========================== lock save ===========================
func() {
err := os.Remove(conf.DataDir + "/lock.z")
if err != nil {
log.Println(err)
}
str := ""
for _, locks := range s.locks {
for _, lock := range locks {
str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start)
break // 只记录获得锁的记录
}
}
Append(str, conf.DataDir+"/lock.z")
}()
} }
func (s *ZSub) loadDelay() { func (s *ZSub) loadDelay() {
@@ -140,3 +164,66 @@ func (s *ZSub) loadLock() {
}) })
} }
} }
// --------------------------------------
var (
db *sql.DB
seq int64 = 50000
)
func init() {
conf.Load("app.conf")
_db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
conf.GetStr("ztimer.db.user", "root"),
conf.GetStr("ztimer.db.pwd", "123456"),
conf.GetStr("ztimer.db.addr", "127.0.0.1:3306"),
conf.GetStr("ztimer.db.database", "zhub"),
))
if err != nil {
log.Println(err)
return
}
db = _db
// 批量写入数据库等待超时5秒如有数据写入数据
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("MsgToDb Recovered:", r)
}
}()
var flagcount = 0
var _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n"
for {
select {
case msg := <-topicChan:
var topic, value = msg[1], msg[2]
var t = time.Now().UnixNano() / 1e6
_sql += fmt.Sprintf("('%s','%s','%s',%d),\n",
strconv.FormatInt(t, 36)+"-"+strconv.FormatInt(atomic.AddInt64(&seq, 1), 36), topic, value, t)
flagcount++
case <-time.After(time.Second * 5): // 等待5秒
if flagcount > 0 {
flagcount = 100
}
}
if flagcount != 100 {
continue
}
_sql = _sql[:len(_sql)-2]
_sql += ";"
_, err = db.Exec(_sql)
if err != nil {
log.Println(err)
}
_sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n"
flagcount = 0
}
}()
}

View File

@@ -28,7 +28,7 @@ var (
func init() { func init() {
// conn health check: T=10s, close>29s // conn health check: T=10s, close>29s
go func() { go func() {
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 20)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
@@ -60,17 +60,20 @@ func init() {
} }
} }
zsub.dataStorage()
} }
}() }()
} }
type ZSub struct { type ZSub struct {
sync.RWMutex sync.RWMutex
topics map[string]*ZTopic topics map[string]*ZTopic
timers map[string]*ZTimer timers map[string]*ZTimer
delays map[string]*ZDelay delays map[string]*ZDelay
locks map[string][]*Lock locks map[string][]*Lock
conns []*ZConn conns []*ZConn
delayup bool
} }
type ZConn struct { //ZConn type ZConn struct { //ZConn
@@ -242,8 +245,9 @@ func (c *ZConn) removeTo(arr []*ZConn) []*ZConn {
// ServerStart ================== ZHub server ===================================== // ServerStart ================== ZHub server =====================================
/* /*
1、初始化服务 ServerStart
2、启动服务监听 1、load history data
2、init server
*/ */
func ServerStart(addr string) { func ServerStart(addr string) {
conf.GetStr("data.dir", "data") conf.GetStr("data.dir", "data")
@@ -354,11 +358,11 @@ func (s *ZSub) acceptHandler(c *ZConn) {
} }
/* /*
accept stop message Publish topic message
1、send message to stop's chan 1、send message to topic's chan
2、feedback send success to sender, and sending message to stop's subscripts 2、feedback send success to sender, and sending message to topic's subscripts
*/ */
func (s *ZSub) publish(topic, msg string) { func (s *ZSub) Publish(topic, msg string) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
ztopic := s.topics[topic] //ZTopic ztopic := s.topics[topic] //ZTopic

View File

@@ -207,7 +207,8 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
s.Lock() s.Lock()
defer func() { defer func() {
s.Unlock() s.Unlock()
s.dataStorage() // s.dataStorage()
s.delayup = true
}() }()
if len(rcmd) != 4 { if len(rcmd) != 4 {
c.send("-Error: subscribe para number!") c.send("-Error: subscribe para number!")
@@ -242,8 +243,10 @@ func (s *ZSub) delay(rcmd []string, c *ZConn) {
go func() { go func() {
select { select {
case <-delay.timer.C: case <-delay.timer.C:
zsub.publish(rcmd[1], rcmd[2]) zsub.Publish(rcmd[1], rcmd[2])
delete(s.delays, rcmd[1]+"-"+rcmd[2]) funChan <- func() {
delete(s.delays, rcmd[1]+"-"+rcmd[2])
}
} }
}() }()
} }