Commit bd9e98d5 authored by hujiebin's avatar hujiebin

Merge branch 'feature/hilo-socket' into test

parents 89821bdb ef02152e
......@@ -24,7 +24,7 @@ func init() {
filenamePrefix = logDir + filepath.Base(os.Args[0]) + "."
// stderr日志重定向
MyLog.SetOutput(os.Stdout)
RewriteStderrFile()
//RewriteStderrFile()
if config.AppIsRelease() {
MyLog.SetFormatter(&logrus.JSONFormatter{
......
......@@ -2,8 +2,10 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"github.com/golang/protobuf/proto"
"gorm.io/gorm/schema"
"net"
"net/url"
......@@ -89,7 +91,7 @@ var kasp = keepalive.ServerParameters{
}
func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) {
mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr)
//mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr)
var loginStatus uint32 = common.Login_success
claim, err := common.ParseToken(in.GetToken())
......@@ -106,7 +108,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
// FIXME: 发现用户已经登录,要踢走旧连接
proxyAddr := userManager.GetUser(claim.UserId)
if proxyAddr != nil {
mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr)
//mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr)
kickChan <- KickChanMsg{
userId: claim.UserId,
proxyAddr: *proxyAddr,
......@@ -127,7 +129,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
} else {
mylogrus.MyLog.Errorf("wrong user %d", claim.UserId)
}
mylogrus.MyLog.Infof("adding user %d", claim.UserId)
//mylogrus.MyLog.Infof("adding user %d", claim.UserId)
// save to redis
userManager.AddUser(claim.UserId, in.ProxyAddr)
......@@ -190,13 +192,13 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage)
failed = append(failed, uid)
}
}
if len(failed) > 0 {
mylogrus.MyLog.Infof("Multicast failed for %v\n", failed)
}
//if len(failed) > 0 {
//mylogrus.MyLog.Infof("Multicast failed for %v\n", failed)
//}
return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil
}
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
func (s *server) BroadcastOld(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
//mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad))
failed := []uint64{}
......@@ -249,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage)
return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil
}
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
redisKey := "service:userSocket"
ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{
Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳
Max: "+inf",
}).Result()
if err != nil {
failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, err
}
if len(ipPorts) <= 0 {
failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, errors.New(failMsg)
}
data, _ := proto.Marshal(in)
for _, ip := range ipPorts {
queue := "broadcast:" + ip
rdbCluster.RPush(context.Background(), queue, data)
}
return &userCenter.BroadcastMessageRsp{FailedUids: nil}, nil
}
func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) {
var failed []uint64
terminals := termManager.GetAll()
......@@ -496,7 +524,7 @@ func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint
}
func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error {
mylogrus.MyLog.Infof("sendKickMessage %s", msg.String())
//mylogrus.MyLog.Infof("sendKickMessage %s", msg.String())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := c.KickUser(ctx, msg)
......@@ -558,11 +586,13 @@ type HiloConfigs struct {
Value string
}
var rdbCluster *redis.Client
func main() {
flag.Parse()
// init redis cluster
rdbCluster := redis.NewClient(&redis.Options{
rdbCluster = redis.NewClient(&redis.Options{
Addr: appConfig.GetConfigRedis().REDIS_CLUSTER_HOST,
Password: appConfig.GetConfigRedis().REDIS_CLUSTER_PASSWORD,
})
......@@ -637,8 +667,8 @@ func main() {
for {
select {
case a := <-ticker.C:
mylogrus.MyLog.Infof("Tick at %s", a.String())
case <-ticker.C:
//mylogrus.MyLog.Infof("Tick at %s", a.String())
terminals := termManager.GetAll()
if terminals != nil {
if len(*terminals) <= 100 {
......@@ -693,10 +723,10 @@ func main() {
func kick(n int) {
for msg := range kickChan {
mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg)
//mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg)
clientAddr := termManager.GetTerminal(msg.userId)
if clientAddr == nil {
mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId)
//mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId)
} else {
client := manager.UserProxyMgr.GetClient(msg.proxyAddr)
if client == nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment