diff --git a/main.go b/main.go index 5e2a97ae9fb87ff54be6f095de5c8fccc728ab4f..4913aace7056b7d3998362b1465a689179ca94f8 100644 --- a/main.go +++ b/main.go @@ -40,12 +40,14 @@ const ( kickChanSize = 500 broadcastChanSize = 3500 areacastChanSize = 3500 + levelcastChanSize = 3500 ) var ( kickChan chan KickChanMsg broadcastChan chan BroadcastChanMsg areacastChan chan AreaChanMsg + levelcastChan chan LevelChanMsg ) type KickChanMsg struct { @@ -65,6 +67,12 @@ type AreaChanMsg struct { in *userCenter.AreaMessage } +type LevelChanMsg struct { + ProxyAddr string + UserIds []uint64 + in *userCenter.LevelMessage +} + var ( userManager *manager.UserManager = nil termManager *manager.TerminalManager = nil @@ -294,6 +302,56 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use return &userCenter.AreaMessageRsp{FailedUids: failed}, nil } +func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*userCenter.LevelMessageRsp, error) { + var failed []uint64 + terminals := termManager.GetAll() + if terminals != nil { + var uids []uint64 + for uidStr := range *terminals { + if uid, _ := strconv.ParseUint(uidStr, 10, 64); uid > 0 { + uids = append(uids, uid) + } + } + // 处理分区用户 + levelUserIds := userManager.GetLevelUsers(uids, in.Level) + if len(levelUserIds) <= 0 { + return &userCenter.LevelMessageRsp{FailedUids: failed}, nil + } + m := make(map[string][]uint64, 0) + for uid := range levelUserIds { + ok := false + addr := userManager.GetUser(uid) + if addr != nil { + if _, ok := m[*addr]; !ok { + m[*addr] = make([]uint64, 0) + } + m[*addr] = append(m[*addr], uid) + ok = true + } else { + mylogrus.MyLog.Errorf("Unknown user %d\n", uid) + } + if !ok { + failed = append(failed, uid) + } + } + for addr, users := range m { + const sendBatchSize = 5 + for i := 0; i < len(users); i += sendBatchSize { + end := i + sendBatchSize + if end > len(users) { + end = len(users) + } + levelcastChan <- LevelChanMsg{ + ProxyAddr: addr, + UserIds: users[i:end], + in: in, + } + } + } + } + return &userCenter.LevelMessageRsp{FailedUids: failed}, nil +} + func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) { mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad) @@ -384,6 +442,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) { } } +func realLevelcast(addr string, uids []uint64, msg *userCenter.LevelMessage) { + for _, uid := range uids { + client := manager.UserProxyMgr.MakeClient(addr) + if client == nil { + mylogrus.MyLog.Errorf("Failed in making client for %d, %s\n", uid, addr) + } else { + + toRouterClient := userCenter.NewRouterClient(client) + status, err := routeMessage(toRouterClient, uid, msg.MsgType, msg.PayLoad) + if err != nil { + mylogrus.MyLog.Errorf("routeMessage uid = %d, msgType = %d, status = %d, %v", uid, msg.MsgType, status, err) + } + } + } +} + func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() @@ -613,6 +687,7 @@ func main() { kickChan = make(chan KickChanMsg, kickChanSize) broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize) areacastChan = make(chan AreaChanMsg, areacastChanSize) + levelcastChan = make(chan LevelChanMsg, levelcastChanSize) go check() // 检查长度 for i := 0; i < kickChanSize; i++ { go func(n int) { @@ -629,6 +704,11 @@ func main() { areacast(n) }(i) } + for i := 0; i < levelcastChanSize; i++ { + go func(n int) { + levelcast(n) + }(i) + } fmt.Println("Go RPC listening on ", port) lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port)) @@ -677,6 +757,12 @@ func areacast(n int) { } } +func levelcast(n int) { + for msg := range levelcastChan { + realLevelcast(msg.ProxyAddr, msg.UserIds, msg.in) + } +} + var lastDingTime time.Time var dingIntervalMin float64 = 5 // 5min 告警间隔 @@ -687,11 +773,12 @@ func check() { for { select { case <-tick.C: - l, l2, l3 := len(kickChan), len(broadcastChan), len(areacastChan) - if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength { + l, l2, l3, l4 := len(kickChan), len(broadcastChan), len(areacastChan), len(levelcastChan) + if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength || l4 >= monitorLength { if time.Now().Sub(lastDingTime).Minutes() > dingIntervalMin { go func() { - if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d", l, l2, l3), true); sErr != nil { + if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d,levelcastChan:%d", + l, l2, l3, l4), true); sErr != nil { mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr) } else { lastDingTime = time.Now() diff --git a/manager/userManager.go b/manager/userManager.go index 616fb2a07c873e152a26ee9cc28635a8d4d553c8..464c3acacadfa2e65691cb4e4af09bc110bfa304 100644 --- a/manager/userManager.go +++ b/manager/userManager.go @@ -135,3 +135,32 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT } return res } + +// 获取财富等级大于某等级的用户 +// 开区间 +func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uint64]UserTinyArea { + res := make(map[uint64]UserTinyArea) + // 从db中读,暂时不缓存(几千个) + var users []UserTinyArea + if err := m.MysqlDB.Table("user").Joins("match_wealth_user_score ON match_wealth_user_score.user_id = user.id"). + Select("user.id,external_id,sex,code,country,avatar"). + Where("user.id IN (?)", userIds). + Where("match_wealth_user_score.grade > ?", wealthLevel). + Find(&users).Error; err != nil { + mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err) + return res + } + for _, u := range users { + a := m.GetArea(u.Country) + res[u.ID] = UserTinyArea{ + ID: u.ID, + ExternalId: u.ExternalId, + Sex: u.Sex, + Code: u.Code, + Country: u.Country, + Area: a, + Avatar: u.Avatar, + } + } + return res +} diff --git a/protocol/userCenter.proto b/protocol/userCenter.proto index c20e730d6f4ec753647f60d7910c1205205b7e76..207a5d2cae91b7540dd4082b6941b4a171e5e550 100644 --- a/protocol/userCenter.proto +++ b/protocol/userCenter.proto @@ -132,6 +132,16 @@ message AreaMessageRsp { repeated uint64 failedUids = 1; } +message LevelMessage { + int32 level = 1; + uint32 msgType = 2; + bytes payLoad = 3; +} + +message LevelMessageRsp { + repeated uint64 failedUids = 1; +} + service Router { rpc route(RouteMessage) returns (RouteMessageRsp) {} rpc kickUser(KickMessage) returns (KickMessageRsp) {} @@ -143,6 +153,7 @@ service User { rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {} rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {} rpc areacast(AreaMessage) returns (AreaMessageRsp) {} + rpc levelcast(LevelMessage) returns (LevelMessageRsp) {} rpc transmit(BizMessage) returns (BizMessageRsp) {} rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {} rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {}