Files
zhub/zsub/zsub.go
lxy 40caa50fb7 修改:调整代码结构
git-svn-id: svn://47.119.165.148/zhub@110 e63fbceb-bcc3-4977-ac22-735b83d8d0f4
2021-02-24 10:20:43 +00:00

295 lines
5.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package zsub
import (
"bufio"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"zhub/conf"
)
var (
zsub = &ZSub{
topics: make(map[string]*ZTopic),
timers: make(map[string]*ZTimer),
delays: make(map[string]*ZDelay),
}
)
type ZSub struct {
sync.RWMutex
topics map[string]*ZTopic
timers map[string]*ZTimer
delays map[string]*ZDelay
}
type ZConn struct { //ZConn
sync.Mutex
conn *net.Conn
groupid string
topics []string
timers []string // 订阅、定时调度分别创建各自连接
stoped chan int // 关闭信号量
substoped map[string]chan int // 关闭信号量
}
func NewZConn(conn *net.Conn) *ZConn {
return &ZConn{
conn: conn,
topics: []string{},
timers: []string{},
stoped: make(chan int, 0),
substoped: make(map[string]chan int),
}
}
/*
新增订阅:
1、找到对应主题信息
2、加入到对应组别如果是第一次的消费组 offset从当前 mcount 开始
3、若有待消费消息启动消费
*/
func (c *ZConn) subscribe(topic string) { // 新增订阅 zconn{}
zsub.Lock()
defer zsub.Unlock()
ztopic := zsub.topics[topic] //ZTopic
if ztopic == nil {
ztopic = &ZTopic{
groups: map[string]*ZGroup{},
topic: topic,
chMsg: make(chan string, 10000),
}
ztopic.init()
zsub.topics[topic] = ztopic
}
zgroup := ztopic.groups[c.groupid] //ZGroup
if zgroup == nil {
zgroup = &ZGroup{
//conns: []*ZConn{},
ztopic: ztopic,
chMsg: make(chan string, 1000),
}
ztopic.groups[c.groupid] = zgroup
}
zgroup.appendTo(c)
for i, item := range c.topics {
if strings.EqualFold(item, topic) {
c.topics = append(c.topics[:i], c.topics[i+1:]...)
}
}
c.topics = append(c.topics, topic)
}
/*
取消订阅:
*/
func (c *ZConn) unsubscribe(topic string) { // 取消订阅 zconn{}
c.Lock()
defer c.Unlock()
close(c.substoped[topic])
ztopic := zsub.topics[topic] //ZTopic
if ztopic == nil {
return
}
zgroup := ztopic.groups[c.groupid] //ZGroup
if zgroup == nil {
return
}
for i, item := range zgroup.conns {
if item == c {
zgroup.conns = append(zgroup.conns[:i], zgroup.conns[i+1:]...)
}
}
}
// send message
func (c *ZConn) send(vs ...string) error {
c.Lock()
defer c.Unlock()
var bytes []byte
if len(vs) == 1 {
bytes = []byte(vs[0] + "\r\n")
} else if len(vs) > 1 {
data := "*" + strconv.Itoa(len(vs)) + "\r\n"
for _, v := range vs {
data += "$" + strconv.Itoa(len(v)) + "\r\n"
data += v + "\r\n"
}
bytes = []byte(data)
}
_, err := (*c.conn).Write(bytes)
return err
}
func (c *ZConn) close() {
close(c.stoped)
// sub
for _, topic := range c.topics {
c.unsubscribe(topic)
}
// timer conn close
zsub.Lock()
defer zsub.Unlock()
for _, topic := range c.timers { // fixme: 数据逻辑交叉循环
timer := zsub.timers[topic]
if timer == nil {
continue
}
for i, item := range timer.conns {
if item == c {
timer.conns = append(timer.conns[:i], timer.conns[i+1:]...)
}
}
}
(*c.conn).Close()
}
func (c *ZConn) appendTo(arr []*ZConn) []*ZConn {
if arr == nil {
arr = make([]*ZConn, 0)
}
for i, item := range arr {
if item == c {
arr = append(arr[:i], arr[i+1:]...)
}
}
return append(arr, c)
}
// ================== ZHub server =====================================
/*
1、初始化服务
2、启动服务监听
*/
func ServerStart(addr string) {
conf.GetStr("data.dir", "data")
go func() {
for {
fun, ok := <-funChan
if !ok {
break
}
fun()
}
}()
// 重新加载[定时、延时]
go zsub.reloadTimer()
go zsub.reloadDelay()
// 启动服务监听
listen, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal(err)
}
log.Printf("zhub started listen on: %s \n", addr)
for {
conn, err := listen.Accept()
if err != nil {
log.Println(err)
continue
}
log.Println("conn start: ", conn.RemoteAddr())
zConn := NewZConn(&conn)
go zsub.acceptHandler(zConn)
}
}
// 连接处理
func (s *ZSub) acceptHandler(c *ZConn) {
defer func() {
if r := recover(); r != nil {
log.Println("acceptHandler Recovered:", r)
}
}()
defer func() {
c.close() // close ZConn
}()
reader := bufio.NewReader(*c.conn)
for {
rcmd := make([]string, 0)
line, _, err := reader.ReadLine()
if err != nil {
log.Println(err)
return
}
if len(line) == 0 {
continue
}
switch string(line[:1]) {
case "*":
n, _ := strconv.Atoi(string(line[1:]))
for i := 0; i < n; i++ {
reader.ReadLine()
v, _, _ := reader.ReadLine()
rcmd = append(rcmd, string(v))
}
default:
rcmd = append(rcmd, string(line))
}
if len(rcmd) == 0 {
continue
}
msgAccept(Message{Conn: c, Rcmd: rcmd})
}
}
/*
accept topic message
1、send message to topic's chan
2、feedback send success to sender, and sending message to topic's subscripts
*/
func (s *ZSub) publish(topic, msg string) {
s.RLock()
defer s.RUnlock()
ztopic := s.topics[topic] //ZTopic
if ztopic == nil {
return
}
ztopic.chMsg <- msg
ztopic.mcount++
}
/*
send broadcast message
*/
func (s *ZSub) broadcast(topic, msg string) {
s.RLock()
defer s.RUnlock()
ztopic := s.topics[topic] //ZTopic
if ztopic == nil {
return
}
for _, group := range ztopic.groups {
for _, conn := range group.conns {
conn.send("message", topic, msg)
}
}
}
func (s *ZSub) shutdown() {
s.saveDelay()
s.Lock()
os.Exit(0)
}