// Command userCenter is a gRPC service that records which proxy and client
// address each logged-in user is attached to (stored in Redis through the
// manager package), relays multicast/broadcast messages to the user proxies,
// forwards business messages to the biz service, and registers itself in
// Consul with an HTTP health check.
package main

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
	consulapi "github.com/hashicorp/consul/api"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"

	"hilo-userCenter/common"
	"hilo-userCenter/common/dingding"
	"hilo-userCenter/common/mylogrus"
	"hilo-userCenter/manager"
	"hilo-userCenter/protocol/biz"
	"hilo-userCenter/protocol/userCenter"
)

// NOTE(review): the default Redis/MySQL credentials are hard-coded in this
// file. They are only fallbacks for missing Consul KV entries, but they should
// be moved out of source control and rotated.
const (
	port                   = 50040
	default_redis_address  = "47.244.34.27:6379"
	default_redis_password = "8QZ9JD1zLvPR3yHf"
	redis_section          = 1
)

// Sizing for the asynchronous message workers.
const (
	// monitorLength is the queue length above which an alert is raised.
	// NOTE(review): it is larger than kickChanSize, so the kick queue can
	// never trip the alert on its own.
	monitorLength     = 1000
	kickChanSize      = 500
	broadcastChanSize = 3500
)

var (
	kickChan      chan KickChanMsg
	broadcastChan chan BroadcastChanMsg
)

// KickChanMsg asks a kick worker to disconnect a user's previous connection.
type KickChanMsg struct {
	userId     uint64
	proxyAddr  string // proxy that held the previous connection
	clientAddr string // terminal address of the previous connection, captured at login time
}

// BroadcastChanMsg asks a broadcast worker to route one message to a batch of
// users attached to the same proxy.
type BroadcastChanMsg struct {
	ProxyAddr string
	UserIds   []uint64
	in        *userCenter.BroadcastMessage
}

var (
	userManager *manager.UserManager     = nil
	termManager *manager.TerminalManager = nil
	roomManager *manager.RoomManager     = nil
)

var logDir = "/var/log/hilo/"

// server implements the userCenter.User gRPC service.
type server struct {
	userCenter.UnimplementedUserServer
}

var kasp = keepalive.ServerParameters{
	MaxConnectionIdle:     15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
	MaxConnectionAge:      30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
	MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
	Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
	Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
}

// Login validates the token, queues a kick for the user's previous connection
// (if one is recorded), stores the new proxy/client addresses and starts
// dialing the proxy in the background.
//
// Token problems are reported through the response Status, never through the
// returned error.
func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) {
	// NOTE(review): this logs the bearer token; consider redacting it.
	mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr)

	var loginStatus uint32 = common.Login_success
	claim, err := common.ParseToken(in.GetToken())
	switch {
	case err != nil:
		mylogrus.MyLog.Errorf("Invalid token %s\n", in.GetToken())
		loginStatus = common.Login_valid_token
	case claim == nil:
		// Guard before touching claim.ExpiresAt; a nil claim is an invalid token.
		loginStatus = common.Login_valid_token
	case time.Now().Unix() > claim.ExpiresAt:
		loginStatus = common.Login_token_expired
	}
	if loginStatus != common.Login_success {
		return &userCenter.LoginMessageRsp{Status: loginStatus, Uid: 0}, nil
	}

	// The user is already logged in somewhere: kick the old connection.
	if proxyAddr := userManager.GetUser(claim.UserId); proxyAddr != nil {
		mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr)
		// Read the old terminal address now. SetTerminal below overwrites it,
		// so a lookup inside the asynchronous kick worker would see the NEW
		// connection's address instead of the one to be kicked.
		if oldClient := termManager.GetTerminal(claim.UserId); oldClient != nil {
			kickChan <- KickChanMsg{
				userId:     claim.UserId,
				proxyAddr:  *proxyAddr,
				clientAddr: *oldClient,
			}
		} else {
			mylogrus.MyLog.Infof("No terminal found for %d", claim.UserId)
		}
	} else {
		// NOTE(review): this branch is also taken on a user's first login.
		mylogrus.MyLog.Errorf("wrong user %d", claim.UserId)
	}

	mylogrus.MyLog.Infof("adding user %d", claim.UserId)
	// Save to redis.
	userManager.AddUser(claim.UserId, in.ProxyAddr)
	termManager.SetTerminal(claim.UserId, in.ClientAddr)

	// Set up the reverse connection to the user's proxy, if needed.
	go manager.UserProxyMgr.MakeClient(in.ProxyAddr)

	return &userCenter.LoginMessageRsp{Status: loginStatus, Uid: claim.UserId}, nil
}

// Logout removes the user's terminal record, but only when the recorded
// address matches the address that is logging out. It always answers Status 0.
func (s *server) Logout(ctx context.Context, in *userCenter.LogoutMessage) (*userCenter.LogoutMessageRsp, error) {
	mylogrus.MyLog.Infof("Received logoutMsg: %s, %d\n", in.GetClientAddr(), in.GetUid())

	if addr := termManager.GetTerminal(in.Uid); addr != nil && *addr == in.ClientAddr {
		termManager.RemoveTerminal(in.Uid)
	}
	// The push of a logout message onto the Redis logout queue was removed.
	return &userCenter.LogoutMessageRsp{Status: 0}, nil
}

// Multicast routes the payload to every uid in the request, one at a time,
// and returns the uids for which routing did not succeed.
func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage) (*userCenter.MulticastMessageRsp, error) {
	mylogrus.MyLog.Infof("Multicasting msgType = %d to %v, size = %d\n", in.MsgType, in.Uids, len(in.PayLoad))

	failed := []uint64{}
	for _, uid := range in.Uids {
		addr := userManager.GetUser(uid)
		if addr == nil {
			mylogrus.MyLog.Errorf("Unknown user %d\n", uid)
			failed = append(failed, uid)
			continue
		}
		client := manager.UserProxyMgr.MakeClient(*addr)
		if client == nil {
			mylogrus.MyLog.Infof("Failed in making client for %d, %s\n", uid, *addr)
			failed = append(failed, uid)
			continue
		}
		status, err := routeMessage(userCenter.NewRouterClient(client), uid, in.MsgType, in.PayLoad)
		if err != nil || status != common.ROUTE_SUCCESS {
			failed = append(failed, uid)
		}
	}

	if len(failed) > 0 {
		mylogrus.MyLog.Infof("Multicast failed for %v\n", failed)
	}
	return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil
}

// Broadcast groups every known terminal by the proxy its user is attached to
// and queues the message for the broadcast workers in small batches.
//
// FailedUids only contains users that could not be resolved to a proxy;
// delivery itself happens asynchronously and is not reported back.
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
	mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad))

	failed := []uint64{}
	terminals := termManager.GetAll()
	if terminals == nil {
		return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil
	}

	byProxy := make(map[string][]uint64)
	for u := range *terminals {
		uid, err := strconv.ParseUint(u, 10, 64)
		if err != nil {
			mylogrus.MyLog.Infof("Invalid user str: %s\n", u)
			// Kept from the original: the value ParseUint returned alongside
			// the error is what gets reported for an unparsable key.
			failed = append(failed, uid)
			continue
		}
		addr := userManager.GetUser(uid)
		if addr == nil {
			mylogrus.MyLog.Errorf("Unknown user %d\n", uid)
			failed = append(failed, uid)
			continue
		}
		byProxy[*addr] = append(byProxy[*addr], uid)
	}

	const sendBatchSize = 5
	for addr, users := range byProxy {
		mylogrus.MyLog.Infof("Broadcasting: Addr %s: %d users", addr, len(users))
		if !strings.Contains(addr, "172.26.95.48:50050") && !strings.Contains(addr, "172.26.95.24:50050") {
			mylogrus.MyLog.Errorf("Broadcasting: Addr error %s: %d users", addr, len(users))
		}
		for i := 0; i < len(users); i += sendBatchSize {
			end := i + sendBatchSize
			if end > len(users) {
				end = len(users)
			}
			broadcastChan <- BroadcastChanMsg{
				ProxyAddr: addr,
				UserIds:   users[i:end],
				in:        in,
			}
		}
	}
	return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil
}

