...
 
Commits (3)
......@@ -40,12 +40,14 @@ const (
kickChanSize = 500
broadcastChanSize = 3500
areacastChanSize = 3500
levelcastChanSize = 3500
)
var (
kickChan chan KickChanMsg
broadcastChan chan BroadcastChanMsg
areacastChan chan AreaChanMsg
levelcastChan chan LevelChanMsg
)
type KickChanMsg struct {
......@@ -65,6 +67,12 @@ type AreaChanMsg struct {
in *userCenter.AreaMessage
}
type LevelChanMsg struct {
ProxyAddr string
UserIds []uint64
in *userCenter.LevelMessage
}
var (
userManager *manager.UserManager = nil
termManager *manager.TerminalManager = nil
......@@ -294,6 +302,56 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use
return &userCenter.AreaMessageRsp{FailedUids: failed}, nil
}
func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*userCenter.LevelMessageRsp, error) {
var failed []uint64
terminals := termManager.GetAll()
if terminals != nil {
var uids []uint64
for uidStr := range *terminals {
if uid, _ := strconv.ParseUint(uidStr, 10, 64); uid > 0 {
uids = append(uids, uid)
}
}
// 处理分区用户
levelUserIds := userManager.GetLevelUsers(uids, in.Level)
if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
m := make(map[string][]uint64, 0)
for uid := range levelUserIds {
ok := false
addr := userManager.GetUser(uid)
if addr != nil {
if _, ok := m[*addr]; !ok {
m[*addr] = make([]uint64, 0)
}
m[*addr] = append(m[*addr], uid)
ok = true
} else {
mylogrus.MyLog.Errorf("Unknown user %d\n", uid)
}
if !ok {
failed = append(failed, uid)
}
}
for addr, users := range m {
const sendBatchSize = 5
for i := 0; i < len(users); i += sendBatchSize {
end := i + sendBatchSize
if end > len(users) {
end = len(users)
}
levelcastChan <- LevelChanMsg{
ProxyAddr: addr,
UserIds: users[i:end],
in: in,
}
}
}
}
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) {
mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad)
......@@ -384,6 +442,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) {
}
}
func realLevelcast(addr string, uids []uint64, msg *userCenter.LevelMessage) {
for _, uid := range uids {
client := manager.UserProxyMgr.MakeClient(addr)
if client == nil {
mylogrus.MyLog.Errorf("Failed in making client for %d, %s\n", uid, addr)
} else {
toRouterClient := userCenter.NewRouterClient(client)
status, err := routeMessage(toRouterClient, uid, msg.MsgType, msg.PayLoad)
if err != nil {
mylogrus.MyLog.Errorf("routeMessage uid = %d, msgType = %d, status = %d, %v", uid, msg.MsgType, status, err)
}
}
}
}
func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
......@@ -613,6 +687,7 @@ func main() {
kickChan = make(chan KickChanMsg, kickChanSize)
broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize)
areacastChan = make(chan AreaChanMsg, areacastChanSize)
levelcastChan = make(chan LevelChanMsg, levelcastChanSize)
go check() // 检查长度
for i := 0; i < kickChanSize; i++ {
go func(n int) {
......@@ -629,6 +704,11 @@ func main() {
areacast(n)
}(i)
}
for i := 0; i < levelcastChanSize; i++ {
go func(n int) {
levelcast(n)
}(i)
}
fmt.Println("Go RPC listening on ", port)
lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port))
......@@ -677,6 +757,12 @@ func areacast(n int) {
}
}
func levelcast(n int) {
for msg := range levelcastChan {
realLevelcast(msg.ProxyAddr, msg.UserIds, msg.in)
}
}
var lastDingTime time.Time
var dingIntervalMin float64 = 5 // 5min 告警间隔
......@@ -687,11 +773,12 @@ func check() {
for {
select {
case <-tick.C:
l, l2, l3 := len(kickChan), len(broadcastChan), len(areacastChan)
if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength {
l, l2, l3, l4 := len(kickChan), len(broadcastChan), len(areacastChan), len(levelcastChan)
if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength || l4 >= monitorLength {
if time.Now().Sub(lastDingTime).Minutes() > dingIntervalMin {
go func() {
if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d", l, l2, l3), true); sErr != nil {
if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d,levelcastChan:%d",
l, l2, l3, l4), true); sErr != nil {
mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr)
} else {
lastDingTime = time.Now()
......
......@@ -135,3 +135,32 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT
}
return res
}
// 获取财富等级大于某等级的用户
// 开区间
func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uint64]UserTinyArea {
res := make(map[uint64]UserTinyArea)
// 从db中读,暂时不缓存(几千个)
var users []UserTinyArea
if err := m.MysqlDB.Table("user").Joins("JOIN match_wealth_user_score ON match_wealth_user_score.user_id = user.id").
Select("user.id,external_id,sex,code,country,avatar").
Where("user.id IN (?)", userIds).
Where("match_wealth_user_score.grade > ?", wealthLevel).
Find(&users).Error; err != nil {
mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err)
return res
}
for _, u := range users {
a := m.GetArea(u.Country)
res[u.ID] = UserTinyArea{
ID: u.ID,
ExternalId: u.ExternalId,
Sex: u.Sex,
Code: u.Code,
Country: u.Country,
Area: a,
Avatar: u.Avatar,
}
}
return res
}
......@@ -132,6 +132,16 @@ message AreaMessageRsp {
repeated uint64 failedUids = 1;
}
message LevelMessage {
int32 level = 1;
uint32 msgType = 2;
bytes payLoad = 3;
}
message LevelMessageRsp {
repeated uint64 failedUids = 1;
}
service Router {
rpc route(RouteMessage) returns (RouteMessageRsp) {}
rpc kickUser(KickMessage) returns (KickMessageRsp) {}
......@@ -143,6 +153,7 @@ service User {
rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {}
rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {}
rpc areacast(AreaMessage) returns (AreaMessageRsp) {}
rpc levelcast(LevelMessage) returns (LevelMessageRsp) {}
rpc transmit(BizMessage) returns (BizMessageRsp) {}
rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {}
rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {}
......
......@@ -86,7 +86,6 @@ message MatchConfirm {
uint32 remoteAgoraId = 6;
uint32 callDuration = 7;
uint32 localAgoraId = 8;
uint32 diamondBalance = 9;
string matchUniqueId = 10;
uint32 failType = 11;
}
......@@ -97,62 +96,6 @@ message CallReady {
uint64 endTimestamp = 2;
uint64 callDuration = 3;
string channelId = 4;
uint64 remainDiamond = 5;
}
/* id == 103 礼物加时 */
message AddTimeGift {
uint32 giftId = 1;
string token = 2;
uint32 duration = 3;
uint64 endTimestamp = 4;
string channelId = 5;
bool isSender = 6;
uint32 giftNum = 7;
string iconUrl = 8;
string svgaUrl = 9;
string senderAvatar = 10;
string receiverAvatar = 11;
}
/* id == 104 免费加时 */
message AddTimeFree {
string token = 1;
uint32 duration = 2;
uint64 endTimestamp = 3;
string channelId = 4;
uint32 senderAgoraId = 5;
}
/* id == 105 退出 */
message ConnectsQuit {
uint64 from_user_id = 1;
}
/* id == 106 连接状态 */
message ConnectStatus {
uint64 from_user_id = 1;
float user_diamonds = 2;
bool diamonds_enough = 3;
}
/* id == 107 ??? */
message ConnectsCall {
uint64 from_user_id = 1;
string rong_room_name = 2;
bool is_join = 3;
}
/* id == 108 */
message ConnectCommon {
string rong_room_name = 1;
uint64 from_user_id = 2;
string extra = 3;
string message = 4;
}
/* id == 109 召回授权弹框 */
message RecallWindow {
}
/* id == 110 | 132 视频发送 status:(1:接收到邀请, 2:接收到对方同意, 3:双方拒绝(还没接通), 4:对方挂断(接通后)diamondBalance 只有status=2,才出现)*/
......@@ -165,17 +108,19 @@ message Video {
string sendUserId = 6;
string receiveUserId = 7;
uint32 status = 8;
uint32 diamondBalance = 9;
User sendUser = 10;
}
/* id == 109 召回授权弹框 */
message RecallWindow {
}
/* id == 111 视频通话准备 */
message VideoCallReady {
uint64 startTimestamp = 1;
uint64 endTimestamp = 2;
uint64 callDuration = 3;
string channelId = 4;
uint64 remainDiamond = 5;
}
/* id == 112 互相喜欢 */
......@@ -213,6 +158,7 @@ message GlobalGiftBanner {
uint32 bannerType = 14; // 类型:0.普通礼物 1.cp直接送礼 2.cp告白礼物
uint32 cpLevel = 15; // cp等级
string receiveUserAvatar = 16;
uint32 nobleLevel = 17; // 贵族等级
}
/* id == 116 横幅的回应,用来测量RTT */
......@@ -289,6 +235,7 @@ message GlobalBroadcast {
string msg = 6;
string groupId = 7;
uint32 senderNobleLevel = 8;
bool isPinned = 9;
}
/* id == 124 全球消息 */
......@@ -322,13 +269,11 @@ message VideoTimeMinuteSuccess {
uint32 senderAgoraId = 5;
string videoUniqueId = 6;
bool isSend = 7;
uint32 sendRemainDiamond = 8;
}
/* id == 129 1对1视频1分钟加时询问检查 */
message VideoTimeMinuteCheck {
string videoUniqueId = 1;
uint32 diamond = 2;
string uuid = 3;
}
......@@ -369,6 +314,7 @@ message GlobalGameBanner {
uint64 diamond = 4;
string bannerUrl = 5;
uint64 gameId = 6; // 1.ludo 2.uno 3.dice 4.lucky wheel 5.lucky box 6.fruit 7.slot
bool isPink = 7; // 是否粉钻
}
/* id == 147 羊羊匹配成功 */
......@@ -377,6 +323,11 @@ message SheepMatchSuccess {
User user = 2;
User otherUser = 3;
uint64 game_id = 4;
string channelId = 5;
string token = 6;
uint32 agoraId = 7;
uint32 provider = 8;
uint32 otherAgoraId = 9;
}
message SheepGamePlayer {
......@@ -463,4 +414,34 @@ message MicUserData {
string micEffect = 14;
string headwearIcon = 15;
Svip svip = 16;
}
/* id == 157 游戏大厅匹配成功 */
message LobbyMatchSuccess {
uint64 game_id = 1;
uint64 mode = 2;
string group_id = 3;
User user = 4;
User otherUser = 5;
string gameCode = 6;
}
/* id == 158 H5游戏静音 */
message H5GameVoiceMute {
}
/* id == 159 H5游戏打开语音 */
message H5GameVoiceUnMute {
}
/* id == 160 退出房间 */
message QuitRoom {
uint32 reason = 1; // 原因1.被拉黑;2.被踢出
string group_id = 2;
}
/* id == 161 国家管理员横幅 */
message GlobalCountryMgrBanner {
string country = 1; // 国家
User user = 2; // 用户信息
}
\ No newline at end of file