diff --git a/zsub/msg-consumer.go b/zsub/msg-consumer.go index 1c69471..900bd94 100644 --- a/zsub/msg-consumer.go +++ b/zsub/msg-consumer.go @@ -80,9 +80,9 @@ func msgAccept(v Message) { if len(rcmd) != 3 { c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") } else { - if len(topicChan) < cap(topicChan) { + /*if len(topicChan) < cap(topicChan) { topicChan <- rcmd - } + }*/ zsub.Publish(rcmd[1], rcmd[2]) } return @@ -90,9 +90,9 @@ func msgAccept(v Message) { if len(rcmd) != 3 { c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]") } else { - if len(topicChan) < cap(topicChan) { + /*if len(topicChan) < cap(topicChan) { topicChan <- rcmd - } + }*/ zsub.Publish(rcmd[1], rcmd[2]) } return diff --git a/zsub/zdb.go b/zsub/zdb.go index 6b9ac1c..3f97d7e 100644 --- a/zsub/zdb.go +++ b/zsub/zdb.go @@ -8,13 +8,13 @@ import ( "os" "strconv" "strings" - "sync/atomic" "time" ) +/* var ( - topicChan = make(chan []string, 1000) //接收到的 所有消息数据 -) + topicChan = make(chan []string, 1000) //接收到的 所有消息数据, 用于写入数据库持久化 +)*/ // Message 数据封装 type Message struct { @@ -172,57 +172,60 @@ var ( func init() { LoadConf("app.conf") - _db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", - GetStr("ztimer.db.user", "root"), - GetStr("ztimer.db.pwd", "123456"), - GetStr("ztimer.db.addr", "127.0.0.1:3306"), - GetStr("ztimer.db.database", "zhub"), - )) - if err != nil { - log.Println(err) - return - } + /* + _db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", + GetStr("ztimer.db.user", "root"), + GetStr("ztimer.db.pwd", "123456"), + GetStr("ztimer.db.addr", "127.0.0.1:3306"), + GetStr("ztimer.db.database", "zhub"), + )) + if err != nil { + log.Println(err) + return + } - db = _db + db = _db - // 批量写入数据库,等待超时5秒,如有数据写入数据 - go func() { - defer func() { - if r := recover(); r != nil { - log.Println("MsgToDb Recovered:", r) + // 批量写入数据库,等待超时5秒,如有数据写入数据 + + go func() { + defer func() { + if r := recover(); r != nil { + log.Println("MsgToDb Recovered:", r) + } + }() + + var flagcount = 0 + var _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" + for { + select { + case msg := <-topicChan: + var topic, value = msg[1], msg[2] + var t = time.Now().UnixNano() / 1e6 + _sql += fmt.Sprintf("('%s','%s','%s',%d),\n", + strconv.FormatInt(t, 36)+"-"+strconv.FormatInt(atomic.AddInt64(&seq, 1), 36), topic, value, t) + flagcount++ + case <-time.After(time.Second * 5): // 等待5秒 + if flagcount > 0 { + flagcount = 100 + } + } + + if flagcount != 100 { + continue + } + + _sql = _sql[:len(_sql)-2] + _sql += ";" + + _, err = db.Exec(_sql) + if err != nil { + log.Println(err) + } + + _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" + flagcount = 0 } }() - - var flagcount = 0 - var _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" - for { - select { - case msg := <-topicChan: - var topic, value = msg[1], msg[2] - var t = time.Now().UnixNano() / 1e6 - _sql += fmt.Sprintf("('%s','%s','%s',%d),\n", - strconv.FormatInt(t, 36)+"-"+strconv.FormatInt(atomic.AddInt64(&seq, 1), 36), topic, value, t) - flagcount++ - case <-time.After(time.Second * 5): // 等待5秒 - if flagcount > 0 { - flagcount = 100 - } - } - - if flagcount != 100 { - continue - } - - _sql = _sql[:len(_sql)-2] - _sql += ";" - - _, err = db.Exec(_sql) - if err != nil { - log.Println(err) - } - - _sql = "INSERT INTO zhub.topicmessage (`msgid`,`topic`,`value`,`createtime`) VALUES \n" - flagcount = 0 - } - }() + */ }