// Transmit forwards a business message to the biz service. The outcome is
// only logged; the response is always empty.
func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) {
	mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad)

	// fixme: the biz service address is hard-coded.
	addr := "localhost:50060"
	rsp := &userCenter.BizMessageRsp{}

	client := manager.BizMgr.MakeClient(addr)
	if client == nil {
		mylogrus.MyLog.Errorf("Failed in making client for %d, %s\n", in.Uid, addr)
		return rsp, nil
	}
	status, err := transmitMessage(biz.NewTransmitterClient(client), in.MsgType, in.PayLoad)
	mylogrus.MyLog.Infof("transmit uid = %d, msgType = %d, status = %d, %v", in.Uid, in.MsgType, status, err)
	return rsp, nil
}

// EnterRoom records that the user joined the group.
func (s *server) EnterRoom(ctx context.Context, in *userCenter.EnterRoomMessage) (*userCenter.EnterRoomMessageRsp, error) {
	if err := roomManager.AddRoomUser(in.GetUid(), in.GetGroupId()); err != nil {
		return nil, err
	}
	return &userCenter.EnterRoomMessageRsp{Status: 0}, nil
}

// LeaveRoom records that the user left the group.
func (s *server) LeaveRoom(ctx context.Context, in *userCenter.LeaveRoomMessage) (*userCenter.LeaveRoomMessageRsp, error) {
	if err := roomManager.DelRoomUser(in.GetUid(), in.GetGroupId()); err != nil {
		return nil, err
	}
	return &userCenter.LeaveRoomMessageRsp{Status: 0}, nil
}

// RoomHeartbeat refreshes the user's membership record for the group.
func (s *server) RoomHeartbeat(ctx context.Context, in *userCenter.RoomHeartbeatMessage) (*userCenter.RoomHeartbeatMessageRsp, error) {
	if err := roomManager.UpdateRoomUser(in.GetUid(), in.GetGroupId()); err != nil {
		return nil, err
	}
	return &userCenter.RoomHeartbeatMessageRsp{Status: 0}, nil
}

// realBroadcast routes msg to each uid through the proxy at addr, logging the
// outcome per user.
func realBroadcast(addr string, uids []uint64, msg *userCenter.BroadcastMessage) {
	mylogrus.MyLog.Infof("Broadcasting: Addr %s: users: %v", addr, uids)
	for _, uid := range uids {
		client := manager.UserProxyMgr.MakeClient(addr)
		if client == nil {
			mylogrus.MyLog.Errorf("Failed in making client for %d, %s\n", uid, addr)
			continue
		}
		status, err := routeMessage(userCenter.NewRouterClient(client), uid, msg.MsgType, msg.PayLoad)
		mylogrus.MyLog.Infof("routeMessage uid = %d, msgType = %d, status = %d, %v", uid, msg.MsgType, status, err)
	}
}

// routeMessage sends one message to one user through the given router client
// with a 3 second deadline. It returns the status from the reply, or 0 when
// the call failed or produced no reply.
func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	r, err := c.Route(ctx, &userCenter.RouteMessage{
		Uid:     uid,
		MsgType: msgType,
		PayLoad: data,
	})
	if err != nil {
		mylogrus.MyLog.Errorf("Route message to user %d, err: %s\n", uid, err.Error())
		return 0, err
	}
	if r == nil {
		return 0, nil
	}
	mylogrus.MyLog.Infof("Route message to user %d, status = %d", uid, r.Status)
	return r.Status, nil
}

// transmitMessage sends one business message through the given transmitter
// client with a 3 second deadline. It returns the status from the reply, or 0
// when the call failed or produced no reply.
func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint32, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	r, err := c.Process(ctx, &biz.BizMessage{
		Type:    msgType,
		PayLoad: data,
	})
	if err != nil {
		mylogrus.MyLog.Errorf("Transmit message type %d, err: %s\n", msgType, err.Error())
		return 0, err
	}
	if r == nil {
		return 0, nil
	}
	mylogrus.MyLog.Infof("Transmit message type %d, status = %d", msgType, r.Status)
	return r.Status, nil
}

