package group_m

import (
	"context"
	"encoding/json"
	"git.hilo.cn/hilo-common/domain"
	"git.hilo.cn/hilo-common/mylogrus"
	"git.hilo.cn/hilo-common/resource/config"
	"git.hilo.cn/hilo-common/resource/mysql"
	"git.hilo.cn/hilo-common/resource/redisCli"
	"git.hilo.cn/hilo-common/sdk/agora"
	redisV8 "github.com/go-redis/redis/v8"
	"gorm.io/gorm"
	"hilo-group/_const/enum/group_e"
	"hilo-group/_const/redis_key"
	"hilo-group/_const/redis_key/mic_k"
	"hilo-group/myerr"
	"strconv"
	"strings"
	"time"
)

// getRediGroupMsgMaxSeqOrInit reads the maximum message sequence stored for
// groupUuid in the Redis hash redis_key.GetPrefixGroupMsgSeqMaxAll().
//
// When the hash field is missing or holds an empty string, a record with
// Seq 0 is returned instead of an error. A stored value that cannot be parsed
// as an unsigned integer yields a wrapped error.
func getRediGroupMsgMaxSeqOrInit(groupUuid string) (*RediGroupMsgSeqMax, error) {
	nStr, err := redisCli.RedisClient.HGet(context.Background(), redis_key.GetPrefixGroupMsgSeqMaxAll(), groupUuid).Result()
	if err != nil && err != redisV8.Nil {
		return nil, err
	}
	if err == redisV8.Nil || nStr == "" {
		// Nothing recorded yet: start from sequence 0.
		return &RediGroupMsgSeqMax{
			Seq:       0,
			GroupUuid: groupUuid,
		}, nil
	}

	seq, err := strconv.ParseUint(nStr, 0, 64)
	if err != nil {
		return nil, myerr.WrapErr(err)
	}
	return &RediGroupMsgSeqMax{
		Seq:       seq,
		GroupUuid: groupUuid,
	}, nil
}

// GetRedisGroupUserMsgMaxSeqOrInit reads the maximum message sequence stored
// for one user (externalId) of a group from the Redis hash
// redis_key.GetPrefixGroupMsgSeqMaxGroup(groupUuid).
//
// A missing hash field yields a record with Seq 0. A stored value that cannot
// be parsed as an unsigned integer yields a wrapped error.
func GetRedisGroupUserMsgMaxSeqOrInit(groupUuid string, externalId string) (*RedisGroupMsgSeqMaxUser, error) {
	nStr, err := redisCli.RedisClient.HGet(context.Background(), redis_key.GetPrefixGroupMsgSeqMaxGroup(groupUuid), externalId).Result()
	if err == redisV8.Nil {
		// Nothing recorded yet: start from sequence 0.
		return &RedisGroupMsgSeqMaxUser{
			GroupUuid:  groupUuid,
			ExternalId: externalId,
			Seq:        0,
		}, nil
	}
	if err != nil {
		return nil, err
	}

	seq, err := strconv.ParseUint(nStr, 0, 64)
	if err != nil {
		return nil, myerr.WrapErr(err)
	}
	return &RedisGroupMsgSeqMaxUser{
		GroupUuid:  groupUuid,
		ExternalId: externalId,
		Seq:        seq,
	}, nil
}

// GetGroupUserOrInit loads the GroupUser row for (groupId, userId). When no
// row exists it returns a new, unsaved GroupUser carrying the normal message
// status. Any other database error is returned wrapped.
//
// The returned value always has its Model field set to model.
func GetGroupUserOrInit(model *domain.Model, groupId mysql.Str, userId mysql.ID) (*GroupUser, error) {
	var groupUser GroupUser
	err := model.Db.Where(&GroupUser{
		GroupId: groupId,
		UserId:  userId,
	}).First(&groupUser).Error
	if err == gorm.ErrRecordNotFound {
		return &GroupUser{
			Model:     model,
			GroupId:   groupId,
			UserId:    userId,
			MsgStatus: group_e.NormalMsgStatusGroupUser,
		}, nil
	}
	if err != nil {
		return nil, myerr.WrapErr(err)
	}

	groupUser.Model = model
	return &groupUser, nil
}

// GetGroupUserMap returns one GroupUser per entry of groupIds, in the same
// order as groupIds. Groups for which the user has no stored row are filled
// with a default entry (normal message status), so the result always has
// len(groupIds) elements.
func GetGroupUserMap(model *domain.Model, groupIds []mysql.Str, userId mysql.ID) ([]GroupUser, error) {
	if len(groupIds) == 0 {
		return []GroupUser{}, nil
	}

	groupUsers := []GroupUser{}
	if err := model.Db.Where(&GroupUser{
		UserId: userId,
	}).Where("group_id IN (?)", groupIds).Find(&groupUsers).Error; err != nil {
		return nil, myerr.WrapErr(err)
	}

	// Index the stored rows by group id for the ordered pass below.
	groupUserMap := make(map[mysql.Str]*GroupUser, len(groupUsers))
	for i := range groupUsers {
		groupUserMap[groupUsers[i].GroupId] = &groupUsers[i]
	}

	// Emit in the caller's order, substituting defaults for missing rows.
	r := make([]GroupUser, 0, len(groupIds))
	for _, groupId := range groupIds {
		if g, ok := groupUserMap[groupId]; ok {
			r = append(r, *g)
			continue
		}
		r = append(r, GroupUser{
			GroupId:   groupId,
			UserId:    userId,
			MsgStatus: group_e.NormalMsgStatusGroupUser,
		})
	}
	return r, nil
}

// getExpireRedisGroupUserMsgDuration collects the members of the sorted set
// redis_key.GetPrefixGroupMsgDurationUser() whose score is at most
// now - MSG_SORT_EXPIRE, removing each one from the set as it goes.
//
// Only members that this call actually removed (ZRem reported > 0) are
// returned. Members are expected to look like "<groupUuid>_<externalId>";
// the first two "_"-separated parts are used. Malformed members are logged
// and skipped instead of panicking.
func getExpireRedisGroupUserMsgDuration() ([]*RedisGroupMsgDurationUser, error) {
	score := time.Now().Unix() - int64(config.GetGroupImConfig().MSG_SORT_EXPIRE)
	zList, err := redisCli.RedisClient.ZRevRangeByScoreWithScores(context.Background(), redis_key.GetPrefixGroupMsgDurationUser(), &redisV8.ZRangeBy{
		Min: "0",
		Max: strconv.FormatInt(score, 10),
	}).Result()
	if err != nil {
		return nil, myerr.WrapErr(err)
	}

	redisGroupUserMsgDurations := []*RedisGroupMsgDurationUser{}
	for _, v := range zList {
		groupUuidUserId := v.Member
		// Remove first; only members this call removed are reported.
		n, err := redisCli.GetRedis().ZRem(context.Background(), redis_key.GetPrefixGroupMsgDurationUser(), groupUuidUserId).Result()
		if err != nil {
			mylogrus.MyLog.Errorf("getExpireRedisGroupUserMsgDuration ZRem fail member:%v, err:%v", groupUuidUserId, err)
			continue
		}
		if n <= 0 {
			continue
		}

		member, ok := groupUuidUserId.(string)
		if !ok {
			mylogrus.MyLog.Errorf("getExpireRedisGroupUserMsgDuration unexpected member type:%T", groupUuidUserId)
			continue
		}
		strs := strings.Split(member, "_")
		if len(strs) < 2 {
			// Without this guard strs[1] would panic on a malformed member.
			mylogrus.MyLog.Errorf("getExpireRedisGroupUserMsgDuration malformed member:%v", member)
			continue
		}
		redisGroupUserMsgDurations = append(redisGroupUserMsgDurations, &RedisGroupMsgDurationUser{
			GroupUuid:  strs[0],
			ExternalId: strs[1],
		})
	}
	return redisGroupUserMsgDurations, nil
}

// initRedisGroupUserMsgDuration builds an in-memory RedisGroupMsgDurationUser
// for (groupUuid, externalId) stamped with the current Unix time. It does not
// touch Redis.
func initRedisGroupUserMsgDuration(groupUuid string, externalId string) *RedisGroupMsgDurationUser {
	return &RedisGroupMsgDurationUser{
		GroupUuid:  groupUuid,
		ExternalId: externalId,
		TimeStamp:  time.Now().Unix(),
	}
}

// getRedisGroupMsgDurationScoreInit builds an in-memory
// RedisGroupMsgDurationScore for groupUuid with AddCountUser set to 0.
// The returned error is always nil.
func getRedisGroupMsgDurationScoreInit(groupUuid string) (*RedisGroupMsgDurationScore, error) {
	return &RedisGroupMsgDurationScore{
		GroupUuid:    groupUuid,
		AddCountUser: 0,
	}, nil
}

