package main import ( "context" "encoding/json" "fmt" "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "hilo-socketCenter/common" "hilo-socketCenter/common/dingding" "hilo-socketCenter/common/mylogrus" "hilo-socketCenter/common/redisCli" "hilo-socketCenter/domain/model/rpc_m" "hilo-socketCenter/protocol/userCenter" "hilo-socketCenter/protocol/userProxy" "time" ) const SEND_WORKER = 1 // 消费端协程数量 const MONITOR_LENGTH = 1000 // 队列告警数量 const SocketQueueSendGift = "socket:queue:send_gift" var userClient userCenter.UserClient type SendGiftMsg struct { SendUserId uint64 `json:"sendUserId"` Msg *userProxy.GlobalGiftBanner `json:"msg"` } var sendGiftChan chan *SendGiftMsg var kacp = keepalive.ClientParameters{ Time: 10 * time.Second, // send pings every 10 seconds if there is no activity Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead PermitWithoutStream: true, // send pings even without active streams } // 初始化userCenterClient func init() { redisKey := fmt.Sprintf("service:userCenter") ipPorts, err := redisCli.GetRedisCluster().ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{ Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳 Max: "+inf", }).Result() if err != nil { failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userCenter", err) mylogrus.MyLog.Errorf(failMsg) } else if len(ipPorts) > 0 { } if len(ipPorts) <= 0 { ipPorts = []string{"127.0.0.1:50040"} } addresses := make([]resolver.Address, len(ipPorts)) for i, s := range ipPorts { addresses[i].Addr = s mylogrus.MyLog.Infof("address : %s", s) } r := manual.NewBuilderWithScheme("hilo") r.InitialState(resolver.State{Addresses: addresses}) userCenterAddr := fmt.Sprintf("%s:///usercenter", r.Scheme()) // Set up addresses connection to the userCenter. conn, err := grpc.Dial(userCenterAddr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(kacp), grpc.WithResolvers(r), grpc.WithDefaultServiceConfig("{ \"loadBalancingPolicy\": \"round_robin\" }")) if err != nil { mylogrus.MyLog.Fatalf("did not connect: %v", err) } //defer conn.Close() userClient = userCenter.NewUserClient(conn) if userClient == nil { mylogrus.MyLog.Fatalln("userClient null") } } func main() { mylogrus.MyLog.Infof("cron sendGiftChan start") // 8核 n send + 4 blpop sendGiftChan = make(chan *SendGiftMsg, SEND_WORKER) for i := 0; i < 1; i++ { go func() { deal() }() } for i := 0; i < SEND_WORKER; i++ { go func() { send() }() } go check() select {} } func check() { tick := time.NewTicker(time.Second * 30) defer tick.Stop() for { select { case <-tick.C: l, err := redisCli.GetRedisCluster().LLen(context.Background(), SocketQueueSendGift).Result() if err != nil { mylogrus.MyLog.Infof("cron sendGiftChan msg error,left %v-%v", l, err) } if l > MONITOR_LENGTH { go func() { if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("送礼横幅变化通知延迟,队列%s长度:%d", SocketQueueSendGift, l), true); sErr != nil { mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr) } }() n, err := redisCli.GetRedisCluster().Del(context.Background(), SocketQueueSendGift).Result() mylogrus.MyLog.Infof("del sendGiftChan msg queue:%v,n:%v,err:%v", SocketQueueSendGift, n, err) } if l > 0 { mylogrus.MyLog.Infof("cron sendGiftChan msg,left %v", l) } } } } func deal() { for true { //不需要加锁,注意,阻塞。 // 后进先出 strs, err := redisCli.GetRedisCluster().BRPop(context.Background(), time.Second, SocketQueueSendGift).Result() if err != nil { if err != redis.Nil { mylogrus.MyLog.Errorf("cron sendGiftChan redisCli.GetRedis().BLPop err:+%v", err) } } if len(strs) >= 2 { content := strs[1] //mylogrus.MyLog.Infof("cron sendGiftChan content:%v", content) msg := new(SendGiftMsg) if err := json.Unmarshal([]byte(content), msg); err != nil { mylogrus.MyLog.Errorf("cron sendGiftChan Unmarshal err:%+v, content:%v", err, content) } sendGiftChan <- msg } time.Sleep(time.Second * 5) // 控制全服banner速率 } } //var limiter = rate.NewLimiter(2000, 2000) func send() { for msg := range sendGiftChan { //if err := limiter.Wait(context.Background()); err == nil { SendToUserCenter(msg.SendUserId, msg.Msg) //} } } // SendToUserCenter 发送群信令。入参是内部imGroupId,这里做转换 func SendToUserCenter(sendUserId uint64, msg *userProxy.GlobalGiftBanner) { if buffer, err := proto.Marshal(msg); err == nil { rspUids, err := broadcast(common.MsgTypeGlobalGiftBanner, buffer) //记录socket,注意闭包问题 go func(userId uint64, msg *userProxy.GlobalGiftBanner, rspUids []uint64, err error) { buf, _ := json.Marshal(msg) rpc_m.AddRpcLog(common.MsgTypeGlobalGiftBanner, userId, string(buf[:]), rspUids, err) }(sendUserId, msg, rspUids, err) if err != nil { mylogrus.MyLog.Errorf("grpc GlobalGiftBanner send fail") } else { //mylogrus.MyLog.Info("grpc GlobalGiftBanner send success") } } else { } } //广播 func broadcast(msgType uint32, data []byte) ([]uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() rsp, err := userClient.Broadcast(ctx, &userCenter.BroadcastMessage{ MsgType: msgType, PayLoad: data, }) if err != nil { mylogrus.MyLog.Errorf("broadcast message failed %s", err.Error()) } if rsp != nil { //mylogrus.MyLog.Infof("broadcast message res:%v", rsp) return rsp.FailedUids, err } else { return []uint64{}, err } }