修改:代码包结构
This commit is contained in:
65
internal/monitor/monitor.go
Normal file
65
internal/monitor/monitor.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"zhub/internal/zsub"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// 1.日志文件 定期分割归档
|
||||
|
||||
}
|
||||
|
||||
func StartWatch() {
|
||||
dir, _ := os.Getwd()
|
||||
webDir := path.Join(dir, "/public")
|
||||
|
||||
http.Handle("/", http.FileServer(http.Dir(webDir)))
|
||||
http.HandleFunc("/info", info)
|
||||
http.HandleFunc("/cleanup", cleanup)
|
||||
http.HandleFunc("/retimer", retimer)
|
||||
http.HandleFunc("/topic/publish", publish)
|
||||
|
||||
watchAddr := zsub.Conf.Service.Watch
|
||||
log.Println("zhub.watch = ", watchAddr)
|
||||
http.ListenAndServe(watchAddr, nil)
|
||||
}
|
||||
|
||||
func publish(w http.ResponseWriter, r *http.Request) {
|
||||
topic := r.FormValue("topic")
|
||||
value := r.FormValue("value")
|
||||
zsub.Hub.Publish(topic, value)
|
||||
renderJson(w, "+ok")
|
||||
}
|
||||
|
||||
// retimer 重载定时调度
|
||||
func retimer(w http.ResponseWriter, r *http.Request) {
|
||||
zsub.Hub.ReloadTimer()
|
||||
renderJson(w, "+reload timer ok")
|
||||
}
|
||||
|
||||
func cleanup(w http.ResponseWriter, r *http.Request) {
|
||||
zsub.Hub.Clearup()
|
||||
renderJson(w, "+OK")
|
||||
}
|
||||
|
||||
func info(w http.ResponseWriter, r *http.Request) {
|
||||
info := zsub.Info()
|
||||
renderJson(w, info)
|
||||
}
|
||||
|
||||
func renderJson(w http.ResponseWriter, d interface{}) {
|
||||
var bytes []byte
|
||||
|
||||
if str, ok := d.(string); ok {
|
||||
bytes = []byte(str)
|
||||
} else {
|
||||
bytes, _ = json.Marshal(d)
|
||||
w.Header().Set("content-type", "application/json; charset=utf-8;")
|
||||
}
|
||||
w.Write(bytes)
|
||||
}
|
||||
197
internal/zsub/msg-accept.go
Normal file
197
internal/zsub/msg-accept.go
Normal file
@@ -0,0 +1,197 @@
|
||||
package zsub
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var funChan = make(chan func(), 1000)
|
||||
|
||||
func handleMessage(v Message) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Println("handleMessage Recovered:", r)
|
||||
}
|
||||
}()
|
||||
c := v.Conn
|
||||
rcmd := v.Rcmd
|
||||
|
||||
if len(rcmd) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// ping reply
|
||||
if strings.EqualFold("+pong", v.Rcmd[0]) {
|
||||
v.Conn.pong = time.Now().Unix()
|
||||
return
|
||||
}
|
||||
|
||||
if Conf.Log.Level == "debug" && rcmd[0] != "auth" {
|
||||
log.Printf("[%d] cmd: %s\n", v.Conn.sn, strings.Join(rcmd, " "))
|
||||
} else if rcmd[0] == "auth" {
|
||||
if len(rcmd) != 2 || strings.IndexAny(rcmd[1], "@") == -1 {
|
||||
c.send("-Error: invalid password!")
|
||||
return
|
||||
}
|
||||
|
||||
inx := strings.IndexAny(rcmd[1], "@") //user@pwd
|
||||
|
||||
authKey := rcmd[1][:inx] //user
|
||||
authValue := Conf.Auth[rcmd[1][:inx]] //pwd
|
||||
if strings.EqualFold(authValue, rcmd[1][inx+1:]) {
|
||||
c.auth = rcmd[1][:inx]
|
||||
c.send("+Auth: ok!")
|
||||
log.Printf("[%d] cmd: %s\n", v.Conn.sn, "auth "+authKey+"@******* "+"[OK]")
|
||||
} else {
|
||||
c.send("-Auth: invalid password!")
|
||||
log.Printf("[%d] cmd: %s\n", v.Conn.sn, "auth "+authKey+"@******* "+"[Error]")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if strings.TrimSpace(c.auth) == "" && rcmd[0] != "auth" && Conf.Service.Auth {
|
||||
c.send("-Auth: NOAUTH Authentication required:" + rcmd[0])
|
||||
return
|
||||
}
|
||||
|
||||
if len(rcmd) == 1 {
|
||||
switch strings.ToLower(rcmd[0]) {
|
||||
default:
|
||||
// str start with strs anyone
|
||||
var startWithAny = func(str string, strs ...string) bool {
|
||||
for _, str := range strs {
|
||||
if strings.Index(rcmd[0], str) == 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
arr := []string{"subscribe", "timer", "unsubscribe", "delay", "groupid"}
|
||||
if startWithAny(rcmd[0], arr...) {
|
||||
rcmd = strings.Split(rcmd[0], " ")
|
||||
} else {
|
||||
c.send("-Error: not supported:" + rcmd[0])
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cmd := rcmd[0]
|
||||
switch cmd {
|
||||
case "groupid":
|
||||
c.groupid = rcmd[1]
|
||||
return
|
||||
case "rpc":
|
||||
// if rpc and no sub back error
|
||||
if Hub.noSubscribe(rcmd[1]) {
|
||||
rpcBody := make(map[string]string)
|
||||
json.Unmarshal([]byte(rcmd[2]), &rpcBody)
|
||||
log.Println("rpc no subscribe: ", rcmd[1])
|
||||
|
||||
ruk := rpcBody["ruk"]
|
||||
Hub.Publish(strings.Split(ruk, "::")[0], "{'retcode': 404, 'retinfo': '服务离线!', 'ruk': '"+ruk+"'}")
|
||||
return
|
||||
}
|
||||
|
||||
if len(rcmd) != 3 {
|
||||
c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]")
|
||||
} else {
|
||||
/*if len(topicChan) < cap(topicChan) {
|
||||
topicChan <- rcmd
|
||||
}*/
|
||||
Hub.Publish(rcmd[1], rcmd[2])
|
||||
}
|
||||
return
|
||||
case "publish":
|
||||
if len(rcmd) != 3 {
|
||||
c.send("-Error: publish para number![" + strings.Join(rcmd, " ") + "]")
|
||||
} else {
|
||||
/*if len(topicChan) < cap(topicChan) {
|
||||
topicChan <- rcmd
|
||||
}*/
|
||||
Hub.Publish(rcmd[1], rcmd[2])
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// 内部执行指令 加入执行队列
|
||||
funChan <- func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Println("funChan Recovered:", r)
|
||||
}
|
||||
}()
|
||||
switch cmd {
|
||||
case "subscribe":
|
||||
// subscribe x y z
|
||||
for _, topic := range rcmd[1:] {
|
||||
c.subscribe(topic) // todo: 批量一次订阅
|
||||
}
|
||||
case "unsubscribe":
|
||||
for _, topic := range rcmd[1:] {
|
||||
c.unsubscribe(topic)
|
||||
}
|
||||
case "broadcast":
|
||||
Hub.broadcast(rcmd[1], rcmd[2])
|
||||
case "delay":
|
||||
Hub.delay(rcmd, c)
|
||||
case "timer":
|
||||
for _, name := range rcmd[1:] {
|
||||
Hub.timer([]string{"timer", name}, c) // append to timers
|
||||
c.timers = append(c.timers, name) // append to conns
|
||||
}
|
||||
case "cmd":
|
||||
if len(rcmd) == 1 {
|
||||
return
|
||||
}
|
||||
switch rcmd[1] {
|
||||
case "reload-timer":
|
||||
Hub.ReloadTimer()
|
||||
case "shutdown":
|
||||
if !strings.EqualFold(c.groupid, "group-admin") {
|
||||
return
|
||||
}
|
||||
Hub.shutdown()
|
||||
}
|
||||
case "lock":
|
||||
// lock key uuid 5
|
||||
if len(rcmd) != 4 {
|
||||
c.send("-Error: lock para number![" + strings.Join(rcmd, " ") + "]")
|
||||
return
|
||||
}
|
||||
d, _ := strconv.Atoi(rcmd[3])
|
||||
Hub._lock(&Lock{key: rcmd[1], uuid: rcmd[2], duration: d})
|
||||
case "unlock":
|
||||
// unlock key uuid
|
||||
if len(rcmd) != 3 {
|
||||
c.send("-Error: unlock para number![" + strings.Join(rcmd, " ") + "]")
|
||||
return
|
||||
}
|
||||
Hub._unlock(Lock{key: rcmd[1], uuid: rcmd[2]})
|
||||
/*case "auth":
|
||||
if len(rcmd) != 2 || strings.IndexAny(rcmd[1], "@") == -1 {
|
||||
c.send("-Error: invalid password!")
|
||||
return
|
||||
}
|
||||
|
||||
inx := strings.IndexAny(rcmd[1], "@") //user@pwd
|
||||
|
||||
authKey := Conf.Auth[rcmd[1][:inx]]
|
||||
if strings.EqualFold(authKey, rcmd[1][inx+1:]) {
|
||||
c.auth = rcmd[1][:inx]
|
||||
c.send("+Auth: ok!")
|
||||
} else {
|
||||
c.send("-Auth: invalid password!")
|
||||
}
|
||||
return*/
|
||||
default:
|
||||
c.send("-Error: default not supported:[" + strings.Join(rcmd, " ") + "]")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
170
internal/zsub/zdb.go
Normal file
170
internal/zsub/zdb.go
Normal file
@@ -0,0 +1,170 @@
|
||||
package zsub
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
datadir string
|
||||
)
|
||||
|
||||
// Message 数据封装
|
||||
type Message struct {
|
||||
Conn *ZConn
|
||||
Rcmd []string
|
||||
}
|
||||
|
||||
// Append file append
|
||||
func Append(str string, fileName string) {
|
||||
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModeAppend)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
_, err = file.WriteString(str)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ZSub) SaveData() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Println("SaveData Recovered:", r)
|
||||
}
|
||||
}()
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
// ========================== delay save ===========================
|
||||
func() {
|
||||
if !s.delayup {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
s.delayup = false
|
||||
}()
|
||||
|
||||
err := os.Remove(datadir + "/delay.z")
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
file, err := os.OpenFile(datadir+"/delay.z", os.O_CREATE|os.O_WRONLY, os.ModeAppend)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
writer := bufio.NewWriter(file)
|
||||
_delays := s.delays
|
||||
|
||||
for _, delay := range _delays {
|
||||
delayStr := fmt.Sprintf("%s %s %d\n", delay.Topic, delay.Value, delay.Exectime.Unix())
|
||||
writer.WriteString(delayStr)
|
||||
}
|
||||
writer.Flush()
|
||||
}()
|
||||
|
||||
// ========================== lock save ===========================
|
||||
func() {
|
||||
err := os.Remove(datadir + "/lock.z")
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
str := ""
|
||||
for _, locks := range s.locks {
|
||||
for _, lock := range locks {
|
||||
str += fmt.Sprintf("%s %s %d %d\n", lock.key, lock.uuid, lock.duration, lock.start)
|
||||
break // 只记录获得锁的记录
|
||||
}
|
||||
}
|
||||
Append(str, datadir+"/lock.z")
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *ZSub) LoadData() {
|
||||
s.loadDelay()
|
||||
// s.loadLock()
|
||||
}
|
||||
|
||||
func (s *ZSub) loadDelay() {
|
||||
f, err := os.Open(datadir + "/delay.z")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
r := bufio.NewReader(f)
|
||||
for {
|
||||
bytes, err := r.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
line := string(bytes)
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
line = strings.Trim(line, " \r\n")
|
||||
split := strings.Split(line, " ")
|
||||
if len(split) != 3 {
|
||||
continue
|
||||
}
|
||||
|
||||
exectime, err := strconv.ParseInt(split[2], 10, 64)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
if exectime < time.Now().Unix() {
|
||||
continue
|
||||
}
|
||||
s.delay([]string{"delay", split[0], split[1], strconv.FormatInt((exectime-time.Now().Unix())*1000, 10)}, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ZSub) loadLock() {
|
||||
f, err := os.Open(datadir + "/lock.z")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
r := bufio.NewReader(f)
|
||||
for {
|
||||
bytes, err := r.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
line := string(bytes)
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
line = strings.Trim(line, " \r\n")
|
||||
split := strings.Split(line, " ")
|
||||
if len(split) != 4 {
|
||||
continue
|
||||
}
|
||||
duration, err := strconv.Atoi(split[2])
|
||||
start, err := strconv.ParseInt(split[3], 10, 64)
|
||||
|
||||
if start > 0 && time.Now().Unix()-start > 1 {
|
||||
duration = int(time.Now().Unix() - start)
|
||||
} else {
|
||||
duration = 1
|
||||
}
|
||||
|
||||
s._lock(&Lock{
|
||||
key: split[0],
|
||||
uuid: split[1],
|
||||
duration: duration,
|
||||
// start: start,
|
||||
})
|
||||
}
|
||||
}
|
||||
58
internal/zsub/zgroup.go
Normal file
58
internal/zsub/zgroup.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package zsub
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type ZGroup struct { // ZGroup
|
||||
sync.Mutex
|
||||
conns []*ZConn
|
||||
offset int32
|
||||
chMsg chan string // 组消息即时投递
|
||||
ztopic *ZTopic // 所属topic
|
||||
}
|
||||
|
||||
func (g *ZGroup) appendTo(c *ZConn) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
topic := g.ztopic.topic
|
||||
|
||||
// report subscribe topic check
|
||||
if c.substoped[topic] != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// create new goroutine consumer message
|
||||
unsubChan := make(chan int, 0)
|
||||
c.substoped[topic] = unsubChan
|
||||
g.conns = c.appendTo(g.conns)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-g.chMsg:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
err := c.send("message", topic, msg)
|
||||
if err != nil { // 失败处理
|
||||
log.Println("topic send err:", err)
|
||||
g.chMsg <- msg
|
||||
return
|
||||
}
|
||||
|
||||
//log.Printf("[ %d ] topic send: %s %s\n", c.sn, topic, msg)
|
||||
atomic.AddInt32(&g.offset, 1)
|
||||
case <-c.stoped:
|
||||
return
|
||||
case <-unsubChan:
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
delete(c.substoped, topic)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
528
internal/zsub/zsub.go
Normal file
528
internal/zsub/zsub.go
Normal file
@@ -0,0 +1,528 @@
|
||||
package zsub
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
"zhub/internal/config"
|
||||
)
|
||||
|
||||
var (
|
||||
Conf config.Config
|
||||
Hub = &ZSub{
|
||||
topics: make(map[string]*ZTopic),
|
||||
timers: make(map[string]*ZTimer),
|
||||
delays: make(map[string]*ZDelay),
|
||||
locks: make(map[string][]*Lock),
|
||||
conns: make([]*ZConn, 0),
|
||||
}
|
||||
SN int32 = 1000
|
||||
)
|
||||
|
||||
func init() {
|
||||
// conn health check: T=10s, close>29s
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second * 20)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
funChan <- func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Println("conn health check Recovered:", r)
|
||||
}
|
||||
}()
|
||||
conns := make([]*ZConn, 0) // 需要关闭的连接
|
||||
for _, c := range Hub.conns {
|
||||
if c.ping > 0 && c.ping-c.pong > 19 {
|
||||
conns = c.appendTo(conns)
|
||||
continue
|
||||
}
|
||||
|
||||
c.ping = time.Now().Unix()
|
||||
if c.pong == 0 {
|
||||
c.pong = c.ping
|
||||
}
|
||||
|
||||
c.send("+ping")
|
||||
}
|
||||
|
||||
// close
|
||||
for _, c := range conns {
|
||||
log.Printf("========================================= conn ping close:%s [%d] =========================================\n", (*c.conn).RemoteAddr(), c.sn)
|
||||
c.close()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Hub.SaveData()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
type ZSub struct {
|
||||
sync.RWMutex
|
||||
topics map[string]*ZTopic
|
||||
timers map[string]*ZTimer
|
||||
delays map[string]*ZDelay
|
||||
locks map[string][]*Lock
|
||||
conns []*ZConn
|
||||
delayup bool
|
||||
}
|
||||
|
||||
type ZConn struct { //ZConn
|
||||
sync.Mutex
|
||||
sn int32 // 连接编号
|
||||
conn *net.Conn
|
||||
groupid string
|
||||
topics []string
|
||||
timers []string // 订阅、定时调度分别创建各自连接
|
||||
stoped chan int // 关闭信号量
|
||||
substoped map[string]chan int // 关闭信号量
|
||||
ping int64 // 最后心跳时间
|
||||
pong int64 // 最后心跳回复时间
|
||||
auth string // 是否已验证授权
|
||||
}
|
||||
|
||||
type Lock struct {
|
||||
key string
|
||||
uuid string
|
||||
duration int
|
||||
timer *time.Timer
|
||||
start int64
|
||||
//stop time.Time
|
||||
}
|
||||
|
||||
func NewZConn(conn *net.Conn) *ZConn {
|
||||
return &ZConn{
|
||||
sn: atomic.AddInt32(&SN, 1), // 连接编号
|
||||
conn: conn,
|
||||
topics: []string{},
|
||||
timers: []string{},
|
||||
stoped: make(chan int, 0),
|
||||
substoped: make(map[string]chan int),
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
新增订阅:
|
||||
1、找到对应主题信息
|
||||
2、加入到对应组别;如果是第一次的消费组 offset从当前 mcount 开始
|
||||
3、若有待消费消息启动消费
|
||||
*/
|
||||
func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
|
||||
Hub.Lock()
|
||||
defer Hub.Unlock()
|
||||
ztopic := Hub.topics[topic] //ZTopic
|
||||
if ztopic == nil {
|
||||
ztopic = &ZTopic{
|
||||
groups: map[string]*ZGroup{},
|
||||
topic: topic,
|
||||
chMsg: make(chan string, 500),
|
||||
}
|
||||
ztopic.init()
|
||||
Hub.topics[topic] = ztopic
|
||||
}
|
||||
|
||||
zgroup := ztopic.groups[c.groupid] //ZGroup
|
||||
if zgroup == nil {
|
||||
zgroup = &ZGroup{
|
||||
//conns: []*ZConn{},
|
||||
ztopic: ztopic,
|
||||
chMsg: make(chan string, 500),
|
||||
}
|
||||
ztopic.groups[c.groupid] = zgroup
|
||||
}
|
||||
|
||||
zgroup.appendTo(c)
|
||||
|
||||
for i, item := range c.topics {
|
||||
if strings.EqualFold(item, topic) {
|
||||
c.topics = append(c.topics[:i], c.topics[i+1:]...)
|
||||
}
|
||||
}
|
||||
c.topics = append(c.topics, topic)
|
||||
}
|
||||
|
||||
/*
|
||||
取消订阅:
|
||||
*/
|
||||
func (c *ZConn) unsubscribe(topic string) { // 取消订阅 zconn{}
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
close(c.substoped[topic])
|
||||
ztopic := Hub.topics[topic] //ZTopic
|
||||
if ztopic == nil {
|
||||
return
|
||||
}
|
||||
|
||||
zgroup := ztopic.groups[c.groupid] //ZGroup
|
||||
if zgroup == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i, item := range zgroup.conns {
|
||||
if item == c {
|
||||
zgroup.conns = append(zgroup.conns[:i], zgroup.conns[i+1:]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// send message
|
||||
func (c *ZConn) send(vs ...string) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
var bytes []byte
|
||||
|
||||
if len(vs) == 1 {
|
||||
bytes = []byte(vs[0] + "\r\n")
|
||||
} else if len(vs) > 1 {
|
||||
data := "*" + strconv.Itoa(len(vs)) + "\r\n"
|
||||
for _, v := range vs {
|
||||
data += "$" + strconv.Itoa(utf8.RuneCountInString(v)) + "\r\n"
|
||||
data += v + "\r\n"
|
||||
}
|
||||
bytes = []byte(data)
|
||||
}
|
||||
_, err := (*c.conn).Write(bytes)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *ZConn) close() {
|
||||
close(c.stoped)
|
||||
// sub
|
||||
for _, topic := range c.topics {
|
||||
c.unsubscribe(topic)
|
||||
}
|
||||
|
||||
// timer conn close
|
||||
Hub.Lock()
|
||||
defer Hub.Unlock()
|
||||
for _, topic := range c.timers { // fixme: 数据逻辑交叉循环
|
||||
timer := Hub.timers[topic]
|
||||
if timer == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for i, item := range timer.Conns {
|
||||
if item == c {
|
||||
timer.Conns = append(timer.Conns[:i], timer.Conns[i+1:]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
(*c.conn).Close()
|
||||
}
|
||||
|
||||
func (c *ZConn) appendTo(arr []*ZConn) []*ZConn {
|
||||
if arr == nil {
|
||||
arr = make([]*ZConn, 0)
|
||||
}
|
||||
for i, item := range arr {
|
||||
if item == c {
|
||||
arr = append(arr[:i], arr[i+1:]...)
|
||||
}
|
||||
}
|
||||
return append(arr, c)
|
||||
}
|
||||
|
||||
func (c *ZConn) removeTo(arr []*ZConn) []*ZConn {
|
||||
if arr == nil {
|
||||
arr = make([]*ZConn, 0)
|
||||
}
|
||||
for i, item := range arr {
|
||||
if item == c {
|
||||
arr = append(arr[:i], arr[i+1:]...)
|
||||
}
|
||||
}
|
||||
return arr
|
||||
}
|
||||
|
||||
// StartServer ================== ZHub server =====================================
|
||||
/*
|
||||
StartServer
|
||||
1、load history data
|
||||
2、init server
|
||||
*/
|
||||
func StartServer(addr string, conf config.Config) {
|
||||
Conf = conf
|
||||
datadir = conf.Data.Dir
|
||||
|
||||
go func() {
|
||||
for {
|
||||
fun, ok := <-funChan
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
fun()
|
||||
}
|
||||
}()
|
||||
|
||||
// 重新加载[定时、延时]
|
||||
go Hub.ReloadTimer()
|
||||
go Hub.LoadData()
|
||||
|
||||
// 启动服务监听
|
||||
listen, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Println("zhub.server =", addr)
|
||||
|
||||
for {
|
||||
conn, err := listen.Accept()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
zConn := NewZConn(&conn)
|
||||
|
||||
log.Printf("conn start: %s [%d]\n", conn.RemoteAddr(), zConn.sn)
|
||||
go Hub.handlerConn(zConn)
|
||||
}
|
||||
}
|
||||
|
||||
// 连接处理
|
||||
func (s *ZSub) handlerConn(c *ZConn) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Println("handlerConn Recovered:", r)
|
||||
}
|
||||
log.Println("conn closed:", (*c.conn).RemoteAddr(), "[", c.sn, "]")
|
||||
}()
|
||||
defer func() {
|
||||
// conn remove to conns
|
||||
funChan <- func() {
|
||||
Hub.conns = c.removeTo(Hub.conns)
|
||||
}
|
||||
|
||||
// close ZConn
|
||||
c.close()
|
||||
}()
|
||||
|
||||
// conn add to conns
|
||||
funChan <- func() {
|
||||
Hub.conns = c.appendTo(Hub.conns)
|
||||
}
|
||||
|
||||
reader := bufio.NewReader(*c.conn)
|
||||
for {
|
||||
rcmd := make([]string, 0)
|
||||
line, _, err := reader.ReadLine()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
switch string(line[:1]) {
|
||||
case "*":
|
||||
n, _ := strconv.Atoi(string(line[1:]))
|
||||
for i := 0; i < n; i++ {
|
||||
line, _, _ := reader.ReadLine()
|
||||
clen := 0
|
||||
if strings.EqualFold("$", string(line[:1])) {
|
||||
clen, _ = strconv.Atoi(string(line[1:]))
|
||||
}
|
||||
var vx = ""
|
||||
a:
|
||||
if v, prefix, _ := reader.ReadLine(); prefix {
|
||||
vx += string(v)
|
||||
goto a
|
||||
} else {
|
||||
vx += string(v)
|
||||
}
|
||||
if clen > utf8.RuneCountInString(vx) {
|
||||
vx += "\r\n"
|
||||
goto a
|
||||
}
|
||||
|
||||
rcmd = append(rcmd, vx)
|
||||
}
|
||||
default:
|
||||
rcmd = append(rcmd, string(line))
|
||||
}
|
||||
|
||||
if len(rcmd) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
handleMessage(Message{Conn: c, Rcmd: rcmd})
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Publish topic message
|
||||
1、send message to topic's chan
|
||||
2、feedback send success to sender, and sending message to topic's subscripts
|
||||
*/
|
||||
func (s *ZSub) Publish(topic, msg string) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
ztopic := s.topics[topic] //ZTopic
|
||||
if ztopic == nil {
|
||||
return
|
||||
}
|
||||
ztopic.mcount++
|
||||
|
||||
// topic chan overload check
|
||||
if len(ztopic.chMsg) == cap(ztopic.chMsg) {
|
||||
log.Println(fmt.Sprintf("ztopic no cap: [%s %s]", topic, msg))
|
||||
return
|
||||
}
|
||||
|
||||
ztopic.chMsg <- msg
|
||||
}
|
||||
|
||||
/*
|
||||
send broadcast message
|
||||
*/
|
||||
func (s *ZSub) broadcast(topic, msg string) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
if strings.EqualFold(topic, "lock") {
|
||||
log.Println("lock", msg)
|
||||
}
|
||||
|
||||
ztopic := s.topics[topic] //ZTopic
|
||||
if ztopic == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, group := range ztopic.groups {
|
||||
for _, conn := range group.conns {
|
||||
conn.send("message", topic, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
lock: lock key uuid t
|
||||
unlock: unlock key uuid
|
||||
*/
|
||||
func (s *ZSub) _lock(lock *Lock) {
|
||||
locks := s.locks[lock.key]
|
||||
if locks == nil {
|
||||
locks = make([]*Lock, 0)
|
||||
}
|
||||
if len(locks) == 0 { // lock success
|
||||
lock.start = time.Now().Unix()
|
||||
locks = append(locks, lock)
|
||||
s.locks[lock.key] = locks
|
||||
s.broadcast("lock", lock.uuid)
|
||||
|
||||
// 设置时间到解锁
|
||||
locks[0].timer = time.NewTimer(time.Duration(locks[0].duration) * time.Second)
|
||||
go func() {
|
||||
select {
|
||||
case <-locks[0].timer.C:
|
||||
s._unlock(*locks[0])
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
s.locks[lock.key] = append(locks, lock)
|
||||
}
|
||||
}
|
||||
func (s *ZSub) _unlock(l Lock) {
|
||||
locks := s.locks[l.key]
|
||||
if locks == nil || len(locks) == 0 {
|
||||
return
|
||||
}
|
||||
if strings.EqualFold(locks[0].uuid, l.uuid) {
|
||||
locks[0].timer.Stop()
|
||||
locks = locks[1:]
|
||||
s.locks[l.key] = locks
|
||||
}
|
||||
if len(s.locks[l.key]) > 0 { // next lock
|
||||
s.broadcast("lock", s.locks[l.key][0].uuid)
|
||||
s.locks[l.key][0].start = time.Now().Unix()
|
||||
s.locks[l.key][0].timer = time.NewTimer(time.Duration(s.locks[l.key][0].duration) * time.Second)
|
||||
go func() {
|
||||
select {
|
||||
case <-s.locks[l.key][0].timer.C:
|
||||
s._unlock(*s.locks[l.key][0])
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ZSub) shutdown() {
|
||||
s.SaveData()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
func Info() map[string]interface{} {
|
||||
// topics
|
||||
topics := map[string]interface{}{}
|
||||
for s, topic := range Hub.topics {
|
||||
// {groups:[{name:xxx,size:xx}]}
|
||||
arr := make([]map[string]interface{}, 0)
|
||||
|
||||
for groupname, group := range topic.groups {
|
||||
arr = append(arr, map[string]interface{}{
|
||||
"name": groupname,
|
||||
"subsize": len(group.conns),
|
||||
"offset": group.offset,
|
||||
"mcount": topic.mcount,
|
||||
})
|
||||
}
|
||||
topics[s] = arr
|
||||
}
|
||||
|
||||
// conns
|
||||
conns := make([]interface{}, 0)
|
||||
for _, c := range Hub.conns {
|
||||
m := make(map[string]interface{}, 0)
|
||||
m["remoteaddr"] = (*c.conn).RemoteAddr()
|
||||
m["groupid"] = c.groupid
|
||||
m["topics"] = c.topics
|
||||
m["timers"] = c.timers
|
||||
m["auth"] = c.auth
|
||||
conns = append(conns, m)
|
||||
}
|
||||
|
||||
info := map[string]interface{}{
|
||||
"topics": topics,
|
||||
"topicsize": len(topics),
|
||||
"timersize": len(Hub.timers),
|
||||
"conns": conns,
|
||||
"connsize": len(Hub.conns),
|
||||
}
|
||||
return info
|
||||
}
|
||||
|
||||
func (s *ZSub) Clearup() {
|
||||
for tn, topic := range s.topics {
|
||||
for _, group := range topic.groups {
|
||||
if len(group.conns) > 0 || topic.mcount > group.offset {
|
||||
goto a
|
||||
}
|
||||
}
|
||||
close(topic.chMsg)
|
||||
delete(s.topics, tn)
|
||||
a:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ZSub) noSubscribe(topic string) bool {
|
||||
zTopic := s.topics[topic]
|
||||
if zTopic == nil || len(zTopic.groups) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, g := range zTopic.groups {
|
||||
if len(g.conns) > 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
184
internal/zsub/ztimer.go
Normal file
184
internal/zsub/ztimer.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package zsub
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/robfig/cron"
|
||||
"log"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ZTimer struct {
|
||||
Conns []*ZConn
|
||||
Expr string
|
||||
Topic string
|
||||
Cron *cron.Cron
|
||||
Ticker *time.Ticker
|
||||
Single bool
|
||||
}
|
||||
|
||||
type ZDelay struct {
|
||||
Topic string
|
||||
Value string
|
||||
Exectime time.Time
|
||||
Timer *time.Timer
|
||||
}
|
||||
|
||||
// delay topic value 100 -> publish topic value
|
||||
func (s *ZSub) delay(rcmd []string, c *ZConn) {
|
||||
s.Lock()
|
||||
defer func() {
|
||||
s.Unlock()
|
||||
// s.SaveData()
|
||||
s.delayup = true
|
||||
}()
|
||||
if len(rcmd) != 4 {
|
||||
c.send("-Error: subscribe para number!")
|
||||
return
|
||||
}
|
||||
|
||||
t, err := strconv.ParseInt(rcmd[3], 10, 64)
|
||||
if err != nil {
|
||||
c.send("-Error: " + strings.Join(rcmd, " "))
|
||||
return
|
||||
}
|
||||
|
||||
delay := s.delays[rcmd[1]+"-"+rcmd[2]]
|
||||
if delay != nil {
|
||||
if t < 0 {
|
||||
delay.Timer.Stop()
|
||||
delete(s.delays, rcmd[1]+"-"+rcmd[2])
|
||||
return
|
||||
}
|
||||
delay.Timer.Reset(time.Duration(t) * time.Millisecond)
|
||||
} else {
|
||||
if t < 0 {
|
||||
return
|
||||
}
|
||||
delay := &ZDelay{
|
||||
Topic: rcmd[1],
|
||||
Value: rcmd[2],
|
||||
Exectime: time.Now().Add(time.Duration(t) * time.Millisecond),
|
||||
Timer: time.NewTimer(time.Duration(t) * time.Millisecond),
|
||||
}
|
||||
s.delays[rcmd[1]+"-"+rcmd[2]] = delay
|
||||
go func() {
|
||||
select {
|
||||
case <-delay.Timer.C:
|
||||
log.Println("delay send:", rcmd[1], rcmd[2])
|
||||
Hub.Publish(rcmd[1], rcmd[2])
|
||||
funChan <- func() {
|
||||
delete(s.delays, rcmd[1]+"-"+rcmd[2])
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
["Timer", Topic, expr, 0|1]
|
||||
*/
|
||||
func (s *ZSub) timer(rcmd []string, c *ZConn) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
timer := s.timers[rcmd[1]]
|
||||
if timer == nil {
|
||||
timer = &ZTimer{
|
||||
Conns: []*ZConn{},
|
||||
Topic: rcmd[1],
|
||||
}
|
||||
s.timers[rcmd[1]] = timer
|
||||
}
|
||||
if c != nil {
|
||||
timer.Conns = c.appendTo(timer.Conns)
|
||||
}
|
||||
|
||||
if len(rcmd) == 4 && !strings.EqualFold(timer.Expr, rcmd[2]) {
|
||||
timer.Expr = rcmd[2]
|
||||
if timer.Cron != nil {
|
||||
timer.Cron.Stop()
|
||||
}
|
||||
if timer.Ticker != nil {
|
||||
timer.Ticker.Stop()
|
||||
}
|
||||
|
||||
var timerFun = func() {
|
||||
for _, conn := range timer.Conns {
|
||||
log.Println("Timer send:", timer.Topic)
|
||||
err := conn.send("Timer", timer.Topic)
|
||||
if timer.Single && err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r, _ := regexp.Compile("^\\d+[d,H,m,s]$")
|
||||
expr := timer.Expr
|
||||
if r.MatchString(expr) {
|
||||
n, _ := strconv.Atoi(expr[:len(expr)-1])
|
||||
_n := time.Duration(n)
|
||||
var ticker *time.Ticker
|
||||
switch expr[len(expr)-1:] {
|
||||
case "d":
|
||||
ticker = time.NewTicker(_n * time.Hour * 24)
|
||||
case "H":
|
||||
ticker = time.NewTicker(_n * time.Hour)
|
||||
case "m":
|
||||
ticker = time.NewTicker(_n * time.Minute)
|
||||
case "s":
|
||||
ticker = time.NewTicker(_n * time.Second)
|
||||
}
|
||||
|
||||
timer.Ticker = ticker
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
timerFun()
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
timer.Cron = func() *cron.Cron {
|
||||
c := cron.New()
|
||||
c.AddFunc(timer.Expr, timerFun)
|
||||
go c.Run()
|
||||
return c
|
||||
}()
|
||||
}
|
||||
//Timer.configSave()
|
||||
}
|
||||
if len(rcmd) == 4 && (strings.EqualFold("a", rcmd[3]) != timer.Single) {
|
||||
timer.Single = strings.EqualFold("a", rcmd[3])
|
||||
//Timer.configSave()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ZSub) ReloadTimer() {
|
||||
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8",
|
||||
Conf.Ztimer.Db.User,
|
||||
Conf.Ztimer.Db.Password,
|
||||
Conf.Ztimer.Db.Addr,
|
||||
Conf.Ztimer.Db.Database,
|
||||
))
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
defer db.Close()
|
||||
rows, err := db.Query("SELECT t.`name`, IF(t.`status`=10,t.`expr`,''), IF(t.`single`=1,'a','x') 'single' FROM tasktimer t ORDER BY t.`timerid`")
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var expr string
|
||||
var single string
|
||||
rows.Scan(&name, &expr, &single)
|
||||
s.timer([]string{"Timer", name, expr, single}, nil) //["Timer", Topic, expr, a|x]
|
||||
}
|
||||
}
|
||||
36
internal/zsub/ztopic.go
Normal file
36
internal/zsub/ztopic.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package zsub
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type ZTopic struct { //ZTopic
|
||||
sync.Mutex
|
||||
groups map[string]*ZGroup
|
||||
mcount int32
|
||||
topic string // 主题名称
|
||||
chMsg chan string // 主题消息投递
|
||||
}
|
||||
|
||||
// 主题消息发送
|
||||
func (t *ZTopic) init() {
|
||||
go func() {
|
||||
for {
|
||||
msg, ok := <-t.chMsg
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
for groupName, group := range t.groups {
|
||||
// zgroup chan overload check
|
||||
if len(group.chMsg) == cap(group.chMsg) {
|
||||
log.Println(fmt.Sprintf("zgroup no cap: [%s.%s %s]", groupName, t.topic, msg))
|
||||
continue
|
||||
}
|
||||
group.chMsg <- msg
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
Reference in New Issue
Block a user