Commit a5ba9f6a authored by hujiebin's avatar hujiebin

feat:levelCast

parent 6b9e9fe0
...@@ -40,12 +40,14 @@ const ( ...@@ -40,12 +40,14 @@ const (
kickChanSize = 500 kickChanSize = 500
broadcastChanSize = 3500 broadcastChanSize = 3500
areacastChanSize = 3500 areacastChanSize = 3500
levelcastChanSize = 3500
) )
var ( var (
kickChan chan KickChanMsg kickChan chan KickChanMsg
broadcastChan chan BroadcastChanMsg broadcastChan chan BroadcastChanMsg
areacastChan chan AreaChanMsg areacastChan chan AreaChanMsg
levelcastChan chan LevelChanMsg
) )
type KickChanMsg struct { type KickChanMsg struct {
...@@ -65,6 +67,12 @@ type AreaChanMsg struct { ...@@ -65,6 +67,12 @@ type AreaChanMsg struct {
in *userCenter.AreaMessage in *userCenter.AreaMessage
} }
type LevelChanMsg struct {
ProxyAddr string
UserIds []uint64
in *userCenter.LevelMessage
}
var ( var (
userManager *manager.UserManager = nil userManager *manager.UserManager = nil
termManager *manager.TerminalManager = nil termManager *manager.TerminalManager = nil
...@@ -294,6 +302,56 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use ...@@ -294,6 +302,56 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use
return &userCenter.AreaMessageRsp{FailedUids: failed}, nil return &userCenter.AreaMessageRsp{FailedUids: failed}, nil
} }
func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*userCenter.LevelMessageRsp, error) {
var failed []uint64
terminals := termManager.GetAll()
if terminals != nil {
var uids []uint64
for uidStr := range *terminals {
if uid, _ := strconv.ParseUint(uidStr, 10, 64); uid > 0 {
uids = append(uids, uid)
}
}
// 处理分区用户
levelUserIds := userManager.GetLevelUsers(uids, in.Level)
if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
m := make(map[string][]uint64, 0)
for uid := range levelUserIds {
ok := false
addr := userManager.GetUser(uid)
if addr != nil {
if _, ok := m[*addr]; !ok {
m[*addr] = make([]uint64, 0)
}
m[*addr] = append(m[*addr], uid)
ok = true
} else {
mylogrus.MyLog.Errorf("Unknown user %d\n", uid)
}
if !ok {
failed = append(failed, uid)
}
}
for addr, users := range m {
const sendBatchSize = 5
for i := 0; i < len(users); i += sendBatchSize {
end := i + sendBatchSize
if end > len(users) {
end = len(users)
}
levelcastChan <- LevelChanMsg{
ProxyAddr: addr,
UserIds: users[i:end],
in: in,
}
}
}
}
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) { func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) {
mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad) mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad)
...@@ -384,6 +442,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) { ...@@ -384,6 +442,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) {
} }
} }
func realLevelcast(addr string, uids []uint64, msg *userCenter.LevelMessage) {
for _, uid := range uids {
client := manager.UserProxyMgr.MakeClient(addr)
if client == nil {
mylogrus.MyLog.Errorf("Failed in making client for %d, %s\n", uid, addr)
} else {
toRouterClient := userCenter.NewRouterClient(client)
status, err := routeMessage(toRouterClient, uid, msg.MsgType, msg.PayLoad)
if err != nil {
mylogrus.MyLog.Errorf("routeMessage uid = %d, msgType = %d, status = %d, %v", uid, msg.MsgType, status, err)
}
}
}
}
func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) { func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
...@@ -613,6 +687,7 @@ func main() { ...@@ -613,6 +687,7 @@ func main() {
kickChan = make(chan KickChanMsg, kickChanSize) kickChan = make(chan KickChanMsg, kickChanSize)
broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize) broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize)
areacastChan = make(chan AreaChanMsg, areacastChanSize) areacastChan = make(chan AreaChanMsg, areacastChanSize)
levelcastChan = make(chan LevelChanMsg, levelcastChanSize)
go check() // 检查长度 go check() // 检查长度
for i := 0; i < kickChanSize; i++ { for i := 0; i < kickChanSize; i++ {
go func(n int) { go func(n int) {
...@@ -629,6 +704,11 @@ func main() { ...@@ -629,6 +704,11 @@ func main() {
areacast(n) areacast(n)
}(i) }(i)
} }
for i := 0; i < levelcastChanSize; i++ {
go func(n int) {
levelcast(n)
}(i)
}
fmt.Println("Go RPC listening on ", port) fmt.Println("Go RPC listening on ", port)
lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port)) lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port))
...@@ -677,6 +757,12 @@ func areacast(n int) { ...@@ -677,6 +757,12 @@ func areacast(n int) {
} }
} }
func levelcast(n int) {
for msg := range levelcastChan {
realLevelcast(msg.ProxyAddr, msg.UserIds, msg.in)
}
}
var lastDingTime time.Time var lastDingTime time.Time
var dingIntervalMin float64 = 5 // 5min 告警间隔 var dingIntervalMin float64 = 5 // 5min 告警间隔
...@@ -687,11 +773,12 @@ func check() { ...@@ -687,11 +773,12 @@ func check() {
for { for {
select { select {
case <-tick.C: case <-tick.C:
l, l2, l3 := len(kickChan), len(broadcastChan), len(areacastChan) l, l2, l3, l4 := len(kickChan), len(broadcastChan), len(areacastChan), len(levelcastChan)
if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength { if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength || l4 >= monitorLength {
if time.Now().Sub(lastDingTime).Minutes() > dingIntervalMin { if time.Now().Sub(lastDingTime).Minutes() > dingIntervalMin {
go func() { go func() {
if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d", l, l2, l3), true); sErr != nil { if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d,levelcastChan:%d",
l, l2, l3, l4), true); sErr != nil {
mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr) mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr)
} else { } else {
lastDingTime = time.Now() lastDingTime = time.Now()
......
...@@ -135,3 +135,32 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT ...@@ -135,3 +135,32 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT
} }
return res return res
} }
// 获取财富等级大于某等级的用户
// 开区间
func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uint64]UserTinyArea {
res := make(map[uint64]UserTinyArea)
// 从db中读,暂时不缓存(几千个)
var users []UserTinyArea
if err := m.MysqlDB.Table("user").Joins("match_wealth_user_score ON match_wealth_user_score.user_id = user.id").
Select("user.id,external_id,sex,code,country,avatar").
Where("user.id IN (?)", userIds).
Where("match_wealth_user_score.grade > ?", wealthLevel).
Find(&users).Error; err != nil {
mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err)
return res
}
for _, u := range users {
a := m.GetArea(u.Country)
res[u.ID] = UserTinyArea{
ID: u.ID,
ExternalId: u.ExternalId,
Sex: u.Sex,
Code: u.Code,
Country: u.Country,
Area: a,
Avatar: u.Avatar,
}
}
return res
}
...@@ -132,6 +132,16 @@ message AreaMessageRsp { ...@@ -132,6 +132,16 @@ message AreaMessageRsp {
repeated uint64 failedUids = 1; repeated uint64 failedUids = 1;
} }
message LevelMessage {
int32 level = 1;
uint32 msgType = 2;
bytes payLoad = 3;
}
message LevelMessageRsp {
repeated uint64 failedUids = 1;
}
service Router { service Router {
rpc route(RouteMessage) returns (RouteMessageRsp) {} rpc route(RouteMessage) returns (RouteMessageRsp) {}
rpc kickUser(KickMessage) returns (KickMessageRsp) {} rpc kickUser(KickMessage) returns (KickMessageRsp) {}
...@@ -143,6 +153,7 @@ service User { ...@@ -143,6 +153,7 @@ service User {
rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {} rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {}
rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {} rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {}
rpc areacast(AreaMessage) returns (AreaMessageRsp) {} rpc areacast(AreaMessage) returns (AreaMessageRsp) {}
rpc levelcast(LevelMessage) returns (LevelMessageRsp) {}
rpc transmit(BizMessage) returns (BizMessageRsp) {} rpc transmit(BizMessage) returns (BizMessageRsp) {}
rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {} rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {}
rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {} rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment