From 41263f0186bf39c71d51773a3607d05bd2579db1 Mon Sep 17 00:00:00 2001 From: hujiebin Date: Tue, 17 Oct 2023 13:59:06 +0800 Subject: [PATCH] Feature/level cast --- main.go | 93 ++++++++++++++++++++++++++++++++-- manager/userManager.go | 29 +++++++++++ protocol/userCenter.proto | 11 ++++ protocol/userProxy.proto | 103 ++++++++++++++++---------------------- 4 files changed, 172 insertions(+), 64 deletions(-) diff --git a/main.go b/main.go index 5e2a97a..4913aac 100644 --- a/main.go +++ b/main.go @@ -40,12 +40,14 @@ const ( kickChanSize = 500 broadcastChanSize = 3500 areacastChanSize = 3500 + levelcastChanSize = 3500 ) var ( kickChan chan KickChanMsg broadcastChan chan BroadcastChanMsg areacastChan chan AreaChanMsg + levelcastChan chan LevelChanMsg ) type KickChanMsg struct { @@ -65,6 +67,12 @@ type AreaChanMsg struct { in *userCenter.AreaMessage } +type LevelChanMsg struct { + ProxyAddr string + UserIds []uint64 + in *userCenter.LevelMessage +} + var ( userManager *manager.UserManager = nil termManager *manager.TerminalManager = nil @@ -294,6 +302,56 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use return &userCenter.AreaMessageRsp{FailedUids: failed}, nil } +func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*userCenter.LevelMessageRsp, error) { + var failed []uint64 + terminals := termManager.GetAll() + if terminals != nil { + var uids []uint64 + for uidStr := range *terminals { + if uid, _ := strconv.ParseUint(uidStr, 10, 64); uid > 0 { + uids = append(uids, uid) + } + } + // 处理分区用户 + levelUserIds := userManager.GetLevelUsers(uids, in.Level) + if len(levelUserIds) <= 0 { + return &userCenter.LevelMessageRsp{FailedUids: failed}, nil + } + m := make(map[string][]uint64, 0) + for uid := range levelUserIds { + ok := false + addr := userManager.GetUser(uid) + if addr != nil { + if _, ok := m[*addr]; !ok { + m[*addr] = make([]uint64, 0) + } + m[*addr] = append(m[*addr], uid) + ok = true + } else { + mylogrus.MyLog.Errorf("Unknown user %d\n", uid) + } + if !ok { + failed = append(failed, uid) + } + } + for addr, users := range m { + const sendBatchSize = 5 + for i := 0; i < len(users); i += sendBatchSize { + end := i + sendBatchSize + if end > len(users) { + end = len(users) + } + levelcastChan <- LevelChanMsg{ + ProxyAddr: addr, + UserIds: users[i:end], + in: in, + } + } + } + } + return &userCenter.LevelMessageRsp{FailedUids: failed}, nil +} + func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) { mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad) @@ -384,6 +442,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) { } } +func realLevelcast(addr string, uids []uint64, msg *userCenter.LevelMessage) { + for _, uid := range uids { + client := manager.UserProxyMgr.MakeClient(addr) + if client == nil { + mylogrus.MyLog.Errorf("Failed in making client for %d, %s\n", uid, addr) + } else { + + toRouterClient := userCenter.NewRouterClient(client) + status, err := routeMessage(toRouterClient, uid, msg.MsgType, msg.PayLoad) + if err != nil { + mylogrus.MyLog.Errorf("routeMessage uid = %d, msgType = %d, status = %d, %v", uid, msg.MsgType, status, err) + } + } + } +} + func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() @@ -613,6 +687,7 @@ func main() { kickChan = make(chan KickChanMsg, kickChanSize) broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize) areacastChan = make(chan AreaChanMsg, areacastChanSize) + levelcastChan = make(chan LevelChanMsg, levelcastChanSize) go check() // 检查长度 for i := 0; i < kickChanSize; i++ { go func(n int) { @@ -629,6 +704,11 @@ func main() { areacast(n) }(i) } + for i := 0; i < levelcastChanSize; i++ { + go func(n int) { + levelcast(n) + }(i) + } fmt.Println("Go RPC listening on ", port) lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port)) @@ -677,6 +757,12 @@ func areacast(n int) { } } +func levelcast(n int) { + for msg := range levelcastChan { + realLevelcast(msg.ProxyAddr, msg.UserIds, msg.in) + } +} + var lastDingTime time.Time var dingIntervalMin float64 = 5 // 5min 告警间隔 @@ -687,11 +773,12 @@ func check() { for { select { case <-tick.C: - l, l2, l3 := len(kickChan), len(broadcastChan), len(areacastChan) - if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength { + l, l2, l3, l4 := len(kickChan), len(broadcastChan), len(areacastChan), len(levelcastChan) + if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength || l4 >= monitorLength { if time.Now().Sub(lastDingTime).Minutes() > dingIntervalMin { go func() { - if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d", l, l2, l3), true); sErr != nil { + if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d,levelcastChan:%d", + l, l2, l3, l4), true); sErr != nil { mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr) } else { lastDingTime = time.Now() diff --git a/manager/userManager.go b/manager/userManager.go index 616fb2a..85cd610 100644 --- a/manager/userManager.go +++ b/manager/userManager.go @@ -135,3 +135,32 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT } return res } + +// 获取财富等级大于某等级的用户 +// 开区间 +func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uint64]UserTinyArea { + res := make(map[uint64]UserTinyArea) + // 从db中读,暂时不缓存(几千个) + var users []UserTinyArea + if err := m.MysqlDB.Table("user").Joins("JOIN match_wealth_user_score ON match_wealth_user_score.user_id = user.id"). + Select("user.id,external_id,sex,code,country,avatar"). + Where("user.id IN (?)", userIds). + Where("match_wealth_user_score.grade > ?", wealthLevel). + Find(&users).Error; err != nil { + mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err) + return res + } + for _, u := range users { + a := m.GetArea(u.Country) + res[u.ID] = UserTinyArea{ + ID: u.ID, + ExternalId: u.ExternalId, + Sex: u.Sex, + Code: u.Code, + Country: u.Country, + Area: a, + Avatar: u.Avatar, + } + } + return res +} diff --git a/protocol/userCenter.proto b/protocol/userCenter.proto index c20e730..207a5d2 100644 --- a/protocol/userCenter.proto +++ b/protocol/userCenter.proto @@ -132,6 +132,16 @@ message AreaMessageRsp { repeated uint64 failedUids = 1; } +message LevelMessage { + int32 level = 1; + uint32 msgType = 2; + bytes payLoad = 3; +} + +message LevelMessageRsp { + repeated uint64 failedUids = 1; +} + service Router { rpc route(RouteMessage) returns (RouteMessageRsp) {} rpc kickUser(KickMessage) returns (KickMessageRsp) {} @@ -143,6 +153,7 @@ service User { rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {} rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {} rpc areacast(AreaMessage) returns (AreaMessageRsp) {} + rpc levelcast(LevelMessage) returns (LevelMessageRsp) {} rpc transmit(BizMessage) returns (BizMessageRsp) {} rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {} rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {} diff --git a/protocol/userProxy.proto b/protocol/userProxy.proto index 906b1d6..ba94893 100644 --- a/protocol/userProxy.proto +++ b/protocol/userProxy.proto @@ -86,7 +86,6 @@ message MatchConfirm { uint32 remoteAgoraId = 6; uint32 callDuration = 7; uint32 localAgoraId = 8; - uint32 diamondBalance = 9; string matchUniqueId = 10; uint32 failType = 11; } @@ -97,62 +96,6 @@ message CallReady { uint64 endTimestamp = 2; uint64 callDuration = 3; string channelId = 4; - uint64 remainDiamond = 5; -} - -/* id == 103 礼物加时 */ -message AddTimeGift { - uint32 giftId = 1; - string token = 2; - uint32 duration = 3; - uint64 endTimestamp = 4; - string channelId = 5; - bool isSender = 6; - uint32 giftNum = 7; - string iconUrl = 8; - string svgaUrl = 9; - string senderAvatar = 10; - string receiverAvatar = 11; -} - -/* id == 104 免费加时 */ -message AddTimeFree { - string token = 1; - uint32 duration = 2; - uint64 endTimestamp = 3; - string channelId = 4; - uint32 senderAgoraId = 5; -} - -/* id == 105 退出 */ -message ConnectsQuit { - uint64 from_user_id = 1; -} - -/* id == 106 连接状态 */ -message ConnectStatus { - uint64 from_user_id = 1; - float user_diamonds = 2; - bool diamonds_enough = 3; -} - -/* id == 107 ??? */ -message ConnectsCall { - uint64 from_user_id = 1; - string rong_room_name = 2; - bool is_join = 3; -} - -/* id == 108 */ -message ConnectCommon { - string rong_room_name = 1; - uint64 from_user_id = 2; - string extra = 3; - string message = 4; -} - -/* id == 109 召回授权弹框 */ -message RecallWindow { } /* id == 110 | 132 视频发送 status:(1:接收到邀请, 2:接收到对方同意, 3:双方拒绝(还没接通), 4:对方挂断(接通后)diamondBalance 只有status=2,才出现)*/ @@ -165,17 +108,19 @@ message Video { string sendUserId = 6; string receiveUserId = 7; uint32 status = 8; - uint32 diamondBalance = 9; User sendUser = 10; } +/* id == 109 召回授权弹框 */ +message RecallWindow { +} + /* id == 111 视频通话准备 */ message VideoCallReady { uint64 startTimestamp = 1; uint64 endTimestamp = 2; uint64 callDuration = 3; string channelId = 4; - uint64 remainDiamond = 5; } /* id == 112 互相喜欢 */ @@ -213,6 +158,7 @@ message GlobalGiftBanner { uint32 bannerType = 14; // 类型:0.普通礼物 1.cp直接送礼 2.cp告白礼物 uint32 cpLevel = 15; // cp等级 string receiveUserAvatar = 16; + uint32 nobleLevel = 17; // 贵族等级 } /* id == 116 横幅的回应,用来测量RTT */ @@ -289,6 +235,7 @@ message GlobalBroadcast { string msg = 6; string groupId = 7; uint32 senderNobleLevel = 8; + bool isPinned = 9; } /* id == 124 全球消息 */ @@ -322,13 +269,11 @@ message VideoTimeMinuteSuccess { uint32 senderAgoraId = 5; string videoUniqueId = 6; bool isSend = 7; - uint32 sendRemainDiamond = 8; } /* id == 129 1对1视频1分钟加时询问检查 */ message VideoTimeMinuteCheck { string videoUniqueId = 1; - uint32 diamond = 2; string uuid = 3; } @@ -369,6 +314,7 @@ message GlobalGameBanner { uint64 diamond = 4; string bannerUrl = 5; uint64 gameId = 6; // 1.ludo 2.uno 3.dice 4.lucky wheel 5.lucky box 6.fruit 7.slot + bool isPink = 7; // 是否粉钻 } /* id == 147 羊羊匹配成功 */ @@ -377,6 +323,11 @@ message SheepMatchSuccess { User user = 2; User otherUser = 3; uint64 game_id = 4; + string channelId = 5; + string token = 6; + uint32 agoraId = 7; + uint32 provider = 8; + uint32 otherAgoraId = 9; } message SheepGamePlayer { @@ -463,4 +414,34 @@ message MicUserData { string micEffect = 14; string headwearIcon = 15; Svip svip = 16; +} + +/* id == 157 游戏大厅匹配成功 */ +message LobbyMatchSuccess { + uint64 game_id = 1; + uint64 mode = 2; + string group_id = 3; + User user = 4; + User otherUser = 5; + string gameCode = 6; +} + +/* id == 158 H5游戏静音 */ +message H5GameVoiceMute { +} + +/* id == 159 H5游戏打开语音 */ +message H5GameVoiceUnMute { +} + +/* id == 160 退出房间 */ +message QuitRoom { + uint32 reason = 1; // 原因1.被拉黑;2.被踢出 + string group_id = 2; +} + +/* id == 161 国家管理员横幅 */ +message GlobalCountryMgrBanner { + string country = 1; // 国家 + User user = 2; // 用户信息 } \ No newline at end of file -- 2.22.0