// sendKickMessage asks a proxy to kick a user, with a 3 second deadline. The
// error is returned for the caller to log; on success the reply status is
// logged here.
func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error {
	mylogrus.MyLog.Infof("sendKickMessage %s", msg.String())

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	r, err := c.KickUser(ctx, msg)
	// The original condition (err != nil && r != nil) never held for a gRPC
	// unary call, so the status was never logged.
	if err == nil && r != nil {
		mylogrus.MyLog.Infof("sendKickMessage message status = %d", r.Status)
	}
	return err
}

// consulCheck answers the Consul HTTP health check.
func consulCheck(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintln(w, "consulCheck")
}

const (
	RegisterName = "userCenter"
	RegisterTag  = "用户中心"
)

// registerToConsul registers this service, with an HTTP health check on
// port+1000, in the local Consul agent.
//
// When retry is false it also installs the /check handler and then blocks
// serving the health-check port, so it must run in its own goroutine. When
// retry is true the handler and listener are assumed to be running already.
// A missing local IP or a failed registration terminates the process.
func registerToConsul(client *consulapi.Client, retry bool) {
	checkPort := port + 1000

	hostName, err := os.Hostname()
	if err != nil {
		// Registration proceeds with an empty host part, as before.
		mylogrus.MyLog.Errorf("os.Hostname failed: %v", err)
	}

	myIp, err := common.GetClientIpV2()
	if err != nil {
		mylogrus.MyLog.Fatalln("local ip not found", err)
	}
	myNodeName := "" // never populated; kept so the log line is unchanged
	mylogrus.MyLog.Infof("My ip is %s, nodeName: %s\n", myIp, myNodeName)

	registration := &consulapi.AgentServiceRegistration{
		ID:      fmt.Sprintf("%s-%s", RegisterName, hostName),
		Name:    RegisterName,
		Port:    port,
		Tags:    []string{RegisterTag},
		Address: myIp,
		Check: &consulapi.AgentServiceCheck{
			HTTP:                           fmt.Sprintf("http://localhost:%d%s", checkPort, "/check"),
			Timeout:                        "3s",
			Interval:                       "5s",
			DeregisterCriticalServiceAfter: "30s", // deregister the service once the check has been failing for 30s
		},
	}
	if err := client.Agent().ServiceRegister(registration); err != nil {
		mylogrus.MyLog.Fatal("register server error : ", err)
	}

	if retry {
		return
	}
	http.HandleFunc("/check", consulCheck)
	if err := http.ListenAndServe(fmt.Sprintf(":%d", checkPort), nil); err != nil {
		mylogrus.MyLog.Errorf("consul check listener stopped: %v", err)
	}
}

// selfCheck is the self-healing loop: once a minute (the first run happens one
// minute after startup) it looks this service up in the Consul catalog and
// re-registers it when it is missing. It never returns.
func selfCheck() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		// Non-default deployments need real parameters here.
		client, err := consulapi.NewClient(consulapi.DefaultConfig())
		if err != nil {
			mylogrus.MyLog.Errorf("RegisterToConsul Fail:%v", err)
			continue
		}
		if client == nil {
			mylogrus.MyLog.Errorf("Fail to get consul client.")
			continue
		}
		cataLog := client.Catalog()
		if cataLog == nil {
			mylogrus.MyLog.Errorf("No catalog.")
			continue
		}
		services, _, err := cataLog.Service(RegisterName, "", nil)
		if err != nil {
			mylogrus.MyLog.Errorf("%v", err)
			continue
		}
		if len(services) == 0 {
			mylogrus.MyLog.Errorf("%s not found.", RegisterName)
			go registerToConsul(client, true) // register again
			continue
		}
		mylogrus.MyLog.Infof("%s check success %v", RegisterName, services[0])
	}
}

// HiloConfig is one name/value row of the configuration table.
type HiloConfig struct {
	Name  string `gorm:"primary_key"`
	Value string
}

// main wires everything together: Consul registration, Redis and MySQL
// connections (settings come from Consul KV, falling back to built-in
// defaults), the JWT secret, the managers, the background workers and finally
// the gRPC listener.
func main() {
	flag.Parse()

	config := consulapi.DefaultConfig()
	client, err := consulapi.NewClient(config)
	if err != nil {
		mylogrus.MyLog.Fatal("consul client error : ", err)
	}
	go registerToConsul(client, false)
	go selfCheck()

	redisAddress := default_redis_address
	redisPassword := default_redis_password
	mysqHost, mysqlUsername, mysqlPassword, mysqlDb := "47.244.34.27:3306", "root", "yX0jPAhO0I4s2zlA", "hilo"

	if kv := client.KV(); kv != nil {
		// lookup returns the Consul KV value for key, or fallback when the
		// key is absent or the read fails.
		lookup := func(key, fallback string) string {
			p, _, err := kv.Get(key, nil)
			if err != nil || p == nil {
				return fallback
			}
			return string(p.Value)
		}
		redisAddress = lookup("redis_address", redisAddress)
		redisPassword = lookup("redis_password", redisPassword)
		mysqHost = lookup("mysql_host", mysqHost)
		mysqlUsername = lookup("mysql_username", mysqlUsername)
		mysqlPassword = lookup("mysql_password", mysqlPassword)
		mysqlDb = lookup("mysql_db", mysqlDb)
	}

	// Passwords are deliberately left out of the log.
	mylogrus.MyLog.Infof("Db config mysqHost = %v , mysqlUsername = %v ,mysqlDb = %v redisAddress :%v",
		mysqHost, mysqlUsername, mysqlDb, redisAddress)
	if mysqHost == "" || mysqlUsername == "" || mysqlPassword == "" || mysqlDb == "" {
		mylogrus.MyLog.Fatalln("Db config is missing.")
	}

	// init redis
	rdb := redis.NewClient(&redis.Options{
		Addr:     redisAddress,
		Password: redisPassword,
		DB:       redis_section,
	})
	if rdb == nil {
		mylogrus.MyLog.Fatalf("failed to connect redis %s\n", redisAddress)
	}
	result, err := rdb.Ping(context.Background()).Result()
	if err != nil {
		mylogrus.MyLog.Fatal(err)
	} else if result != "PONG" {
		mylogrus.MyLog.Fatalf("Invalid ping response %s", result)
	}

	// init db
	options := "?charset=utf8mb4&parseTime=True&loc=Local&time_zone=" + url.QueryEscape("'+8:00'")
	fmt.Println("options = ", options)
	// The options were previously built and printed but never made it into
	// the DSN.
	dsn := mysqlUsername + ":" + mysqlPassword + "@(" + mysqHost + ")/" + mysqlDb + options
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
		Logger: logger.Default.LogMode(logger.Info),
	})
	if err != nil {
		mylogrus.MyLog.Fatalf("mysql connect error %v", err)
	}
	mylogrus.MyLog.Infof("mysql connect success")

	var jwtConfig = HiloConfig{}
	if err := db.First(&jwtConfig, "name = 'jwt_secret'").Error; err != nil {
		mylogrus.MyLog.Fatalf("load jwt secret: %v", err)
	}
	if len(jwtConfig.Value) == 0 {
		mylogrus.MyLog.Fatalln("Empty jwt secret")
	}
	// The secret itself is deliberately not logged.
	mylogrus.MyLog.Infof("jwt secret loaded (%d bytes)", len(jwtConfig.Value))
	common.SetJWTSecret(jwtConfig.Value)

	userManager = &manager.UserManager{
		Ctx:         context.Background(),
		RedisClient: rdb,
	}
	termManager = &manager.TerminalManager{
		Ctx:         context.Background(),
		RedisClient: rdb,
	}
	roomManager = &manager.RoomManager{
		Ctx:         context.Background(),
		RedisClient: rdb,
	}
	mylogrus.MyLog.Infof("Connected to Redis %s\n", redisAddress)

	// Every 30 seconds, log how many terminals are on-line (and the full set
	// while it is small).
	go func() {
		ticker := time.NewTicker(time.Second * 30)
		defer ticker.Stop()
		for a := range ticker.C {
			mylogrus.MyLog.Infof("Tick at %s", a.String())
			terminals := termManager.GetAll()
			if terminals == nil {
				continue
			}
			if len(*terminals) <= 100 {
				mylogrus.MyLog.Infof("%v", *terminals)
			}
			if len(*terminals) >= 2 {
				mylogrus.MyLog.Infof("%d on-line users found", len(*terminals))
			}
		}
	}()

	// Set up the worker queues, the queue-length monitor and the workers.
	kickChan = make(chan KickChanMsg, kickChanSize)
	broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize)
	go check()
	for i := 0; i < kickChanSize; i++ {
		go kick(i)
	}
	for i := 0; i < broadcastChanSize; i++ {
		go broadcast(i)
	}

	fmt.Println("Go RPC listening on ", port)
	lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port))
	if err != nil {
		mylogrus.MyLog.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer(grpc.KeepaliveParams(kasp))
	userCenter.RegisterUserServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		mylogrus.MyLog.Fatalf("failed to serve: %v", err)
	}
}

// kick is kick worker number n: it drains kickChan and asks the recorded
// proxy to drop the connection at the address captured when the kick was
// queued.
func kick(n int) {
	for msg := range kickChan {
		mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg)

		client := manager.UserProxyMgr.GetClient(msg.proxyAddr)
		if client == nil {
			mylogrus.MyLog.Errorf("No userProxy found for %d, %s\n", msg.userId, msg.proxyAddr)
			continue
		}
		in := &userCenter.KickMessage{Uid: msg.userId, Addr: msg.clientAddr}
		if err := sendKickMessage(userCenter.NewRouterClient(client), in); err != nil {
			mylogrus.MyLog.Errorf("sendKickMessage fail,uid:%v,proxyAddr:%v,clientAddr:%v,err:%v",
				msg.userId, msg.proxyAddr, msg.clientAddr, err)
		}
	}
}

// broadcast is broadcast worker number n: it drains broadcastChan and
// delivers each batch.
func broadcast(n int) {
	for msg := range broadcastChan {
		mylogrus.MyLog.Infof("handling broadcast in:%d,msg:%+v", n, msg)
		// fixme: there is room to optimise here; could the proxy layer do the
		// broadcast in bulk?
		realBroadcast(msg.ProxyAddr, msg.UserIds, msg.in)
	}
}

var (
	// lastDingMu guards lastDingTime, which is read by check and written by
	// the alert goroutines it starts.
	lastDingMu   sync.Mutex
	lastDingTime time.Time
)

var dingIntervalMin float64 = 5 // minimum gap between two alerts, in minutes

// check samples both worker queues every 3 seconds. It logs their lengths
// when either is non-empty and sends a DingTalk alert when either exceeds
// monitorLength, at most once per dingIntervalMin minutes after a successful
// send. It never returns.
func check() {
	lastDingMu.Lock()
	lastDingTime = time.Now()
	lastDingMu.Unlock()

	tick := time.NewTicker(time.Second * 3)
	defer tick.Stop()

	for range tick.C {
		l, l2 := len(kickChan), len(broadcastChan)
		if l > monitorLength || l2 > monitorLength {
			lastDingMu.Lock()
			due := time.Since(lastDingTime).Minutes() > dingIntervalMin
			lastDingMu.Unlock()
			if due {
				go func() {
					sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK,
						fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d", l, l2), true)
					if sErr != nil {
						mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr)
						return
					}
					lastDingMu.Lock()
					lastDingTime = time.Now()
					lastDingMu.Unlock()
				}()
			}
		}
		if l > 0 || l2 > 0 {
			mylogrus.MyLog.Infof("userCenter msg,left kick:%v,broadcast:%v", l, l2)
		}
	}
}