...
 
Commits (5)
......@@ -10,6 +10,7 @@ import (
"hilo-algoCenter/common"
"hilo-algoCenter/common/config"
"hilo-algoCenter/common/mylogrus"
"hilo-algoCenter/common/mysql"
"hilo-algoCenter/cv"
"hilo-algoCenter/protocol"
"hilo-algoCenter/protocol/userCenter"
......@@ -43,7 +44,7 @@ type MatchResult struct {
ExcellentRelation float64
}
func matchSuccess(c userCenter.UserClient, uids []uint64, msgType uint32, data []byte) error {
func matchSuccess(c userCenter.UserClient, uids []uint64, msgType uint32, data []byte, msg *userProxy.MatchSuccess) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
rsp, err := c.Multicast(ctx, &userCenter.MulticastMessage{
......@@ -57,6 +58,11 @@ func matchSuccess(c userCenter.UserClient, uids []uint64, msgType uint32, data [
if rsp != nil && len(rsp.FailedUids) > 0 {
err = fmt.Errorf("%d users failed", len(rsp.FailedUids))
}
//记录socket,注意闭包问题
go func(uids []uint64, msg *userProxy.MatchSuccess, rspUids []uint64, err error) {
buf, _ := json.Marshal(msg)
AddRpcLogs(common.MsgTypeMatchV2Success, uids, string(buf[:]), rspUids, err)
}(uids, msg, rsp.FailedUids, err)
return err
}
......@@ -457,8 +463,7 @@ func processMatchFail(userClient userCenter.UserClient, val MatchResult, matchCy
}
if buffer, err := proto.Marshal(msg); err == nil {
rPcErr := ""
matchSuccess(userClient, []uint64{val.User1Id}, common.MsgTypeMatchSuccess, buffer) // todo for updating
if err = matchSuccess(userClient, []uint64{val.User1Id}, common.MsgTypeMatchV2Success, buffer); err == nil {
if err = matchSuccess(userClient, []uint64{val.User1Id}, common.MsgTypeMatchV2Success, buffer, msg); err == nil {
mylogrus.MyLog.Infof("match cycle:%v, match result sent msg success, single LocalUserId:%v, RemoteUserId %v\n", matchCycle, msg.LocalUserId, msg.RemoteUserId)
} else {
mylogrus.MyLog.Errorf("match cycle:%v, match result sent msg fail, single LocalUserId:%v, RemoteUserId %v\n", matchCycle, msg.LocalUserId, msg.RemoteUserId)
......@@ -500,9 +505,8 @@ func processMatchSuccess(userClient userCenter.UserClient, val MatchResult, matc
ok1 := false
ok2 := false
if buffer, err := proto.Marshal(msg); err == nil {
matchSuccess(userClient, uids, common.MsgTypeMatchSuccess, buffer) // todo for updating
if err = matchSuccess(userClient, uids, common.MsgTypeMatchV2Success, buffer); err == nil {
mylogrus.MyLog.Infof("match cycle:%v, match result sent msg begin LocalUserId:%v, RemoteUserId:%v, LocalUserId:%v, RemoteUserId:%v\n", matchCycle, msg.LocalUserId, msg.RemoteUserId, val.User1Id, val.User2Id)
if err = matchSuccess(userClient, uids, common.MsgTypeMatchV2Success, buffer, msg); err == nil {
mylogrus.MyLog.Infof("match cycle:%v, match result sent msg begin LocalUserId:%v, RemoteUserId:%v, LocalUserId:%v, RemoteUserId:%v,matchUid:%v", matchCycle, msg.LocalUserId, msg.RemoteUserId, val.User1Id, val.User2Id, msg.MatchUniqueId)
ok1 = true
// 交换数据
uids[0] = val.User2Id
......@@ -510,10 +514,9 @@ func processMatchSuccess(userClient userCenter.UserClient, val MatchResult, matc
msg.RemoteUserId = val.user1External
msg.RemoteUser = user1Info
if buffer, err = proto.Marshal(msg); err == nil {
matchSuccess(userClient, uids, common.MsgTypeMatchSuccess, buffer) // todo for updating
if err = matchSuccess(userClient, uids, common.MsgTypeMatchV2Success, buffer); err == nil {
if err = matchSuccess(userClient, uids, common.MsgTypeMatchV2Success, buffer, msg); err == nil {
ok2 = true
mylogrus.MyLog.Infof("match cycle:%v, match result sent msg success LocalUserId:%v, RemoteUserId:%v, LocalUserId:%v, RemoteUserId:%v\n", matchCycle, msg.LocalUserId, msg.RemoteUserId, val.User1Id, val.User2Id)
mylogrus.MyLog.Infof("match cycle:%v, match result sent msg success LocalUserId:%v, RemoteUserId:%v, LocalUserId:%v, RemoteUserId:%v,matchUid:%v", matchCycle, msg.LocalUserId, msg.RemoteUserId, val.User1Id, val.User2Id, msg.MatchUniqueId)
}
}
}
......@@ -775,3 +778,38 @@ func toSingleMatch(matchCycle *MatchCycle, hasMatch map[string]bool, tradeUnionM
}
return singleResults
}
type TypeRpc uint8
type RpcLog struct {
ID uint64 `gorm:"primary_key"`
Type TypeRpc
UserId string
Msg string
Err string
FailUids string
}
func (RpcLog) TableName() string {
month := time.Now().Format("200601")
return fmt.Sprintf("rpc_log_%s", month)
}
func AddRpcLogs(t TypeRpc, userIds []uint64, msg string, failUids []uint64, err error) {
errStr := ""
if err != nil {
errStr = err.Error()
}
failUidStr, _ := json.Marshal(failUids)
userIdStr, _ := json.Marshal(userIds)
logRpc := RpcLog{
Type: t,
UserId: string(userIdStr[:]),
Msg: msg,
Err: errStr,
FailUids: string(failUidStr[:]),
}
if e := mysql.Db.Table(RpcLog{}.TableName()).Create(&logRpc).Error; e != nil {
mylogrus.MyLog.Errorf("log rpc save fail, err:%v", e)
}
}
......@@ -26,8 +26,10 @@ type MysqlCodeConfig struct {
//redis配置
type RedisConfig struct {
REDIS_HOST string
REDIS_PASSWORD string
REDIS_HOST string
REDIS_PASSWORD string
REDIS_CLUSTER_HOST string
REDIS_CLUSTER_PASSWORD string
}
//jwt
......
......@@ -69,40 +69,3 @@ func init() {
Db.LogMode(true)
Db.SetLogger(log.New(os.Stdout, "\r\n", 0))*/
}
/*func updateTimeStampForUpdateCallback(scope *gorm.Scope) {
if _, ok := scope.Get("gorm:update_column"); !ok {
_ = scope.SetColumn("UpdatedTime", time.Now().Unix())
}
}
func updateTimeStampForCreateCallback(scope *gorm.Scope) {
if !scope.HasError() {
nowTime := time.Now().Unix()
if createTimeField, ok := scope.FieldByName("CreatedTime"); ok {
if createTimeField.IsBlank {
_ = createTimeField.Set(nowTime)
}
}
if modifyTimeField, ok := scope.FieldByName("UpdatedTime"); ok {
if modifyTimeField.IsBlank {
_ = modifyTimeField.Set(nowTime)
}
}
}
}*/
func HasTable(tableName string) bool {
//var num int
//err := Db.Exec("SELECT COUNT(*) num FROM information_schema.TABLES WHERE table_name =%s;", tableName).Pluck("num", &num).Error
//if err != nil {
// mylogrus.MyLog.Errorf("HasTable err: %v, stack: %v", err, string(debug.Stack()))
// return
//}
//if num > 0 {
// has = true
//}
return Db.Migrator().HasTable(tableName)
}
[DATABASE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo
[DATABASECODE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo_code
[REDIS]
REDIS_HOST=47.244.34.27:6379
REDIS_PASSWORD=8QZ9JD1zLvPR3yHf
REDIS_HOST=172.19.0.2:6379
REDIS_PASSWORD=yPyZH1DYMJhrVQgr
REDIS_CLUSTER_HOST=172.19.0.2:6379
REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr
[JWT]
SECRET=hilo1632
ISSUER_API=hiloApi
......
[DATABASE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo
[DATABASECODE]
MYSQL_HOST=47.244.34.27:3306
MYSQL_USERNAME=root
MYSQL_PASSWORD=yX0jPAhO0I4s2zlA
MYSQL_HOST=hk-cynosdbmysql-grp-a3wqck8p.sql.tencentcdb.com:22303
MYSQL_USERNAME=hilo_test
MYSQL_PASSWORD=cPsTMSA9szQ6B9Y2zFXSvpDdduB8kZxC
MYSQL_DB=hilo_code
[REDIS]
REDIS_HOST=47.244.34.27:6379
REDIS_PASSWORD=8QZ9JD1zLvPR3yHf
REDIS_HOST=172.19.0.2:6379
REDIS_PASSWORD=yPyZH1DYMJhrVQgr
REDIS_CLUSTER_HOST=172.19.0.2:6379
REDIS_CLUSTER_PASSWORD=yPyZH1DYMJhrVQgr
[JWT]
SECRET=hilo1632
ISSUER_API=hiloApi
......
......@@ -4,13 +4,12 @@ import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
consulapi "github.com/hashicorp/consul/api"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"hilo-algoCenter/algo"
"hilo-algoCenter/common/consul"
"hilo-algoCenter/common/config"
"hilo-algoCenter/common/mylogrus"
"hilo-algoCenter/protocol/userCenter"
"time"
......@@ -29,36 +28,12 @@ var kacp = keepalive.ClientParameters{
}
func main() {
client, err := consulapi.NewClient(consulapi.DefaultConfig()) //非默认情况下需要设置实际的参数
if err != nil {
mylogrus.MyLog.Fatalln(err)
}
if client == nil {
mylogrus.MyLog.Fatalln("Fail to get consul client.")
}
redisAddress := default_redis_address
redisPassword := default_redis_password
kv := client.KV()
if kv != nil {
p, _, err := kv.Get("redis_address", nil)
if err == nil && p != nil {
redisAddress = string(p.Value)
}
p, _, err = kv.Get("redis_password", nil)
if err == nil && p != nil {
redisPassword = string(p.Value)
}
}
// init redis
rdb := redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
Addr: config.GetConfigRedis().REDIS_HOST,
Password: config.GetConfigRedis().REDIS_PASSWORD,
DB: redis_section,
})
if rdb == nil {
mylogrus.MyLog.Fatalf("failed to connect redis %s\n", redisAddress)
}
result, err := rdb.Ping(context.Background()).Result()
if err != nil {
......@@ -67,20 +42,27 @@ func main() {
mylogrus.MyLog.Fatalf("Invalid ping response %s", result)
}
cataLog := client.Catalog()
if cataLog == nil {
mylogrus.MyLog.Fatalln("No catalog.")
}
addr, err := consul.GetServices(cataLog, "userCenter")
// init redis cluster
rdbCluster := redis.NewClient(&redis.Options{
Addr: config.GetConfigRedis().REDIS_CLUSTER_HOST,
Password: config.GetConfigRedis().REDIS_CLUSTER_PASSWORD,
})
redisKey := fmt.Sprintf("service:userCenter")
ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{
Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳
Max: "+inf",
}).Result()
if err != nil {
mylogrus.MyLog.Fatalln(err)
failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userCenter", err)
mylogrus.MyLog.Errorf(failMsg)
} else if len(ipPorts) > 0 {
}
if len(addr) == 0 {
mylogrus.MyLog.Fatalln("No userCenter available.")
if len(ipPorts) <= 0 {
ipPorts = []string{"127.0.0.1:50040"}
}
addresses := make([]resolver.Address, len(addr))
for i, s := range addr {
addresses := make([]resolver.Address, len(ipPorts))
for i, s := range ipPorts {
addresses[i].Addr = s
mylogrus.MyLog.Infof("address : %s", s)
}
......
[DATABASE]
MYSQL_HOST=ua4papc3hmgqf351pbej-rw4rm.rwlb.dubai.rds.aliyuncs.com
MYSQL_USERNAME=nextvideo
MYSQL_PASSWORD=ihlUwI4nhi9W88MI
MYSQL_HOST=172.28.16.44
MYSQL_USERNAME=hilo_master
MYSQL_PASSWORD=o8NNd8F7e6On2RqIgOhsy1PsiSxROT3n
MYSQL_DB=hilo
[DATABASECODE]
MYSQL_HOST=ua4papc3hmgqf351pbej-rw4rm.rwlb.dubai.rds.aliyuncs.com
MYSQL_USERNAME=nextvideo
MYSQL_PASSWORD=ihlUwI4nhi9W88MI
MYSQL_HOST=172.28.16.44
MYSQL_USERNAME=hilo_master
MYSQL_PASSWORD=o8NNd8F7e6On2RqIgOhsy1PsiSxROT3n
MYSQL_DB=hilo_code
[REDIS]
REDIS_HOST=r-eb3btxn8vfdsuwdbuf.redis.dubai.rds.aliyuncs.com:6379
REDIS_HOST=172.28.16.31:6379
REDIS_PASSWORD=
REDIS_CLUSTER_HOST=172.28.16.47:6379
REDIS_CLUSTER_PASSWORD=
[JWT]
SECRET=hilo1504
ISSUER_API=hiloApi
......