...
 
Commits (13)
......@@ -17,6 +17,10 @@ func AppIsRelease() bool {
return GetMode() == RELEASE
}
func AppIsLocal() bool {
return GetMode() == LOCAL
}
//数据库的配置
type MysqlConfig struct {
MYSQL_HOST string
......
......@@ -24,7 +24,7 @@ func init() {
filenamePrefix = logDir + filepath.Base(os.Args[0]) + "."
// stderr日志重定向
MyLog.SetOutput(os.Stdout)
RewriteStderrFile()
//RewriteStderrFile()
if config.AppIsRelease() {
MyLog.SetFormatter(&logrus.JSONFormatter{
......
[DATABASE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo
[DATABASECODE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo_code
[REDIS]
REDIS_HOST=47.244.34.27:6379
REDIS_PASSWORD=8QZ9JD1zLvPR3yHf
REDIS_CLUSTER_HOST=47.244.34.27:6379
REDIS_CLUSTER_PASSWORD=8QZ9JD1zLvPR3yHf
REDIS_HOST=172.19.0.2:6379
REDIS_PASSWORD=yPyZH1DYMJhrVQgr
REDIS_CLUSTER_HOST=172.19.0.2:6379
REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr
[JWT]
SECRET=hilo1632
ISSUER_API=hiloApi
......
[DATABASE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo
[DATABASECODE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo_code
[REDIS]
REDIS_HOST=47.244.34.27:6379
REDIS_PASSWORD=8QZ9JD1zLvPR3yHf
REDIS_CLUSTER_HOST=47.244.34.27:6379
REDIS_CLUSTER_PASSWORD=8QZ9JD1zLvPR3yHf
REDIS_HOST=43.135.4.137:6379
REDIS_PASSWORD=yPyZH1DYMJhrVQgr
REDIS_CLUSTER_HOST=43.135.4.137:6379
REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr
[JWT]
SECRET=hilo1632
ISSUER_API=hiloApi
......
......@@ -2,18 +2,17 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"github.com/golang/protobuf/proto"
"gorm.io/gorm/schema"
"net"
"net/http"
"net/url"
"os"
"strconv"
"time"
"github.com/go-redis/redis/v8"
consulapi "github.com/hashicorp/consul/api"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"gorm.io/driver/mysql"
......@@ -92,7 +91,7 @@ var kasp = keepalive.ServerParameters{
}
func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) {
mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr)
//mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr)
var loginStatus uint32 = common.Login_success
claim, err := common.ParseToken(in.GetToken())
......@@ -109,7 +108,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
// FIXME: 发现用户已经登录,要踢走旧连接
proxyAddr := userManager.GetUser(claim.UserId)
if proxyAddr != nil {
mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr)
//mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr)
kickChan <- KickChanMsg{
userId: claim.UserId,
proxyAddr: *proxyAddr,
......@@ -130,7 +129,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
} else {
mylogrus.MyLog.Errorf("wrong user %d", claim.UserId)
}
mylogrus.MyLog.Infof("adding user %d", claim.UserId)
//mylogrus.MyLog.Infof("adding user %d", claim.UserId)
// save to redis
userManager.AddUser(claim.UserId, in.ProxyAddr)
......@@ -193,13 +192,13 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage)
failed = append(failed, uid)
}
}
if len(failed) > 0 {
mylogrus.MyLog.Infof("Multicast failed for %v\n", failed)
}
//if len(failed) > 0 {
//mylogrus.MyLog.Infof("Multicast failed for %v\n", failed)
//}
return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil
}
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
func (s *server) BroadcastOld(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
//mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad))
failed := []uint64{}
......@@ -252,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage)
return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil
}
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
redisKey := "service:userSocket"
ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{
Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳
Max: "+inf",
}).Result()
if err != nil {
failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, err
}
if len(ipPorts) <= 0 {
failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, errors.New(failMsg)
}
data, _ := proto.Marshal(in)
for _, ip := range ipPorts {
queue := "broadcast:" + ip
rdbCluster.RPush(context.Background(), queue, data)
}
return &userCenter.BroadcastMessageRsp{FailedUids: nil}, nil
}
func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) {
var failed []uint64
terminals := termManager.GetAll()
......@@ -312,11 +337,18 @@ func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*u
uids = append(uids, uid)
}
}
// 处理等级用户
levelUserIds, userIds := userManager.GetLevelUsers(uids, in.Level)
if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
if in.Area > 0 {
// 处理分区用户
levelUserIds := userManager.GetLevelUsers(uids, in.Level)
levelUserIds = userManager.GetAreaUsers(userIds, int8(in.Area))
if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
}
m := make(map[string][]uint64, 0)
for uid := range levelUserIds {
ok := false
......@@ -492,7 +524,7 @@ func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint
}
func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error {
mylogrus.MyLog.Infof("sendKickMessage %s", msg.String())
//mylogrus.MyLog.Infof("sendKickMessage %s", msg.String())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := c.KickUser(ctx, msg)
......@@ -502,84 +534,50 @@ func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) err
return err
}
func consulCheck(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "consulCheck")
}
const (
RegisterName = "userCenter"
RegisterTag = "用户中心"
)
func registerToConsul(client *consulapi.Client, retry bool) {
checkPort := port + 1000
registration := new(consulapi.AgentServiceRegistration)
hostName, _ := os.Hostname()
registration.ID = fmt.Sprintf("%s-%s", RegisterName, hostName)
registration.Name = RegisterName
registration.Port = port
registration.Tags = []string{RegisterTag}
myIp, myNodeName := "", ""
if localIp, err := common.GetClientIpV2(); err != nil {
mylogrus.MyLog.Fatalln("local ip not found", err)
} else {
myIp = localIp
func RegisterToRedis(RedisClusterClient *redis.Client, port int, init bool) {
// 本地不注册
if appConfig.AppIsLocal() {
return
}
mylogrus.MyLog.Infof("My ip is %s, nodeName: %s\n", myIp, myNodeName)
registration.Address = myIp
registration.Check = &consulapi.AgentServiceCheck{
HTTP: fmt.Sprintf("http://localhost:%d%s", checkPort, "/check"),
Timeout: "3s",
Interval: "5s",
DeregisterCriticalServiceAfter: "30s", //check失败后30秒删除本服务
if RedisClusterClient == nil {
failMsg := fmt.Sprintf("RegisterToRedis fail,redisClusterNotInit,serviceName:%v", RegisterName)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
mylogrus.MyLog.Errorf(failMsg)
return
}
err := client.Agent().ServiceRegister(registration)
redisKey := "service:" + RegisterName
ip, err := common.GetClientIpV2()
if err != nil {
mylogrus.MyLog.Fatal("register server error : ", err)
}
if !retry {
http.HandleFunc("/check", consulCheck)
http.ListenAndServe(fmt.Sprintf(":%d", checkPort), nil)
}
}
// 自愈检查
// 启动后每一分钟检查一次
// 首次启动不执行
func selfCheck() {
ticker := time.NewTicker(time.Minute)
failMsg := fmt.Sprintf("RegisterToRedis fail,ip fail,err:%v,serviceName:%v", err, RegisterName)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return
}
ipPort := fmt.Sprintf("%s:%d", ip, port)
if err := RedisClusterClient.ZAdd(context.Background(), redisKey, &redis.Z{
Score: float64(time.Now().Unix()),
Member: ipPort,
}).Err(); err != nil {
failMsg := fmt.Sprintf("RegisterToRedis fail,redis fail,err:%v,serviceName:%v", err, RegisterName)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
}
// 初始化注册自我检查 selfCheck
if init {
go func() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-ticker.C:
client, err := consulapi.NewClient(consulapi.DefaultConfig()) //非默认情况下需要设置实际的参数
if err != nil {
mylogrus.MyLog.Errorf("RegisterToConsul Fail:%v", err)
break
}
if client == nil {
mylogrus.MyLog.Errorf("Fail to get consul client.")
break
}
cataLog := client.Catalog()
if cataLog == nil {
mylogrus.MyLog.Errorf("No catalog.")
break
}
services, _, err := cataLog.Service(RegisterName, "", nil)
if err != nil {
mylogrus.MyLog.Errorf("%v", err)
break
}
if len(services) == 0 {
mylogrus.MyLog.Errorf("%s not found.", RegisterName)
go registerToConsul(client, true) // 重新注册
} else {
mylogrus.MyLog.Infof("%s check success %v", RegisterName, services[0])
RegisterToRedis(RedisClusterClient, port, false) // 刷新注册
}
}
}()
}
}
......@@ -588,17 +586,18 @@ type HiloConfigs struct {
Value string
}
var rdbCluster *redis.Client
func main() {
flag.Parse()
config := consulapi.DefaultConfig()
client, err := consulapi.NewClient(config)
if err != nil {
mylogrus.MyLog.Fatal("consul client error : ", err)
}
go registerToConsul(client, false)
go selfCheck()
// init redis cluster
rdbCluster = redis.NewClient(&redis.Options{
Addr: appConfig.GetConfigRedis().REDIS_CLUSTER_HOST,
Password: appConfig.GetConfigRedis().REDIS_CLUSTER_PASSWORD,
})
// 注册到redis
RegisterToRedis(rdbCluster, port, true)
// init redis
rdb := redis.NewClient(&redis.Options{
......@@ -668,8 +667,8 @@ func main() {
for {
select {
case a := <-ticker.C:
mylogrus.MyLog.Infof("Tick at %s", a.String())
case <-ticker.C:
//mylogrus.MyLog.Infof("Tick at %s", a.String())
terminals := termManager.GetAll()
if terminals != nil {
if len(*terminals) <= 100 {
......@@ -724,10 +723,10 @@ func main() {
func kick(n int) {
for msg := range kickChan {
mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg)
//mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg)
clientAddr := termManager.GetTerminal(msg.userId)
if clientAddr == nil {
mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId)
//mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId)
} else {
client := manager.UserProxyMgr.GetClient(msg.proxyAddr)
if client == nil {
......
......@@ -138,8 +138,9 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT
// 获取财富等级大于某等级的用户
// 开区间
func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uint64]UserTinyArea {
func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) (map[uint64]UserTinyArea, []uint64) {
res := make(map[uint64]UserTinyArea)
var resIds []uint64
// 从db中读,暂时不缓存(几千个)
var users []UserTinyArea
if err := m.MysqlDB.Table("user").Joins("JOIN match_wealth_user_score ON match_wealth_user_score.user_id = user.id").
......@@ -148,7 +149,7 @@ func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uin
Where("match_wealth_user_score.grade > ?", wealthLevel).
Find(&users).Error; err != nil {
mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err)
return res
return res, resIds
}
for _, u := range users {
a := m.GetArea(u.Country)
......@@ -161,6 +162,7 @@ func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) map[uin
Area: a,
Avatar: u.Avatar,
}
resIds = append(resIds, u.ID)
}
return res
return res, resIds
}
......@@ -136,6 +136,7 @@ message LevelMessage {
int32 level = 1;
uint32 msgType = 2;
bytes payLoad = 3;
int32 area = 4;
}
message LevelMessageRsp {
......
[DATABASE]
MYSQL_HOST=ua4papc3hmgqf351pbej-rw4rm.rwlb.dubai.rds.aliyuncs.com
MYSQL_USERNAME=nextvideo
MYSQL_PASSWORD=ihlUwI4nhi9W88MI
MYSQL_HOST=172.28.16.44
MYSQL_USERNAME=hilo_master
MYSQL_PASSWORD=o8NNd8F7e6On2RqIgOhsy1PsiSxROT3n
MYSQL_DB=hilo
[DATABASECODE]
MYSQL_HOST=ua4papc3hmgqf351pbej-rw4rm.rwlb.dubai.rds.aliyuncs.com
MYSQL_USERNAME=nextvideo
MYSQL_PASSWORD=ihlUwI4nhi9W88MI
MYSQL_HOST=172.28.16.44
MYSQL_USERNAME=hilo_master
MYSQL_PASSWORD=o8NNd8F7e6On2RqIgOhsy1PsiSxROT3n
MYSQL_DB=hilo_code
[REDIS]
REDIS_HOST=r-eb3btxn8vfdsuwdbuf.redis.dubai.rds.aliyuncs.com:6379
REDIS_HOST=172.28.16.31:6379
REDIS_PASSWORD=
REDIS_CLUSTER_HOST=r-eb3yt6k8zgxs62kjjs.redis.dubai.rds.aliyuncs.com:6379
REDIS_CLUSTER_HOST=172.28.16.47:6379
REDIS_CLUSTER_PASSWORD=
[JWT]
SECRET=hilo1504
......