// Get the number of users for the group's messages.
//func getRedisGroupMsgDurationScoreOrInit(groupUuid string) (*RedisGroupMsgDurationScore, error) {
//	score, err := redisCli.RedisClient.ZScore(context.Background(), redis.GetPrefixGroupMsgDurationScore(), groupUuid).Result()
//	if err != nil {
//		// does not exist
//		if err == redisV8.Nil {
//			return &RedisGroupMsgDurationScore{
//				GroupUuid:    groupUuid,
//				AddCountUser: 0,
//			}, nil
//		} else {
//			return nil, myerr.WrapErr(err)
//		}
//	}
//	return &RedisGroupMsgDurationScore{
//		GroupUuid:    groupUuid,
//		AddCountUser: int(score),
//	}, nil
//}

// InitGroupMsg builds an in-memory GroupMsg from the given callback fields.
// Nothing is persisted here.
//
// NOTE(review): onlineOnlyFlag is accepted but never copied into the returned
// GroupMsg — confirm whether GroupMsg should carry it.
func InitGroupMsg(model *domain.Model, callbackCommand string, groupId string, t string, fromAccount string, operatorAccount string, random int, msgSeq uint64, msgTime uint64, onlineOnlyFlag uint8, msgBody string) *GroupMsg {
	return &GroupMsg{
		Model:           model,
		CallbackCommand: callbackCommand,
		GroupId:         groupId,
		Type:            t,
		FromAccount:     fromAccount,
		OperatorAccount: operatorAccount,
		Random:          random,
		MsgSeq:          msgSeq,
		MsgTime:         msgTime,
		MsgBody:         msgBody,
	}
}

// GetAllMicUser returns the user ids currently stored on mic seats
// 1..MaxMicNum of the group. Empty seats (key missing in Redis) are skipped.
// The result is nil when nobody is on a mic.
func GetAllMicUser(groupUuid string) ([]mysql.ID, error) {
	var userIds []mysql.ID
	for i := 1; i <= MaxMicNum; i++ {
		micUserStr, err := redisCli.GetRedis().Get(context.Background(), redis_key.GetPrefixGroupMicUser(groupUuid, i)).Result()
		if err == redisV8.Nil {
			// Seat i is empty.
			continue
		}
		if err != nil {
			return nil, myerr.WrapErr(err)
		}

		var micUser MicUser
		if err = json.Unmarshal([]byte(micUserStr), &micUser); err != nil {
			return nil, myerr.WrapErr(err)
		}
		userIds = append(userIds, micUser.UserId)
	}
	return userIds, nil
}

// BatchGetAllMicUser returns, for each group id, the user ids found in the
// Redis hash mic_k.GetGroupOnMicUser(groupId). Groups with nobody on a mic
// have no entry in the result map.
//
// A Redis failure for one group is logged and that group is skipped; the
// returned error is always nil. Hash values that do not parse as a positive
// unsigned integer are ignored.
func BatchGetAllMicUser(model *domain.Model, groupIds []string) (map[string][]mysql.ID, error) {
	result := make(map[string][]mysql.ID, len(groupIds))
	if len(groupIds) <= 0 {
		return result, nil
	}

	for _, groupId := range groupIds {
		groupMicUserKey := mic_k.GetGroupOnMicUser(groupId)
		userIds, err := model.Redis.HVals(model, groupMicUserKey).Result()
		if err != nil {
			model.Log.Errorf("BatchGetAllMicUser fail:%v", err)
			continue
		}
		for _, userIdStr := range userIds {
			// A parse failure leaves userId at 0, which the check below drops.
			if userId, _ := strconv.ParseUint(userIdStr, 10, 64); userId > 0 {
				result[groupId] = append(result[groupId], userId)
			}
		}
	}
	return result, nil
}

// GetAllMic returns the state of every mic seat of a group together with the
// users currently sitting on those seats.
//
// The number of seats n comes from GetMicNum(numType). The first result has
// exactly n entries (seat 1..n); seats without stored state default to
// unlocked and not forbidden. The second result lists occupied seats in
// ascending seat order, each with I and GroupUuid filled in.
//
// Duplicate non-empty ExternalIds across seats are only logged, not removed.
func GetAllMic(groupUuid string, numType group_e.GroupMicNumType) ([]Mic, []MicUser, error) {
	n := GetMicNum(numType)

	// Raw occupant payload per seat.
	// FIXME: use a single Redis MGET instead of one GET per seat.
	micUserMap := map[int]string{}
	for i := 1; i <= n; i++ {
		micUser, err := redisCli.GetRedis().Get(context.Background(), redis_key.GetPrefixGroupMicUser(groupUuid, i)).Result()
		if err == redisV8.Nil {
			// Seat i is empty.
			continue
		}
		if err != nil {
			return nil, nil, myerr.WrapErr(err)
		}
		micUserMap[i] = micUser
	}

	// Stored per-seat state (lock / forbid), keyed by seat number as a string.
	micMap, err := redisCli.GetRedis().HGetAll(context.Background(), redis_key.GetPrefixGroupMic(groupUuid)).Result()
	if err != nil {
		if err != redisV8.Nil {
			return nil, nil, myerr.WrapErr(err)
		}
		micMap = map[string]string{}
	}

	// Start every seat from its default state.
	var mics []Mic
	for i := 1; i <= n; i++ {
		mics = append(mics, Mic{
			GroupUuid: groupUuid,
			I:         i,
			Lock:      false,
			MicForbid: false,
		})
	}

	// Overlay the stored state onto the defaults.
	for k, v := range micMap {
		var mic Mic
		if err := json.Unmarshal([]byte(v), &mic); err != nil {
			return nil, nil, myerr.WrapErr(err)
		}
		i, err := strconv.Atoi(k)
		if err != nil {
			return nil, nil, myerr.WrapErr(err)
		}
		// The hash may hold seats outside 1..n (e.g. after the seat count
		// changed); ignore them so the index below cannot go out of range.
		if i >= 1 && i <= n {
			mics[i-1].Lock = mic.Lock
			mics[i-1].MicForbid = mic.MicForbid
		}
	}

	// Decode occupants in seat order so the result is deterministic.
	var micUsers []MicUser
	for i := 1; i <= n; i++ {
		raw, ok := micUserMap[i]
		if !ok {
			continue
		}
		var micUser MicUser
		if err := json.Unmarshal([]byte(raw), &micUser); err != nil {
			return nil, nil, myerr.WrapErr(err)
		}
		micUser.I = i
		micUser.GroupUuid = groupUuid
		micUsers = append(micUsers, micUser)
	}

	// Last line of defence: report (but keep) a user that appears on more
	// than one seat.
	micUserCheckMap := map[string]struct{}{}
	for i := range micUsers {
		if micUsers[i].ExternalId == "" {
			continue
		}
		if _, flag := micUserCheckMap[micUsers[i].ExternalId]; flag {
			mylogrus.MyLog.Errorf("mic user repeat groupUid:%v, i:%v, ExternalId:%v", groupUuid, micUsers[i].I, micUsers[i].ExternalId)
			continue
		}
		micUserCheckMap[micUsers[i].ExternalId] = struct{}{}
	}
	return mics, micUsers, nil
}

// GetMic returns the state of seat i of a group. When nothing is stored for
// the seat, an unlocked, not-forbidden Mic is returned.
//
// i must be within 1..30, otherwise a system error is returned.
// NOTE(review): the upper bound is the literal 30 here while GetMicUser uses
// MaxMicNum — confirm whether they are meant to be the same limit.
func GetMic(model *domain.Model, groupUuid string, i int) (*Mic, error) {
	if i < 1 || i > 30 {
		return nil, myerr.NewSysErrorF("麦序不对,不在范围值内 i:%v", i)
	}

	str, err := redisCli.GetRedis().HGet(context.Background(), redis_key.GetPrefixGroupMic(groupUuid), strconv.Itoa(i)).Result()
	if err == redisV8.Nil {
		return &Mic{
			model:     model,
			GroupUuid: groupUuid,
			I:         i,
			Lock:      false,
			MicForbid: false,
		}, nil
	}
	if err != nil {
		// Report the Redis failure itself instead of falling through and
		// failing later on JSON-decoding an empty string.
		return nil, myerr.WrapErr(err)
	}

	var mic Mic
	if err = json.Unmarshal([]byte(str), &mic); err != nil {
		return nil, myerr.WrapErr(err)
	}
	return &Mic{
		model:     model,
		GroupUuid: groupUuid,
		I:         i,
		Lock:      mic.Lock,
		MicForbid: mic.MicForbid,
	}, nil
}

// GetMicUserByExternalId looks up which seat the user identified by
// externalId occupies and returns that seat's MicUser. It returns (nil, nil)
// when the user has no seat record, or when the recorded seat turns out to
// be empty.
func GetMicUserByExternalId(model *domain.Model, externalId string) (*MicUser, error) {
	str, err := redisCli.GetRedis().Get(context.Background(), redis_key.GetPrefixGroupUserInMic(externalId)).Result()
	if err == redisV8.Nil {
		return nil, nil
	}
	if err != nil {
		return nil, myerr.WrapErr(err)
	}

	userInMic, err := strToUserInMic(str)
	if err != nil {
		return nil, err
	}
	return GetMicUser(model, userInMic.GroupUuid, userInMic.I)
}

// GetMicUser returns the user sitting on seat i of a group, or (nil, nil)
// when the seat is empty.
//
// i must be within 1..MaxMicNum, otherwise a system error is returned.
func GetMicUser(model *domain.Model, groupUuid string, i int) (*MicUser, error) {
	if i < 1 || i > MaxMicNum {
		return nil, myerr.NewSysErrorF("麦序不对,不在范围值内 i:%v", i)
	}

	str, err := redisCli.GetRedis().Get(context.Background(), redis_key.GetPrefixGroupMicUser(groupUuid, i)).Result()
	if err == redisV8.Nil {
		return nil, nil
	}
	if err != nil {
		return nil, myerr.WrapErr(err)
	}

	var micUser MicUser
	if err = json.Unmarshal([]byte(str), &micUser); err != nil {
		return nil, err
	}
	// GroupUuid and I come from the arguments, not from the stored payload.
	return &MicUser{
		model:      model,
		GroupUuid:  groupUuid,
		I:          i,
		ExternalId: micUser.ExternalId,
		UserId:     micUser.UserId,
		CpUserId:   micUser.CpUserId,
		Forbid:     micUser.Forbid,
		Timestamp:  micUser.Timestamp,
	}, nil
}

// InitMicNumType builds an in-memory MicNumType for the group with seat
// layout t. Nothing is persisted here.
func InitMicNumType(model *domain.Model, groupUuid string, t group_e.GroupMicNumType) *MicNumType {
	return &MicNumType{
		model:     model,
		GroupUuid: groupUuid,
		T:         t,
	}
}

// Commented-out legacy helper; its remainder follows on the next lines.
//func
getMicUserRedisData(userId uint64, forbid bool) string { // return strconv.Itoa(int(userId)) + "_" + strconv.FormatBool(forbid) //} func micToStr(mic Mic) (string, error) { buf, err := json.Marshal(mic) if err != nil { return "", myerr.WrapErr(err) } else { return string(buf), nil } } func micUserToStr(micUser MicUser) (string, error) { buf, err := json.Marshal(micUser) if err != nil { return "", myerr.WrapErr(err) } else { return string(buf), nil } } func userInMicToStr(groupUuid string, i int, userId uint64) (string, error) { userInMic := UserInMic{ GroupUuid: groupUuid, I: i, UserId: userId, } buf, err := json.Marshal(userInMic) if err != nil { return "", myerr.WrapErr(err) } else { return string(buf), nil } } func strToUserInMic(str string) (UserInMic, error) { userInMic := UserInMic{} if err := json.Unmarshal([]byte(str), &userInMic); err != nil { return UserInMic{}, myerr.WrapErr(err) } return userInMic, nil } // 检查声网状态后再下奖 func (mu *MicUser) CheckAndDown() { gm, err := GetGroupInfo(mu.model, mu.GroupUuid) if err == nil && gm != nil { userListRsp, err := agora.GetUserList(gm.ChannelId) mu.model.Log.Infof("userListRsp %+v, err: %+v", userListRsp, err) found := false if err == nil { for _, u := range userListRsp.Data.Broadcasters { if mu.UserId == u { found = true break } } } if !found { // 确定不在主播列表里,才踢下麦 err = mu.LeaveByUser(mu.UserId, mu.ExternalId) mu.model.Log.Infof("LeaveByUser err:%v userId:%v", err, mu.UserId) // 下发通知给客户端弹出提示 if err == nil { MicSocketMicOutRPush(mu.model, mu.GroupUuid, mu.ExternalId) } } } }