...
 
Commits (9)
...@@ -24,7 +24,7 @@ func init() { ...@@ -24,7 +24,7 @@ func init() {
filenamePrefix = logDir + filepath.Base(os.Args[0]) + "." filenamePrefix = logDir + filepath.Base(os.Args[0]) + "."
// stderr日志重定向 // stderr日志重定向
MyLog.SetOutput(os.Stdout) MyLog.SetOutput(os.Stdout)
RewriteStderrFile() //RewriteStderrFile()
if config.AppIsRelease() { if config.AppIsRelease() {
MyLog.SetFormatter(&logrus.JSONFormatter{ MyLog.SetFormatter(&logrus.JSONFormatter{
......
...@@ -9,9 +9,9 @@ MYSQL_USERNAME=hilo_test ...@@ -9,9 +9,9 @@ MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo_code MYSQL_DB=hilo_code
[REDIS] [REDIS]
REDIS_HOST=172.19.0.2:6379 REDIS_HOST=43.135.4.137:6379
REDIS_PASSWORD=yPyZH1DYMJhrVQgr REDIS_PASSWORD=yPyZH1DYMJhrVQgr
REDIS_CLUSTER_HOST=172.19.0.2:6379 REDIS_CLUSTER_HOST=43.135.4.137:6379
REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr
[JWT] [JWT]
SECRET=hilo1632 SECRET=hilo1632
......
...@@ -2,8 +2,10 @@ package main ...@@ -2,8 +2,10 @@ package main
import ( import (
"context" "context"
"errors"
"flag" "flag"
"fmt" "fmt"
"github.com/golang/protobuf/proto"
"gorm.io/gorm/schema" "gorm.io/gorm/schema"
"net" "net"
"net/url" "net/url"
...@@ -89,7 +91,7 @@ var kasp = keepalive.ServerParameters{ ...@@ -89,7 +91,7 @@ var kasp = keepalive.ServerParameters{
} }
func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) { func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) {
mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr) //mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr)
var loginStatus uint32 = common.Login_success var loginStatus uint32 = common.Login_success
claim, err := common.ParseToken(in.GetToken()) claim, err := common.ParseToken(in.GetToken())
...@@ -106,7 +108,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC ...@@ -106,7 +108,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
// FIXME: 发现用户已经登录,要踢走旧连接 // FIXME: 发现用户已经登录,要踢走旧连接
proxyAddr := userManager.GetUser(claim.UserId) proxyAddr := userManager.GetUser(claim.UserId)
if proxyAddr != nil { if proxyAddr != nil {
mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr) //mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr)
kickChan <- KickChanMsg{ kickChan <- KickChanMsg{
userId: claim.UserId, userId: claim.UserId,
proxyAddr: *proxyAddr, proxyAddr: *proxyAddr,
...@@ -127,7 +129,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC ...@@ -127,7 +129,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
} else { } else {
mylogrus.MyLog.Errorf("wrong user %d", claim.UserId) mylogrus.MyLog.Errorf("wrong user %d", claim.UserId)
} }
mylogrus.MyLog.Infof("adding user %d", claim.UserId) //mylogrus.MyLog.Infof("adding user %d", claim.UserId)
// save to redis // save to redis
userManager.AddUser(claim.UserId, in.ProxyAddr) userManager.AddUser(claim.UserId, in.ProxyAddr)
...@@ -190,13 +192,13 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage) ...@@ -190,13 +192,13 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage)
failed = append(failed, uid) failed = append(failed, uid)
} }
} }
if len(failed) > 0 { //if len(failed) > 0 {
mylogrus.MyLog.Infof("Multicast failed for %v\n", failed) //mylogrus.MyLog.Infof("Multicast failed for %v\n", failed)
} //}
return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil
} }
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { func (s *server) BroadcastOld(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
//mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad)) //mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad))
failed := []uint64{} failed := []uint64{}
...@@ -249,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) ...@@ -249,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage)
return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil
} }
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
redisKey := "service:userSocket"
ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{
Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳
Max: "+inf",
}).Result()
if err != nil {
failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, err
}
if len(ipPorts) <= 0 {
failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, errors.New(failMsg)
}
data, _ := proto.Marshal(in)
for _, ip := range ipPorts {
queue := "broadcast:" + ip
rdbCluster.RPush(context.Background(), queue, data)
}
return &userCenter.BroadcastMessageRsp{FailedUids: nil}, nil
}
func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) { func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) {
var failed []uint64 var failed []uint64
terminals := termManager.GetAll() terminals := termManager.GetAll()
...@@ -309,11 +337,18 @@ func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*u ...@@ -309,11 +337,18 @@ func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*u
uids = append(uids, uid) uids = append(uids, uid)
} }
} }
// 处理分区用户 // 处理等级用户
levelUserIds := userManager.GetLevelUsers(uids, in.Level) levelUserIds, userIds := userManager.GetLevelUsers(uids, in.Level)
if len(levelUserIds) <= 0 { if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
} }
if in.Area > 0 {
// 处理分区用户
levelUserIds = userManager.GetAreaUsers(userIds, int8(in.Area))
if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
}
m := make(map[string][]uint64, 0) m := make(map[string][]uint64, 0)
for uid := range levelUserIds { for uid := range levelUserIds {
ok := false ok := false
...@@ -489,7 +524,7 @@ func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint ...@@ -489,7 +524,7 @@ func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint
} }
func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error { func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error {
mylogrus.MyLog.Infof("sendKickMessage %s", msg.String()) //mylogrus.MyLog.Infof("sendKickMessage %s", msg.String())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
r, err := c.KickUser(ctx, msg) r, err := c.KickUser(ctx, msg)
...@@ -551,11 +586,13 @@ type HiloConfigs struct { ...@@ -551,11 +586,13 @@ type HiloConfigs struct {
Value string Value string
} }
var rdbCluster *redis.Client
func main() { func main() {
flag.Parse() flag.Parse()
// init redis cluster // init redis cluster
rdbCluster := redis.NewClient(&redis.Options{ rdbCluster = redis.NewClient(&redis.Options{
Addr: appConfig.GetConfigRedis().REDIS_CLUSTER_HOST, Addr: appConfig.GetConfigRedis().REDIS_CLUSTER_HOST,
Password: appConfig.GetConfigRedis().REDIS_CLUSTER_PASSWORD, Password: appConfig.GetConfigRedis().REDIS_CLUSTER_PASSWORD,
}) })
...@@ -630,8 +667,8 @@ func main() { ...@@ -630,8 +667,8 @@ func main() {
for { for {
select { select {
case a := <-ticker.C: case <-ticker.C:
mylogrus.MyLog.Infof("Tick at %s", a.String()) //mylogrus.MyLog.Infof("Tick at %s", a.String())
terminals := termManager.GetAll() terminals := termManager.GetAll()
if terminals != nil { if terminals != nil {
if len(*terminals) <= 100 { if len(*terminals) <= 100 {
...@@ -686,10 +723,10 @@ func main() { ...@@ -686,10 +723,10 @@ func main() {
func kick(n int) { func kick(n int) {
for msg := range kickChan { for msg := range kickChan {
mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg) //mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg)
clientAddr := termManager.GetTerminal(msg.userId) clientAddr := termManager.GetTerminal(msg.userId)
if clientAddr == nil { if clientAddr == nil {
mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId) //mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId)
} else { } else {
client := manager.UserProxyMgr.GetClient(msg.proxyAddr) client := manager.UserProxyMgr.GetClient(msg.proxyAddr)
if client == nil { if client == nil {
......
...@@ -138,8 +138,9 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT ...@@ -138,8 +138,9 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT
// 获取财富等级大于某等级的用户 // 获取财富等级大于某等级的用户
// 开区间 // 开区间
func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uint64]UserTinyArea { func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) (map[uint64]UserTinyArea, []uint64) {
res := make(map[uint64]UserTinyArea) res := make(map[uint64]UserTinyArea)
var resIds []uint64
// 从db中读,暂时不缓存(几千个) // 从db中读,暂时不缓存(几千个)
var users []UserTinyArea var users []UserTinyArea
if err := m.MysqlDB.Table("user").Joins("JOIN match_wealth_user_score ON match_wealth_user_score.user_id = user.id"). if err := m.MysqlDB.Table("user").Joins("JOIN match_wealth_user_score ON match_wealth_user_score.user_id = user.id").
...@@ -148,7 +149,7 @@ func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uin ...@@ -148,7 +149,7 @@ func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uin
Where("match_wealth_user_score.grade > ?", wealthLevel). Where("match_wealth_user_score.grade > ?", wealthLevel).
Find(&users).Error; err != nil { Find(&users).Error; err != nil {
mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err) mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err)
return res return res, resIds
} }
for _, u := range users { for _, u := range users {
a := m.GetArea(u.Country) a := m.GetArea(u.Country)
...@@ -161,6 +162,7 @@ func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uin ...@@ -161,6 +162,7 @@ func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uin
Area: a, Area: a,
Avatar: u.Avatar, Avatar: u.Avatar,
} }
resIds = append(resIds, u.ID)
} }
return res return res, resIds
} }
...@@ -136,6 +136,7 @@ message LevelMessage { ...@@ -136,6 +136,7 @@ message LevelMessage {
int32 level = 1; int32 level = 1;
uint32 msgType = 2; uint32 msgType = 2;
bytes payLoad = 3; bytes payLoad = 3;
int32 area = 4;
} }
message LevelMessageRsp { message LevelMessageRsp {
......