...
 
Commits (15)
...@@ -17,6 +17,10 @@ func AppIsRelease() bool { ...@@ -17,6 +17,10 @@ func AppIsRelease() bool {
return GetMode() == RELEASE return GetMode() == RELEASE
} }
func AppIsLocal() bool {
return GetMode() == LOCAL
}
//数据库的配置 //数据库的配置
type MysqlConfig struct { type MysqlConfig struct {
MYSQL_HOST string MYSQL_HOST string
......
...@@ -24,7 +24,7 @@ func init() { ...@@ -24,7 +24,7 @@ func init() {
filenamePrefix = logDir + filepath.Base(os.Args[0]) + "." filenamePrefix = logDir + filepath.Base(os.Args[0]) + "."
// stderr日志重定向 // stderr日志重定向
MyLog.SetOutput(os.Stdout) MyLog.SetOutput(os.Stdout)
RewriteStderrFile() //RewriteStderrFile()
if config.AppIsRelease() { if config.AppIsRelease() {
MyLog.SetFormatter(&logrus.JSONFormatter{ MyLog.SetFormatter(&logrus.JSONFormatter{
......
[DATABASE] [DATABASE]
MYSQL_HOST=47.244.34.27:3306 MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=root MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo MYSQL_DB=hilo
[DATABASECODE] [DATABASECODE]
MYSQL_HOST=47.244.34.27:3306 MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=root MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo_code MYSQL_DB=hilo_code
[REDIS] [REDIS]
REDIS_HOST=47.244.34.27:6379 REDIS_HOST=172.19.0.2:6379
REDIS_PASSWORD=8QZ9JD1zLvPR3yHf REDIS_PASSWORD=yPyZH1DYMJhrVQgr
REDIS_CLUSTER_HOST=47.244.34.27:6379 REDIS_CLUSTER_HOST=172.19.0.2:6379
REDIS_CLUSTER_PASSWORD=8QZ9JD1zLvPR3yHf REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr
[JWT] [JWT]
SECRET=hilo1632 SECRET=hilo1632
ISSUER_API=hiloApi ISSUER_API=hiloApi
......
[DATABASE] [DATABASE]
MYSQL_HOST=47.244.34.27:3306 MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=root MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo MYSQL_DB=hilo
[DATABASECODE] [DATABASECODE]
MYSQL_HOST=47.244.34.27:3306 MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=root MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo_code MYSQL_DB=hilo_code
[REDIS] [REDIS]
REDIS_HOST=47.244.34.27:6379 REDIS_HOST=43.135.4.137:6379
REDIS_PASSWORD=8QZ9JD1zLvPR3yHf REDIS_PASSWORD=yPyZH1DYMJhrVQgr
REDIS_CLUSTER_HOST=47.244.34.27:6379 REDIS_CLUSTER_HOST=43.135.4.137:6379
REDIS_CLUSTER_PASSWORD=8QZ9JD1zLvPR3yHf REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr
[JWT] [JWT]
SECRET=hilo1632 SECRET=hilo1632
ISSUER_API=hiloApi ISSUER_API=hiloApi
......
...@@ -2,18 +2,17 @@ package main ...@@ -2,18 +2,17 @@ package main
import ( import (
"context" "context"
"errors"
"flag" "flag"
"fmt" "fmt"
"github.com/golang/protobuf/proto"
"gorm.io/gorm/schema" "gorm.io/gorm/schema"
"net" "net"
"net/http"
"net/url" "net/url"
"os"
"strconv" "strconv"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
consulapi "github.com/hashicorp/consul/api"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
...@@ -40,12 +39,14 @@ const ( ...@@ -40,12 +39,14 @@ const (
kickChanSize = 500 kickChanSize = 500
broadcastChanSize = 3500 broadcastChanSize = 3500
areacastChanSize = 3500 areacastChanSize = 3500
levelcastChanSize = 3500
) )
var ( var (
kickChan chan KickChanMsg kickChan chan KickChanMsg
broadcastChan chan BroadcastChanMsg broadcastChan chan BroadcastChanMsg
areacastChan chan AreaChanMsg areacastChan chan AreaChanMsg
levelcastChan chan LevelChanMsg
) )
type KickChanMsg struct { type KickChanMsg struct {
...@@ -65,6 +66,12 @@ type AreaChanMsg struct { ...@@ -65,6 +66,12 @@ type AreaChanMsg struct {
in *userCenter.AreaMessage in *userCenter.AreaMessage
} }
type LevelChanMsg struct {
ProxyAddr string
UserIds []uint64
in *userCenter.LevelMessage
}
var ( var (
userManager *manager.UserManager = nil userManager *manager.UserManager = nil
termManager *manager.TerminalManager = nil termManager *manager.TerminalManager = nil
...@@ -84,7 +91,7 @@ var kasp = keepalive.ServerParameters{ ...@@ -84,7 +91,7 @@ var kasp = keepalive.ServerParameters{
} }
func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) { func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userCenter.LoginMessageRsp, error) {
mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr) //mylogrus.MyLog.Infof("Received loginMsg: %s, from proxy %s, client %s\n", in.Token, in.ProxyAddr, in.ClientAddr)
var loginStatus uint32 = common.Login_success var loginStatus uint32 = common.Login_success
claim, err := common.ParseToken(in.GetToken()) claim, err := common.ParseToken(in.GetToken())
...@@ -101,7 +108,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC ...@@ -101,7 +108,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
// FIXME: 发现用户已经登录,要踢走旧连接 // FIXME: 发现用户已经登录,要踢走旧连接
proxyAddr := userManager.GetUser(claim.UserId) proxyAddr := userManager.GetUser(claim.UserId)
if proxyAddr != nil { if proxyAddr != nil {
mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr) //mylogrus.MyLog.Infof("%d has existing value %s", claim.UserId, *proxyAddr)
kickChan <- KickChanMsg{ kickChan <- KickChanMsg{
userId: claim.UserId, userId: claim.UserId,
proxyAddr: *proxyAddr, proxyAddr: *proxyAddr,
...@@ -122,7 +129,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC ...@@ -122,7 +129,7 @@ func (s *server) Login(ctx context.Context, in *userCenter.LoginMessage) (*userC
} else { } else {
mylogrus.MyLog.Errorf("wrong user %d", claim.UserId) mylogrus.MyLog.Errorf("wrong user %d", claim.UserId)
} }
mylogrus.MyLog.Infof("adding user %d", claim.UserId) //mylogrus.MyLog.Infof("adding user %d", claim.UserId)
// save to redis // save to redis
userManager.AddUser(claim.UserId, in.ProxyAddr) userManager.AddUser(claim.UserId, in.ProxyAddr)
...@@ -185,13 +192,13 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage) ...@@ -185,13 +192,13 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage)
failed = append(failed, uid) failed = append(failed, uid)
} }
} }
if len(failed) > 0 { //if len(failed) > 0 {
mylogrus.MyLog.Infof("Multicast failed for %v\n", failed) //mylogrus.MyLog.Infof("Multicast failed for %v\n", failed)
} //}
return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil
} }
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { func (s *server) BroadcastOld(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
//mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad)) //mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad))
failed := []uint64{} failed := []uint64{}
...@@ -244,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) ...@@ -244,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage)
return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil
} }
func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) {
redisKey := "service:userSocket"
ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{
Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳
Max: "+inf",
}).Result()
if err != nil {
failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, err
}
if len(ipPorts) <= 0 {
failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userSocket", err)
mylogrus.MyLog.Errorf(failMsg)
_ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
return nil, errors.New(failMsg)
}
data, _ := proto.Marshal(in)
for _, ip := range ipPorts {
queue := "broadcast:" + ip
rdbCluster.RPush(context.Background(), queue, data)
}
return &userCenter.BroadcastMessageRsp{FailedUids: nil}, nil
}
func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) { func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) {
var failed []uint64 var failed []uint64
terminals := termManager.GetAll() terminals := termManager.GetAll()
...@@ -294,6 +327,63 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use ...@@ -294,6 +327,63 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use
return &userCenter.AreaMessageRsp{FailedUids: failed}, nil return &userCenter.AreaMessageRsp{FailedUids: failed}, nil
} }
func (s *server) Levelcast(ctx context.Context, in *userCenter.LevelMessage) (*userCenter.LevelMessageRsp, error) {
var failed []uint64
terminals := termManager.GetAll()
if terminals != nil {
var uids []uint64
for uidStr := range *terminals {
if uid, _ := strconv.ParseUint(uidStr, 10, 64); uid > 0 {
uids = append(uids, uid)
}
}
// 处理等级用户
levelUserIds, userIds := userManager.GetLevelUsers(uids, in.Level)
if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
if in.Area > 0 {
// 处理分区用户
levelUserIds = userManager.GetAreaUsers(userIds, int8(in.Area))
if len(levelUserIds) <= 0 {
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
}
m := make(map[string][]uint64, 0)
for uid := range levelUserIds {
ok := false
addr := userManager.GetUser(uid)
if addr != nil {
if _, ok := m[*addr]; !ok {
m[*addr] = make([]uint64, 0)
}
m[*addr] = append(m[*addr], uid)
ok = true
} else {
mylogrus.MyLog.Errorf("Unknown user %d\n", uid)
}
if !ok {
failed = append(failed, uid)
}
}
for addr, users := range m {
const sendBatchSize = 5
for i := 0; i < len(users); i += sendBatchSize {
end := i + sendBatchSize
if end > len(users) {
end = len(users)
}
levelcastChan <- LevelChanMsg{
ProxyAddr: addr,
UserIds: users[i:end],
in: in,
}
}
}
}
return &userCenter.LevelMessageRsp{FailedUids: failed}, nil
}
func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) { func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*userCenter.BizMessageRsp, error) {
mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad) mylogrus.MyLog.Infof("Transmiting msgType = %d, uid = %d, payLoad: %s\n", in.MsgType, in.Uid, in.PayLoad)
...@@ -384,6 +474,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) { ...@@ -384,6 +474,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) {
} }
} }
func realLevelcast(addr string, uids []uint64, msg *userCenter.LevelMessage) {
for _, uid := range uids {
client := manager.UserProxyMgr.MakeClient(addr)
if client == nil {
mylogrus.MyLog.Errorf("Failed in making client for %d, %s\n", uid, addr)
} else {
toRouterClient := userCenter.NewRouterClient(client)
status, err := routeMessage(toRouterClient, uid, msg.MsgType, msg.PayLoad)
if err != nil {
mylogrus.MyLog.Errorf("routeMessage uid = %d, msgType = %d, status = %d, %v", uid, msg.MsgType, status, err)
}
}
}
}
func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) { func routeMessage(c userCenter.RouterClient, uid uint64, msgType uint32, data []byte) (uint32, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
...@@ -418,7 +524,7 @@ func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint ...@@ -418,7 +524,7 @@ func transmitMessage(c biz.TransmitterClient, msgType uint32, data string) (uint
} }
func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error { func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) error {
mylogrus.MyLog.Infof("sendKickMessage %s", msg.String()) //mylogrus.MyLog.Infof("sendKickMessage %s", msg.String())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
r, err := c.KickUser(ctx, msg) r, err := c.KickUser(ctx, msg)
...@@ -428,84 +534,50 @@ func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) err ...@@ -428,84 +534,50 @@ func sendKickMessage(c userCenter.RouterClient, msg *userCenter.KickMessage) err
return err return err
} }
func consulCheck(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "consulCheck")
}
const ( const (
RegisterName = "userCenter" RegisterName = "userCenter"
RegisterTag = "用户中心"
) )
func registerToConsul(client *consulapi.Client, retry bool) { func RegisterToRedis(RedisClusterClient *redis.Client, port int, init bool) {
checkPort := port + 1000 // 本地不注册
registration := new(consulapi.AgentServiceRegistration) if appConfig.AppIsLocal() {
hostName, _ := os.Hostname() return
registration.ID = fmt.Sprintf("%s-%s", RegisterName, hostName)
registration.Name = RegisterName
registration.Port = port
registration.Tags = []string{RegisterTag}
myIp, myNodeName := "", ""
if localIp, err := common.GetClientIpV2(); err != nil {
mylogrus.MyLog.Fatalln("local ip not found", err)
} else {
myIp = localIp
} }
mylogrus.MyLog.Infof("My ip is %s, nodeName: %s\n", myIp, myNodeName) if RedisClusterClient == nil {
failMsg := fmt.Sprintf("RegisterToRedis fail,redisClusterNotInit,serviceName:%v", RegisterName)
registration.Address = myIp _ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
registration.Check = &consulapi.AgentServiceCheck{ mylogrus.MyLog.Errorf(failMsg)
HTTP: fmt.Sprintf("http://localhost:%d%s", checkPort, "/check"), return
Timeout: "3s",
Interval: "5s",
DeregisterCriticalServiceAfter: "30s", //check失败后30秒删除本服务
} }
err := client.Agent().ServiceRegister(registration) redisKey := "service:" + RegisterName
ip, err := common.GetClientIpV2()
if err != nil { if err != nil {
mylogrus.MyLog.Fatal("register server error : ", err) failMsg := fmt.Sprintf("RegisterToRedis fail,ip fail,err:%v,serviceName:%v", err, RegisterName)
} mylogrus.MyLog.Errorf(failMsg)
if !retry { _ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
http.HandleFunc("/check", consulCheck) return
http.ListenAndServe(fmt.Sprintf(":%d", checkPort), nil) }
} ipPort := fmt.Sprintf("%s:%d", ip, port)
} if err := RedisClusterClient.ZAdd(context.Background(), redisKey, &redis.Z{
Score: float64(time.Now().Unix()),
// 自愈检查 Member: ipPort,
// 启动后每一分钟检查一次 }).Err(); err != nil {
// 首次启动不执行 failMsg := fmt.Sprintf("RegisterToRedis fail,redis fail,err:%v,serviceName:%v", err, RegisterName)
func selfCheck() { mylogrus.MyLog.Errorf(failMsg)
ticker := time.NewTicker(time.Minute) _ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true)
defer ticker.Stop() }
for { // 初始化注册自我检查 selfCheck
select { if init {
case <-ticker.C: go func() {
client, err := consulapi.NewClient(consulapi.DefaultConfig()) //非默认情况下需要设置实际的参数 ticker := time.NewTicker(time.Second * 5)
if err != nil { defer ticker.Stop()
mylogrus.MyLog.Errorf("RegisterToConsul Fail:%v", err) for {
break select {
} case <-ticker.C:
if client == nil { RegisterToRedis(RedisClusterClient, port, false) // 刷新注册
mylogrus.MyLog.Errorf("Fail to get consul client.") }
break
}
cataLog := client.Catalog()
if cataLog == nil {
mylogrus.MyLog.Errorf("No catalog.")
break
}
services, _, err := cataLog.Service(RegisterName, "", nil)
if err != nil {
mylogrus.MyLog.Errorf("%v", err)
break
}
if len(services) == 0 {
mylogrus.MyLog.Errorf("%s not found.", RegisterName)
go registerToConsul(client, true) // 重新注册
} else {
mylogrus.MyLog.Infof("%s check success %v", RegisterName, services[0])
} }
} }()
} }
} }
...@@ -514,17 +586,18 @@ type HiloConfigs struct { ...@@ -514,17 +586,18 @@ type HiloConfigs struct {
Value string Value string
} }
var rdbCluster *redis.Client
func main() { func main() {
flag.Parse() flag.Parse()
config := consulapi.DefaultConfig() // init redis cluster
rdbCluster = redis.NewClient(&redis.Options{
client, err := consulapi.NewClient(config) Addr: appConfig.GetConfigRedis().REDIS_CLUSTER_HOST,
if err != nil { Password: appConfig.GetConfigRedis().REDIS_CLUSTER_PASSWORD,
mylogrus.MyLog.Fatal("consul client error : ", err) })
} // 注册到redis
go registerToConsul(client, false) RegisterToRedis(rdbCluster, port, true)
go selfCheck()
// init redis // init redis
rdb := redis.NewClient(&redis.Options{ rdb := redis.NewClient(&redis.Options{
...@@ -594,8 +667,8 @@ func main() { ...@@ -594,8 +667,8 @@ func main() {
for { for {
select { select {
case a := <-ticker.C: case <-ticker.C:
mylogrus.MyLog.Infof("Tick at %s", a.String()) //mylogrus.MyLog.Infof("Tick at %s", a.String())
terminals := termManager.GetAll() terminals := termManager.GetAll()
if terminals != nil { if terminals != nil {
if len(*terminals) <= 100 { if len(*terminals) <= 100 {
...@@ -613,6 +686,7 @@ func main() { ...@@ -613,6 +686,7 @@ func main() {
kickChan = make(chan KickChanMsg, kickChanSize) kickChan = make(chan KickChanMsg, kickChanSize)
broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize) broadcastChan = make(chan BroadcastChanMsg, broadcastChanSize)
areacastChan = make(chan AreaChanMsg, areacastChanSize) areacastChan = make(chan AreaChanMsg, areacastChanSize)
levelcastChan = make(chan LevelChanMsg, levelcastChanSize)
go check() // 检查长度 go check() // 检查长度
for i := 0; i < kickChanSize; i++ { for i := 0; i < kickChanSize; i++ {
go func(n int) { go func(n int) {
...@@ -629,6 +703,11 @@ func main() { ...@@ -629,6 +703,11 @@ func main() {
areacast(n) areacast(n)
}(i) }(i)
} }
for i := 0; i < levelcastChanSize; i++ {
go func(n int) {
levelcast(n)
}(i)
}
fmt.Println("Go RPC listening on ", port) fmt.Println("Go RPC listening on ", port)
lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port)) lis, err := net.Listen("tcp4", ":"+strconv.Itoa(port))
...@@ -644,10 +723,10 @@ func main() { ...@@ -644,10 +723,10 @@ func main() {
func kick(n int) { func kick(n int) {
for msg := range kickChan { for msg := range kickChan {
mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg) //mylogrus.MyLog.Infof("handling kick in:%d,msg:%+v", n, msg)
clientAddr := termManager.GetTerminal(msg.userId) clientAddr := termManager.GetTerminal(msg.userId)
if clientAddr == nil { if clientAddr == nil {
mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId) //mylogrus.MyLog.Errorf("No terminal found for %d", msg.userId)
} else { } else {
client := manager.UserProxyMgr.GetClient(msg.proxyAddr) client := manager.UserProxyMgr.GetClient(msg.proxyAddr)
if client == nil { if client == nil {
...@@ -677,6 +756,12 @@ func areacast(n int) { ...@@ -677,6 +756,12 @@ func areacast(n int) {
} }
} }
func levelcast(n int) {
for msg := range levelcastChan {
realLevelcast(msg.ProxyAddr, msg.UserIds, msg.in)
}
}
var lastDingTime time.Time var lastDingTime time.Time
var dingIntervalMin float64 = 5 // 5min 告警间隔 var dingIntervalMin float64 = 5 // 5min 告警间隔
...@@ -687,11 +772,12 @@ func check() { ...@@ -687,11 +772,12 @@ func check() {
for { for {
select { select {
case <-tick.C: case <-tick.C:
l, l2, l3 := len(kickChan), len(broadcastChan), len(areacastChan) l, l2, l3, l4 := len(kickChan), len(broadcastChan), len(areacastChan), len(levelcastChan)
if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength { if l >= monitorLength || l2 >= monitorLength || l3 >= monitorLength || l4 >= monitorLength {
if time.Now().Sub(lastDingTime).Minutes() > dingIntervalMin { if time.Now().Sub(lastDingTime).Minutes() > dingIntervalMin {
go func() { go func() {
if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d", l, l2, l3), true); sErr != nil { if sErr := dingding.SendDingRobot(dingding.ROBOTWEBHOOK, fmt.Sprintf("userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d,levelcastChan:%d",
l, l2, l3, l4), true); sErr != nil {
mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr) mylogrus.MyLog.Errorf("dingding msg fail:%v", sErr)
} else { } else {
lastDingTime = time.Now() lastDingTime = time.Now()
......
...@@ -135,3 +135,34 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT ...@@ -135,3 +135,34 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT
} }
return res return res
} }
// 获取财富等级大于某等级的用户
// 开区间
func (m *UserManager) GetLevelUsers(userIds []uint64, wealthLevel int32) (map[uint64]UserTinyArea, []uint64) {
res := make(map[uint64]UserTinyArea)
var resIds []uint64
// 从db中读,暂时不缓存(几千个)
var users []UserTinyArea
if err := m.MysqlDB.Table("user").Joins("JOIN match_wealth_user_score ON match_wealth_user_score.user_id = user.id").
Select("user.id,external_id,sex,code,country,avatar").
Where("user.id IN (?)", userIds).
Where("match_wealth_user_score.grade > ?", wealthLevel).
Find(&users).Error; err != nil {
mylogrus.MyLog.Errorf("GetLevelUsers fail:%v", err)
return res, resIds
}
for _, u := range users {
a := m.GetArea(u.Country)
res[u.ID] = UserTinyArea{
ID: u.ID,
ExternalId: u.ExternalId,
Sex: u.Sex,
Code: u.Code,
Country: u.Country,
Area: a,
Avatar: u.Avatar,
}
resIds = append(resIds, u.ID)
}
return res, resIds
}
...@@ -132,6 +132,17 @@ message AreaMessageRsp { ...@@ -132,6 +132,17 @@ message AreaMessageRsp {
repeated uint64 failedUids = 1; repeated uint64 failedUids = 1;
} }
message LevelMessage {
int32 level = 1;
uint32 msgType = 2;
bytes payLoad = 3;
int32 area = 4;
}
message LevelMessageRsp {
repeated uint64 failedUids = 1;
}
service Router { service Router {
rpc route(RouteMessage) returns (RouteMessageRsp) {} rpc route(RouteMessage) returns (RouteMessageRsp) {}
rpc kickUser(KickMessage) returns (KickMessageRsp) {} rpc kickUser(KickMessage) returns (KickMessageRsp) {}
...@@ -143,6 +154,7 @@ service User { ...@@ -143,6 +154,7 @@ service User {
rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {} rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {}
rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {} rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {}
rpc areacast(AreaMessage) returns (AreaMessageRsp) {} rpc areacast(AreaMessage) returns (AreaMessageRsp) {}
rpc levelcast(LevelMessage) returns (LevelMessageRsp) {}
rpc transmit(BizMessage) returns (BizMessageRsp) {} rpc transmit(BizMessage) returns (BizMessageRsp) {}
rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {} rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {}
rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {} rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {}
......
...@@ -86,7 +86,6 @@ message MatchConfirm { ...@@ -86,7 +86,6 @@ message MatchConfirm {
uint32 remoteAgoraId = 6; uint32 remoteAgoraId = 6;
uint32 callDuration = 7; uint32 callDuration = 7;
uint32 localAgoraId = 8; uint32 localAgoraId = 8;
uint32 diamondBalance = 9;
string matchUniqueId = 10; string matchUniqueId = 10;
uint32 failType = 11; uint32 failType = 11;
} }
...@@ -97,62 +96,6 @@ message CallReady { ...@@ -97,62 +96,6 @@ message CallReady {
uint64 endTimestamp = 2; uint64 endTimestamp = 2;
uint64 callDuration = 3; uint64 callDuration = 3;
string channelId = 4; string channelId = 4;
uint64 remainDiamond = 5;
}
/* id == 103 礼物加时 */
message AddTimeGift {
uint32 giftId = 1;
string token = 2;
uint32 duration = 3;
uint64 endTimestamp = 4;
string channelId = 5;
bool isSender = 6;
uint32 giftNum = 7;
string iconUrl = 8;
string svgaUrl = 9;
string senderAvatar = 10;
string receiverAvatar = 11;
}
/* id == 104 免费加时 */
message AddTimeFree {
string token = 1;
uint32 duration = 2;
uint64 endTimestamp = 3;
string channelId = 4;
uint32 senderAgoraId = 5;
}
/* id == 105 退出 */
message ConnectsQuit {
uint64 from_user_id = 1;
}
/* id == 106 连接状态 */
message ConnectStatus {
uint64 from_user_id = 1;
float user_diamonds = 2;
bool diamonds_enough = 3;
}
/* id == 107 ??? */
message ConnectsCall {
uint64 from_user_id = 1;
string rong_room_name = 2;
bool is_join = 3;
}
/* id == 108 */
message ConnectCommon {
string rong_room_name = 1;
uint64 from_user_id = 2;
string extra = 3;
string message = 4;
}
/* id == 109 召回授权弹框 */
message RecallWindow {
} }
/* id == 110 | 132 视频发送 status:(1:接收到邀请, 2:接收到对方同意, 3:双方拒绝(还没接通), 4:对方挂断(接通后)diamondBalance 只有status=2,才出现)*/ /* id == 110 | 132 视频发送 status:(1:接收到邀请, 2:接收到对方同意, 3:双方拒绝(还没接通), 4:对方挂断(接通后)diamondBalance 只有status=2,才出现)*/
...@@ -165,17 +108,19 @@ message Video { ...@@ -165,17 +108,19 @@ message Video {
string sendUserId = 6; string sendUserId = 6;
string receiveUserId = 7; string receiveUserId = 7;
uint32 status = 8; uint32 status = 8;
uint32 diamondBalance = 9;
User sendUser = 10; User sendUser = 10;
} }
/* id == 109 召回授权弹框 */
message RecallWindow {
}
/* id == 111 视频通话准备 */ /* id == 111 视频通话准备 */
message VideoCallReady { message VideoCallReady {
uint64 startTimestamp = 1; uint64 startTimestamp = 1;
uint64 endTimestamp = 2; uint64 endTimestamp = 2;
uint64 callDuration = 3; uint64 callDuration = 3;
string channelId = 4; string channelId = 4;
uint64 remainDiamond = 5;
} }
/* id == 112 互相喜欢 */ /* id == 112 互相喜欢 */
...@@ -213,6 +158,7 @@ message GlobalGiftBanner { ...@@ -213,6 +158,7 @@ message GlobalGiftBanner {
uint32 bannerType = 14; // 类型:0.普通礼物 1.cp直接送礼 2.cp告白礼物 uint32 bannerType = 14; // 类型:0.普通礼物 1.cp直接送礼 2.cp告白礼物
uint32 cpLevel = 15; // cp等级 uint32 cpLevel = 15; // cp等级
string receiveUserAvatar = 16; string receiveUserAvatar = 16;
uint32 nobleLevel = 17; // 贵族等级
} }
/* id == 116 横幅的回应,用来测量RTT */ /* id == 116 横幅的回应,用来测量RTT */
...@@ -289,6 +235,7 @@ message GlobalBroadcast { ...@@ -289,6 +235,7 @@ message GlobalBroadcast {
string msg = 6; string msg = 6;
string groupId = 7; string groupId = 7;
uint32 senderNobleLevel = 8; uint32 senderNobleLevel = 8;
bool isPinned = 9;
} }
/* id == 124 全球消息 */ /* id == 124 全球消息 */
...@@ -322,13 +269,11 @@ message VideoTimeMinuteSuccess { ...@@ -322,13 +269,11 @@ message VideoTimeMinuteSuccess {
uint32 senderAgoraId = 5; uint32 senderAgoraId = 5;
string videoUniqueId = 6; string videoUniqueId = 6;
bool isSend = 7; bool isSend = 7;
uint32 sendRemainDiamond = 8;
} }
/* id == 129 1对1视频1分钟加时询问检查 */ /* id == 129 1对1视频1分钟加时询问检查 */
message VideoTimeMinuteCheck { message VideoTimeMinuteCheck {
string videoUniqueId = 1; string videoUniqueId = 1;
uint32 diamond = 2;
string uuid = 3; string uuid = 3;
} }
...@@ -369,6 +314,7 @@ message GlobalGameBanner { ...@@ -369,6 +314,7 @@ message GlobalGameBanner {
uint64 diamond = 4; uint64 diamond = 4;
string bannerUrl = 5; string bannerUrl = 5;
uint64 gameId = 6; // 1.ludo 2.uno 3.dice 4.lucky wheel 5.lucky box 6.fruit 7.slot uint64 gameId = 6; // 1.ludo 2.uno 3.dice 4.lucky wheel 5.lucky box 6.fruit 7.slot
bool isPink = 7; // 是否粉钻
} }
/* id == 147 羊羊匹配成功 */ /* id == 147 羊羊匹配成功 */
...@@ -377,6 +323,11 @@ message SheepMatchSuccess { ...@@ -377,6 +323,11 @@ message SheepMatchSuccess {
User user = 2; User user = 2;
User otherUser = 3; User otherUser = 3;
uint64 game_id = 4; uint64 game_id = 4;
string channelId = 5;
string token = 6;
uint32 agoraId = 7;
uint32 provider = 8;
uint32 otherAgoraId = 9;
} }
message SheepGamePlayer { message SheepGamePlayer {
...@@ -463,4 +414,34 @@ message MicUserData { ...@@ -463,4 +414,34 @@ message MicUserData {
string micEffect = 14; string micEffect = 14;
string headwearIcon = 15; string headwearIcon = 15;
Svip svip = 16; Svip svip = 16;
}
/* id == 157 游戏大厅匹配成功 */
message LobbyMatchSuccess {
uint64 game_id = 1;
uint64 mode = 2;
string group_id = 3;
User user = 4;
User otherUser = 5;
string gameCode = 6;
}
/* id == 158 H5游戏静音 */
message H5GameVoiceMute {
}
/* id == 159 H5游戏打开语音 */
message H5GameVoiceUnMute {
}
/* id == 160 退出房间 */
message QuitRoom {
uint32 reason = 1; // 原因1.被拉黑;2.被踢出
string group_id = 2;
}
/* id == 161 国家管理员横幅 */
message GlobalCountryMgrBanner {
string country = 1; // 国家
User user = 2; // 用户信息
} }
\ No newline at end of file
[DATABASE] [DATABASE]
MYSQL_HOST=ua4papc3hmgqf351pbej-rw4rm.rwlb.dubai.rds.aliyuncs.com MYSQL_HOST=172.28.16.44
MYSQL_USERNAME=nextvideo MYSQL_USERNAME=hilo_master
MYSQL_PASSWORD=ihlUwI4nhi9W88MI MYSQL_PASSWORD=o8NNd8F7e6On2RqIgOhsy1PsiSxROT3n
MYSQL_DB=hilo MYSQL_DB=hilo
[DATABASECODE] [DATABASECODE]
MYSQL_HOST=ua4papc3hmgqf351pbej-rw4rm.rwlb.dubai.rds.aliyuncs.com MYSQL_HOST=172.28.16.44
MYSQL_USERNAME=nextvideo MYSQL_USERNAME=hilo_master
MYSQL_PASSWORD=ihlUwI4nhi9W88MI MYSQL_PASSWORD=o8NNd8F7e6On2RqIgOhsy1PsiSxROT3n
MYSQL_DB=hilo_code MYSQL_DB=hilo_code
[REDIS] [REDIS]
REDIS_HOST=r-eb3btxn8vfdsuwdbuf.redis.dubai.rds.aliyuncs.com:6379 REDIS_HOST=172.28.16.31:6379
REDIS_PASSWORD= REDIS_PASSWORD=
REDIS_CLUSTER_HOST=r-eb3yt6k8zgxs62kjjs.redis.dubai.rds.aliyuncs.com:6379 REDIS_CLUSTER_HOST=172.28.16.47:6379
REDIS_CLUSTER_PASSWORD= REDIS_CLUSTER_PASSWORD=
[JWT] [JWT]
SECRET=hilo1504 SECRET=hilo1504
......