package main import ( "context" "encoding/json" "fmt" "github.com/go-redis/redis/v8" "hilo-micCenter/common/dingding" "hilo-micCenter/common/mylogrus" "hilo-micCenter/common/redisCli" "hilo-micCenter/common/tencentyun" "time" ) const SEND_WORKER = 2000 // 消费端协程数量 const MONITOR_LENGTH = 100 // 队列告警数量 var sendChan chan GroupSystemMsg func main() { //if !config.IsMaster() { // return //} mylogrus.MyLog.Infof("cron micChangeSys start") // 8核 n send + 4 blpop sendChan = make(chan GroupSystemMsg, SEND_WORKER) for i := 0; i < 4; i++ { go func() { deal() }() } for i := 0; i < SEND_WORKER; i++ { go func() { send() }() } go check() select {} } func check() { tick := time.NewTicker(time.Second * 3) defer tick.Stop() for { select { case <-tick.C: l, err := redisCli.GetRedisCluster().LLen(context.Background(), micInfoChange).Result() if err != nil { mylogrus.MyLog.Errorf("cron micChangeSys msg error,left %v-%v", l, err) } if l > MONITOR_LENGTH { go func() { if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("麦位变化通知延迟,队列%s长度:%d", micInfoChange, l), true); sErr != nil { mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr) } }() } if l > 0 { mylogrus.MyLog.Infof("cron micChangeSys msg,left %v", l) } } } } func deal() { for true { //不需要加锁,注意,阻塞。 strs, err := redisCli.GetRedisCluster().BLPop(context.Background(), time.Second, micInfoChange).Result() if err != nil { if err != redis.Nil { mylogrus.MyLog.Errorf("cron micChangeSys redisCli.GetRedis().BLPop err:+%v", err) } } if len(strs) >= 2 { content := strs[1] //mylogrus.MyLog.Infof("cron micChangeSys content:%v", content) micSystemMsg := MicSystemMsg{} if err := json.Unmarshal([]byte(content), &micSystemMsg); err != nil { mylogrus.MyLog.Errorf("cron micChangeSys Unmarshal err:%+v, content:%v", err, content) } sendChan <- GroupSystemMsg{ MsgGroupUid: micSystemMsg.GroupUid, MsgId: micSystemMsg.MsgId, Source: micSystemMsg.Source, Target: micSystemMsg.Target, Content: micSystemMsg.Content, } } } } //var limiter = rate.NewLimiter(2000, 2000) func send() { for msg := range sendChan { //if err := limiter.Wait(context.Background()); err == nil { SendSignalMsg(msg.MsgGroupUid, msg) //} } } const micInfoChange = "mic_info_change" type MicSystemMsg struct { GroupUid string //房间ID Source string Target string MsgId uint8 Content string //要发送的内容 } type GroupSystemMsg struct { MsgGroupUid string `json:"-"` MsgId uint8 `json:"msgId"` Source string `json:"source"` Target string `json:"target"` Content string `json:"content"` } // SendSignalMsg 发送群信令。入参是内部imGroupId,这里做转换 func SendSignalMsg(groupId string, msg GroupSystemMsg) { buffer, err := json.Marshal(msg) if err == nil { str := string(buffer) //mylogrus.MyLog.Infof("cron micChangeSys SendSignalMsg: %s", str) logger := mylogrus.MyLog.WithField("msgId", msg.MsgId) if err = tencentyun.SendSystemMsg(logger, groupId, []string{}, str); err != nil { mylogrus.MyLog.Errorf("cron micChangeSys SendSignalMsg sync failed for %s, msgId = %d, content:%v, err:%+v", groupId, msg.MsgId, str, err) content := fmt.Sprintf("腾讯云推送失败,group:%v,msgId:%v,err:%v", groupId, msg.MsgId, err.Error()) if err := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, content, true); err != nil { mylogrus.MyLog.Errorf("cron micChangeSys SendSignalMsg send dingding fail%s", err.Error()) } } else { //mylogrus.MyLog.Infof("cron micChangeSys SendSignalMsg sync success for %s, msgId = %d, content:%v", groupId, msg.MsgId, str) } } else { mylogrus.MyLog.Errorf("cron micChangeSys SendSignalMsg failure, msgId = %d : %s , msg:%+v", msg.MsgId, err.Error(), msg) } }