package group_m import ( "context" "encoding/json" "git.hilo.cn/hilo-common/domain" "git.hilo.cn/hilo-common/mycontext" "git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/resource/mysql" "git.hilo.cn/hilo-common/resource/redisCli" redis2 "github.com/go-redis/redis/v8" "github.com/sirupsen/logrus" "gorm.io/gorm" "gorm.io/gorm/clause" "hilo-group/_const/enum/group_e" "hilo-group/_const/redis_key" "hilo-group/domain/cache/room_c" "hilo-group/domain/model/user_m" "hilo-group/myerr" "sort" "strconv" "strings" "time" ) /** * redis记录。 sortSet记录房间中的人 (groupUid_userId),5分钟的http麦位信息轮询, 更新sortSet的分值(有效期),并且更新sortSet的过期时间 选择sortSet,而不是 key的原因: 1:扩展更多功能,在房间的人,房间内先入后到的排名(score的高低位) 2:通过sortSet的key的生命周期,一样能保证清理僵尸用户。 */ func getMemberStr(groupUid string, userId uint64) string { return groupUid + "_" + strconv.FormatUint(userId, 10) } func analysisMemberStr(memberStr string) (string, uint64) { strs := strings.Split(memberStr, "_") groupUid := strs[0] userId, err := strconv.ParseUint(strs[1], 10, 64) if err != nil { mylogrus.MyLog.Errorf("analysisMemberStr memberStr:%v err:%+v", memberStr, err) } return groupUid, userId } func RoomLivingExpire(model *domain.Model, groupUid string, userId uint64) { model.Log.Infof("room RoomLivingExpire userId:%v, groupUid:%v", userId, groupUid) // key := redis_key.GetPrefixGroupRoomLiving() i, err := redisCli.GetRedis().ZAdd(context.Background(), key, &redis2.Z{ Score: float64(time.Now().Unix()), Member: getMemberStr(groupUid, userId), }).Result() if err != nil { model.Log.Errorf("RoomLivingExpire ZAdd key:%v, groupUid:%v, userId:%v, err:%v", key, groupUid, userId, err) } else { model.Log.Infof("RoomLivingExpire ZAdd key:%v, groupUid:%v, userId:%v result:%v", key, groupUid, userId, i) } } //进入房间 func RoomLivingIn(model *domain.Model, groupUid string, userId uint64, externalId string, robot bool) error { model.Log.Infof("room RoomLivingIn userId:%v, groupUid:%v", userId, groupUid) //这个可以考虑用go协程 if !robot { RoomLivingLeave(model, userId, externalId, "") } // key := redis_key.GetPrefixGroupRoomLiving() i, err := redisCli.GetRedis().ZAdd(context.Background(), key, &redis2.Z{ Score: float64(time.Now().Unix()), Member: getMemberStr(groupUid, userId), }).Result() if err != nil { model.Log.Errorf("UpdateRoomLiving ZAdd key:%v, groupUid:%v, userId:%v, err:%v", key, groupUid, userId, err) } else { model.Log.Infof("UpdateRoomLiving ZAdd key:%v, groupUid:%v, userId:%v result:%v", key, groupUid, userId, i) } go func(myContext *mycontext.MyContext, groupId string) { roomOnlineUser, err := GetRoomOnlineUser(myContext, groupId) if err != nil { myContext.Log.Errorf("room RoomLivingIn roomOnlineUser err:%+v, groupUid:%v", err, groupId) //错误,跳过,不退出 return } buf, err := json.Marshal(roomOnlineUser) if err != nil { myContext.Log.Errorf("room RoomLivingIn roomOnlineUser json.Marshal err:%+v groupUid:%v", err, groupId) //错误,跳过,不退出 return } else { myContext.Log.Infof("room RoomLivingIn json.Marshal success groupUid:%v, data:%v", groupId, string(buf)) //发送在线 sendSignalMsg(model, groupId, GroupSystemMsg{ MsgId: group_e.GroupOnlineUser, Content: string(buf), }, true) } }(model.MyContext, groupUid) return err } //离开房间 func roomLivingLeave(model *domain.Model, userId uint64, groupId string) ([]string, error) { model.Log.Infof("RoomLivingLeave userId:%v", userId) key := redis_key.GetPrefixGroupRoomLiving() //if err := redisCli.ClearExpired(key, expireMinute); err != nil { if err := model.Redis.ZRemRangeByScore(model, key, "0", strconv.FormatInt(time.Now().Unix()-expireMinute, 10)).Err(); err != nil { return nil, myerr.WrapErr(err) } data, err := redisCli.GetRedis().ZRange(context.Background(), key, 0, -1).Result() if err != nil { return nil, myerr.WrapErr(err) } groupIds := make([]string, 0) for i, _ := range data { gid, uid := analysisMemberStr(data[i]) if uid == userId && (groupId == "" || gid == groupId) { if _, err := redisCli.GetRedis().ZRem(context.Background(), key, getMemberStr(gid, uid)).Result(); err != nil { model.Log.Errorf("RoomLivingLeave ZRem key:%s, groupId:%s, userId:%d, err:%v", key, gid, uid, err) return nil, myerr.WrapErr(err) } else { groupIds = append(groupIds, gid) model.Log.Infof("RoomLivingLeave ZRem success key:%s, groupId:%s, userId:%d", key, gid, uid) } // 发信令,让前端重新拉取,接受容错, user, err := user_m.GetUser(model, userId) if err != nil || user == nil { model.Log.Errorf("RoomLivingLeave GetUser err:%v", err) } else { sendSignalMsg(model, gid, GroupSystemMsg{ MsgId: group_e.GroupOutSignal, Source: user.ExternalId, }, false) } } } go func(myContext *mycontext.MyContext, groupIds []string) { for i, groupId := range groupIds { roomOnlineUser, err := GetRoomOnlineUser(myContext, groupIds[i]) if err != nil { myContext.Log.Errorf("RoomLivingLeave roomOnlineUser err:%+v, groupUid:%v", err, groupId) //错误,跳过,不退出 continue } buf, err := json.Marshal(roomOnlineUser) if err != nil { myContext.Log.Errorf("RoomLivingLeave roomOnlineUser json.Marshal err:%+v groupUid:%v", err, groupId) //错误,跳过,不退出 continue } else { myContext.Log.Infof("RoomLivingLeave json.Marshal success groupUid:%v, data:%v", groupId, string(buf)) //发送在线 sendSignalMsg(domain.CreateModelContext(myContext), groupId, GroupSystemMsg{ MsgId: group_e.GroupOnlineUser, Content: string(buf), }, true) } } }(model.MyContext, groupIds) return groupIds, nil } //离开房间 func RoomLivingLeave(model *domain.Model, userId uint64, externalId string, groupId string) ([]string, error) { model.Log.Infof("RoomLivingLeave userId:%v", userId) //获取用户是否在麦上, 让用户离开麦 micUser, err := GetMicUserByExternalId(model, externalId) if err != nil { return nil, err } if micUser != nil { if err = micUser.LeaveByUser(userId, externalId); err != nil { return nil, err } } return roomLivingLeave(model, userId, groupId) } //被踢出离开房间 func RoomLivingLeaveByKick(model *domain.Model, groupUuid string, beKickuserId uint64, beKickExternalId string, userExternalId string) ([]string, error) { model.Log.Infof("RoomLivingLeaveByKick userId:%v", beKickuserId) //获取用户是否在麦上, 让用户离开麦 micUser, err := GetMicUserByExternalId(model, beKickExternalId) if err != nil { return nil, err } if micUser != nil { if err = micUser.LeaveByUser(beKickuserId, beKickExternalId); err != nil { return nil, err } } // 发信令,让前端重新拉取,接受容错, /* sendSignalMsg(model, groupUuid, GroupSystemMsg{ MsgId: group_enum.GroupKickOut, Source: userExternalId, Target: beKickExternalId, }, false)*/ MicGroupKickOutRPush(model, groupUuid, userExternalId, beKickExternalId) return roomLivingLeave(model, beKickuserId, groupUuid) } // 直播群状态回调导致的下麦 func RoomLivingLeaveByOffline(model *domain.Model, userId uint64, externalId string, groupId string) ([]string, error) { //获取用户是否在麦上, 让用户离开麦 micUser, err := GetMicUserByExternalId(model, externalId) if err != nil { return nil, err } if micUser != nil { model.Log.Infof("RoomLivingLeaveByOffline user %d, %s, groupId:%s", userId, externalId, groupId) if err = micUser.LeaveByUser(userId, externalId); err != nil { return nil, err } MicSocketMicOutRPush(model, micUser.GroupUuid, micUser.ExternalId) } return roomLivingLeave(model, userId, groupId) } //socket掉线被提出 func RoomLivingLeaveByMgr(model *domain.Model, userId uint64, externalId string, groupId string) ([]string, error) { model.Log.Infof("RoomLivingLeaveByMgr userId:%v", userId) //获取用户是否在麦上, 让用户离开麦 micUser, err := GetMicUserByExternalId(model, externalId) if err != nil { return nil, err } if micUser != nil { if err = micUser.LeaveByUser(userId, externalId); err != nil { return nil, err } MicSocketMicOutRPush(model, micUser.GroupUuid, micUser.ExternalId) } groupIds, err := roomLivingLeave(model, userId, groupId) if err != nil { return groupIds, err } // 发送信令 type removeHistoryParam struct { RemoveHistory bool `json:"removeHistory"` } r := removeHistoryParam{RemoveHistory: true} buf, err := json.Marshal(r) if err == nil { systemMsg := GroupSystemMsg{MsgId: group_e.GroupMemberRemoveSignal, Target: externalId, Content: string(buf)} for i, _ := range groupIds { sendSignalMsg(model, groupIds[i], systemMsg, false) } } return groupIds, err } //获取在房间的用户(其中成员的位置按分数值递增(从大到小)来排序) func RoomLivingExistsUserId(groupUid string) ([]uint64, error) { key := redis_key.GetPrefixGroupRoomLiving() //if err := redisCli.ClearExpired(key, expireMinute); err != nil { var model = domain.CreateModelNil() if err := model.Redis.ZRemRangeByScore(model, key, "0", strconv.FormatInt(time.Now().Unix()-expireMinute, 10)).Err(); err != nil { return nil, myerr.WrapErr(err) } groupUserIdstrs, err := redisCli.GetRedis().ZRevRange(context.Background(), key, 0, -1).Result() if err != nil { return nil, myerr.WrapErr(err) } mylogrus.MyLog.Infof("group_room_living RoomLivingExistsUserId groupUserIdstrs:%v", groupUserIdstrs) userIds := make([]uint64, 0, len(groupUserIdstrs)) for i, _ := range groupUserIdstrs { tempGroupUid, userId := analysisMemberStr(groupUserIdstrs[i]) if tempGroupUid == groupUid { userIds = append(userIds, userId) } } return userIds, nil } //获取在房间的用户 返回值:map,key:userId, value:groupUuid func RoomLivingUserIdFilter(userIds []mysql.ID) (map[mysql.ID]string, error) { userIdSet := map[mysql.ID]struct{}{} for i, _ := range userIds { userIdSet[userIds[i]] = struct{}{} } key := redis_key.GetPrefixGroupRoomLiving() //if err := redisCli.ClearExpired(key, expireMinute); err != nil { model := domain.CreateModelNil() if err := model.Redis.ZRemRangeByScore(model, key, "0", strconv.FormatInt(time.Now().Unix()-expireMinute, 10)).Err(); err != nil { return nil, myerr.WrapErr(err) } groupUserIdstrs, err := redisCli.GetRedis().ZRange(context.Background(), key, 0, -1).Result() if err != nil { return nil, myerr.WrapErr(err) } resultUserSet := map[mysql.ID]string{} for i, _ := range groupUserIdstrs { tempGroupUid, userId := analysisMemberStr(groupUserIdstrs[i]) mylogrus.MyLog.Debugf("RoomLivingUserIdFilter, analysisMemberStr %s, %d", tempGroupUid, userId) if _, flag := userIdSet[userId]; flag { resultUserSet[userId] = tempGroupUid } } return resultUserSet, nil } //获取有人的房间, 返回值:Map[groupUid]Set func RoomLivingExistsGroup(model *domain.Model) (map[string]map[uint64]struct{}, error) { key := redis_key.GetPrefixGroupRoomLiving() //if err := redisCli.ClearExpired(key, expireMinute); err != nil { if err := model.Redis.ZRemRangeByScore(model, key, "0", strconv.FormatInt(time.Now().Unix()-expireMinute, 10)).Err(); err != nil { model.Log.Infof("RoomLivingExistsGroup: err:%v", err) return nil, myerr.WrapErr(err) } groupUserIdstrs, err := redisCli.GetRedis().ZRange(context.Background(), key, 0, -1).Result() if err != nil { return nil, myerr.WrapErr(err) } model.Log.Infof("group_room_living RoomLivingExistsGroup groupUserIdstrs:%v", groupUserIdstrs) groupGroup := map[string]map[uint64]struct{}{} for i, _ := range groupUserIdstrs { tempGroupUid, userId := analysisMemberStr(groupUserIdstrs[i]) if a, flag := groupGroup[tempGroupUid]; flag { a[userId] = struct{}{} } else { groupGroup[tempGroupUid] = map[uint64]struct{}{userId: {}} } } model.Log.Infof("RoomLivingExistsGroup size = %d", len(groupGroup)) return groupGroup, nil } func BatchGetRoomCount(model *domain.Model, groupIds []string) (map[string]int, error) { liveRoom, err := RoomLivingExistsGroup(model) if err != nil { return nil, err } result := make(map[string]int, 0) for _, i := range groupIds { result[i] = len(liveRoom[i]) } return result, nil } // 取所有有人的房间,以房间人数排序 func GetAllGroupsSorted(model *domain.Model) ([]string, error) { // 1. 当前有人的房间 liveRoom, err := RoomLivingExistsGroup(model) if err != nil { model.Log.Infof("GetAllGroupsSorted: err:%v", err) return nil, err } roomCount := make(map[string]int, 0) for k, i := range liveRoom { roomCount[k] = len(i) } model.Log.Infof("GetAllGroupsSorted, living room : %v", roomCount) groupIds := make([]string, 0) for k, _ := range liveRoom { groupIds = append(groupIds, k) } sort.Slice(groupIds, func(i, j int) bool { gi := groupIds[i] gj := groupIds[j] return roomCount[gi] > roomCount[gj] || roomCount[gi] == roomCount[gj] && gi < gj }) return groupIds, nil } // 取房间最近N天的访问人数 func GetRoomVisitCount(groupId string) (int64, error) { // 每群定时请一次数据可以了 now := time.Now() if now.Second()%redis_key.GroupInDurationClearPeriod == 0 { // 先移除(N天之前的),后统计 if _, err := redisCli.GetRedis().ZRemRangeByScore(context.Background(), redis_key.GetPrefixGroupInUserDuration(groupId), "0", strconv.FormatInt(time.Now().AddDate(0, 0, -redis_key.GroupInDurationTTL).Unix(), 10)). Result(); err != nil { return 0, err } } visitCount, err := redisCli.GetRedis().ZCard(context.Background(), redis_key.GetPrefixGroupInUserDuration(groupId)).Result() if err != nil { return 0, err } return visitCount, nil } // 批量查询房间的麦位 TODO: 优化性能 func GetRoomMicUsers(model *domain.Model, groupIds []string) (map[string][]uint64, error) { result := make(map[string][]uint64, 0) for _, g := range groupIds { userIds, err := GetAllMicUser(g) if err != nil { model.Log.Warnf("GetRoomMicUsers, GetAllMicUser: groupId %s err %s", g, err.Error()) } else { result[g] = userIds } } return result, nil } // 批量查询房间最近N天的访问人数。使用二级缓存 func BatchGetRoomVisitCount(logE *logrus.Entry, groupIds []string) (map[string]int64, error) { //roomVisitCount, err := redis.GetAllRoomVisitCount() roomVisitCount, err := room_c.MGetRoomVisitCount(groupIds) if err != nil { return nil, err } logE.Infof("MGetRoomVisitCount:%v", roomVisitCount) visitCount := make(map[string]int64) for _, groupId := range groupIds { // 先从二级缓存中找 if c, ok := roomVisitCount[groupId]; ok { if vc, err := strconv.ParseInt(c, 10, 64); err == nil && vc > 0 { logE.Debugf("GetRoomVisitCount, from roomVisitCount %s - %d", groupId, vc) visitCount[groupId] = vc } } else { // 如果没有,就从roomVisit中取 if vc, err := room_c.GetSetRoomVisitCount(groupId); err == nil && vc > 0 { logE.Infof("GetRoomVisitCount, from roomVisit %s - %d", groupId, vc) visitCount[groupId] = vc } } } return visitCount, nil } type UserEnterRoom struct { UserId uint64 GroupId string EnterTime time.Time } func (uer *UserEnterRoom) Save(db *gorm.DB) error { return db.Clauses(clause.OnConflict{UpdateAll: true}).Create(uer).Error } func (uer *UserEnterRoom) Find(db *gorm.DB) ([]UserEnterRoom, error) { rows := make([]UserEnterRoom, 0) if err := db.Where(uer).Order("enter_time DESC").Find(&rows).Error; err != nil { return nil, err } return rows, nil } type RoomMonthConsume struct { GroupId string Month string Diamond uint64 } func (rmc *RoomMonthConsume) AddDiamond(db *gorm.DB) error { return db.Clauses(clause.OnConflict{ DoUpdates: clause.Assignments(map[string]interface{}{ "diamond": gorm.Expr("diamond + ?", rmc.Diamond)}), }).Create(rmc).Error } func (rmc *RoomMonthConsume) GetTotalDiamond(db *gorm.DB) (uint64, error) { rows := make([]struct{ Total uint64 }, 0) if err := db.Model(&RoomMonthConsume{}).Select("SUM(diamond) AS total"). Where(rmc).Find(&rows).Error; err != nil { return 0, err } if len(rows) > 0 { return rows[0].Total, nil } return 0, nil } type RoomWeekConsume struct { GroupId string Week string Diamond uint64 } func (rwc *RoomWeekConsume) AddDiamond(db *gorm.DB) error { return db.Clauses(clause.OnConflict{ DoUpdates: clause.Assignments(map[string]interface{}{ "diamond": gorm.Expr("diamond + ?", rwc.Diamond)}), }).Create(rwc).Error } type RoomDayConsume struct { GroupId string Day string Diamond uint64 } func (rdc *RoomDayConsume) AddDiamond(db *gorm.DB) error { return db.Clauses(clause.OnConflict{ DoUpdates: clause.Assignments(map[string]interface{}{ "diamond": gorm.Expr("diamond + ?", rdc.Diamond)}), }).Create(rdc).Error } type RoomUserDayConsume struct { Day string GroupId string SendUserId uint64 Diamond uint64 } func (rudc *RoomUserDayConsume) AddDiamond(db *gorm.DB) error { return db.Clauses(clause.OnConflict{ DoUpdates: clause.Assignments(map[string]interface{}{ "diamond": gorm.Expr("diamond + ?", rudc.Diamond)}), }).Create(rudc).Error } // todo todo 这里可能需要!!! //func init() { // event.AddGroupInAsync(func(model *domain.Model, e interface{}) error { // event := e.(*event.GroupInEvent) // model.Log.Infof("AddGroupInAsync for room: %+v", event) // // uer := UserEnterRoom{ // UserId: event.UserId, // GroupId: event.GroupId, // EnterTime: time.Now(), // } // err := uer.save(model.Db) // model.Log.Infof("AddGroupInAsync, UserEnterRoom err: %v", err) // return err // }) // // event.AddSendGift(func(model *domain.Model, event *event.SendGiftEvent) error { // if event.NoExecSync { // model.Log.Infof("Skip %+v", event) // return nil // } // model.Log.Infof("AddSendGift room: %+v", event) // // // 群消费月表涉及总消费额,需要在同步处理中,但不报错 // if event.ResGift.GiftType == mysql.DiamondYellow && event.SceneType == gift_m2.GroupSceneType && len(event.SceneUid) > 0 { // now := time.Now() // rmc := RoomMonthConsume{ // GroupId: event.SceneUid, // Month: now.Format(common.COMPACT_MONTH_FORMAT), // Diamond: event.GetDiamondTotal(), // } // err := rmc.AddDiamond(model.Db) // if err != nil { // model.Log.Infof("AddSendGiftAsync room, %+v AddDiamond failed: %v", rmc, err) // } else { // // 增加后在事务中读出来,供异步事件中使用 // rmc = RoomMonthConsume{GroupId: event.SceneUid} // if consume, err := rmc.GetTotalDiamond(model.Db); err == nil { // event.TotalConsume = consume // } // } // } // return nil // }) // // event.AddSendGiftAsync(func(model *domain.Model, event *event.SendGiftEvent) error { // model.Log.Infof("AddSendGiftAsync room: %+v", event) // // if event.SceneType == gift_m2.GroupSceneType && len(event.SceneUid) > 0 { // now := time.Now() // rwc := RoomWeekConsume{ // GroupId: event.SceneUid, // Week: common.GetMonday(now).Format(common.DATE_FORMAT), // Diamond: event.GetDiamondTotal(), // } // err := rwc.AddDiamond(model.Db) // if err != nil { // model.Log.Infof("AddSendGiftAsync room, %+v AddDiamond failed: %v", rwc, err) // } // // rdc := RoomDayConsume{ // GroupId: event.SceneUid, // Day: now.Format(common.DATE_FORMAT), // Diamond: event.GetDiamondTotal(), // } // err = rdc.AddDiamond(model.Db) // if err != nil { // model.Log.Infof("AddSendGiftAsync room, %+v AddDiamond failed: %v", rdc, err) // } // // err = redis.ClearGroupConsume(event.SceneUid) // if err != nil { // model.Log.Infof("AddSendGiftAsync room, group %s redis.ClearGroupConsume failed: %v", event.SceneUid, err) // } // } // return nil // }) // // event.AddSendGiftAsync(func(model *domain.Model, event *event.SendGiftEvent) error { // model.Log.Infof("AddSendGiftAsync roomUserDay: %+v", event) // // if event.SceneType == gift_m2.GroupSceneType && len(event.SceneUid) > 0 { // now := time.Now() // rudc := RoomUserDayConsume{ // Day: now.Format(common.DATE_FORMAT), // GroupId: event.SceneUid, // SendUserId: event.SendUserId, // Diamond: event.GetDiamondTotal(), // } // err := rudc.AddDiamond(model.Db) // if err != nil { // model.Log.Infof("AddSendGiftAsync roomUserDay, %+v AddDiamond failed: %v", rudc, err) // } // return err // } // return nil // }) // // event.AddUserFreezeAsync(func(model *domain.Model, event *event.UserFreezeEvent) error { // model.Log.Infof("AddUserFreezeAsync room: %+v", event) // user, err := user_m.GetUser(model, event.UserId) // if err != nil { // return err // } // if _, err := RoomLivingLeaveByMgr(model, user.ID, user.ExternalId, ""); err != nil { // model.Log.Errorf("AddUserFreezeAsync room err:%v", err) // } // // // 连带重置群信息 // if gi, err := FindGroupByOwner(model, event.UserId); err == nil && len(gi) > 0 { // if err = ResetGroupInfo(model, gi[0].ImGroupId, gi[0].Code); err != nil { // model.Log.Warnf("ResetGroupInfo user %d's group %s failed", event.UserId, gi[0].ImGroupId) // } // } // return nil // }) // //}