diff --git a/common/mylogrus/log.go b/common/mylogrus/log.go index 7f19814c314c41e06deb6b690cccdb0243224f05..7901970fb9f91f41851493ade60018a8f4ae98bb 100644 --- a/common/mylogrus/log.go +++ b/common/mylogrus/log.go @@ -24,7 +24,7 @@ func init() { filenamePrefix = logDir + filepath.Base(os.Args[0]) + "." // stderr日志重定向 MyLog.SetOutput(os.Stdout) - RewriteStderrFile() + //RewriteStderrFile() if config.AppIsRelease() { MyLog.SetFormatter(&logrus.JSONFormatter{ diff --git a/main.go b/main.go index 873770eb5e3599adb09d420347bd4d7d30541545..513bcc8a8afe5d1dbd34e4ee43170d73b0826073 100644 --- a/main.go +++ b/main.go @@ -2,8 +2,10 @@ package main import ( "context" + "errors" "flag" "fmt" + "github.com/golang/protobuf/proto" "gorm.io/gorm/schema" "net" "net/url" @@ -89,7 +91,7 @@ var kasp = keepalive.ServerParameters{ } func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) { - mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr) + //mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr) var loginStatus uint32 = common.Login_success claim, err := common.ParseToken(in.GetToken()) @@ -106,7 +108,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC // FIXME: 发现用户已经登录,要踢走旧连接 proxyAddr := userManager.GetUser(claim.UserId) if proxyAddr != nil { - mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr) + //mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr) kickChan <- KickChanMsg{ userId: claim.UserId, proxyAddr: *proxyAddr, @@ -127,7 +129,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC } else { mylogrus.MyLog.Errorf("wrong user %d", claim.UserId) } - mylogrus.MyLog.Infof("adding user %d", claim.UserId) + //mylogrus.MyLog.Infof("adding user %d", claim.UserId) // save to redis userManager.AddUser(claim.UserId, in.ProxyAddr) @@ -190,13 +192,13 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage) failed = append(failed, uid) } } - if len(failed) > 0 { - mylogrus.MyLog.Infof("Multicast failed for %v\n", failed) - } + //if len(failed) > 0 { + //mylogrus.MyLog.Infof("Multicast failed for %v\n", failed) + //} return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil } -func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { +func (s *server) BroadcastOld(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { //mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad)) failed := []uint64{} @@ -249,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil } +func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { + redisKey := "service:userSocket" + ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{ + Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳 + Max: "+inf", + }).Result() + if err != nil { + failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userSocket", err) + mylogrus.MyLog.Errorf(failMsg) + _ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true) + return nil, err + } + if len(ipPorts) <= 0 { + failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userSocket", err) + mylogrus.MyLog.Errorf(failMsg) + _ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true) + return nil, errors.New(failMsg) + } + data, _ := proto.Marshal(in) + for _, ip := range ipPorts { + queue := "broadcast:" + ip + rdbCluster.RPush(context.Background(), queue, data) + } + return &userCenter.BroadcastMessageRsp{FailedUids: nil}, nil +} + func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) { var failed []uint64 terminals := termManager.GetAll() @@ -496,7 +524,7 @@ func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint } func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error { - mylogrus.MyLog.Infof("sendKickMessage %s", msg.String()) + //mylogrus.MyLog.Infof("sendKickMessage %s", msg.String()) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() r, err := c.KickUser(ctx, msg) @@ -558,11 +586,13 @@ type HiloConfigs struct { Value string } +var rdbCluster *redis.Client + func main() { flag.Parse() // init redis cluster - rdbCluster := redis.NewClient(&redis.Options{ + rdbCluster = redis.NewClient(&redis.Options{ Addr: appConfig.GetConfigRedis().REDIS_CLUSTER_HOST, Password: appConfig.GetConfigRedis().REDIS_CLUSTER_PASSWORD, }) @@ -637,8 +667,8 @@ func main() { for { select { - case a := <-ticker.C: - mylogrus.MyLog.Infof("Tick at %s", a.String()) + case <-ticker.C: + //mylogrus.MyLog.Infof("Tick at %s", a.String()) terminals := termManager.GetAll() if terminals != nil { if len(*terminals) <= 100 { @@ -693,10 +723,10 @@ func main() { func kick(n int) { for msg := range kickChan { - mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg) + //mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg) clientAddr := termManager.GetTerminal(msg.userId) if clientAddr == nil { - mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId) + //mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId) } else { client := manager.UserProxyMgr.GetClient(msg.proxyAddr) if client == nil {