package group_m

import (
	"context"
	"encoding/json"
	"time"

	"git.hilo.cn/hilo-common/domain"
	"git.hilo.cn/hilo-common/mylogrus"
	"git.hilo.cn/hilo-common/resource/config"
	"git.hilo.cn/hilo-common/resource/mysql"
	"git.hilo.cn/hilo-common/resource/redisCli"
	redis2 "github.com/go-redis/redis/v8"
	"hilo-group/_const/redis_key"
	"hilo-group/myerr"
)

/*
Group messages. Designed as a replacement for Tencent Cloud IM, balancing
message performance against persistence.

Design:
 1. Persistence: every message is persisted to the database (full persistence).
 2. Performance: the message volume is very large, so reads must not query the
    message table. Data is served mainly from the cache; the database is the
    source of truth when data has to be confirmed.
 3. Cached data: written asynchronously, tolerating lost messages.
    1. Max message sequence per group (increasing, no expiry, removed when the
       group is dissolved).
       redis key: group_msg_seq_max_all, Map(key: groupUuid, value: sequence)
    2. Max read message sequence per user in a group (increasing, no expiry,
       removed when the group is dissolved).
       redis key: group_msg_seq_max_{uuid}, Map(key: user name, value: sequence)
    3. Users who spoke in a group within the last 5 minutes (grows and shrinks;
       removal happens at insert time by checking whether the timestamp is
       still within 5 minutes).
       redis key: group_msg_duration_user, sorted set (member: groupId_userId, score: timestamp)
    4. Speaking-priority score of a group, based on the number of users (grows
       and shrinks; driven by the number of speakers in the last 5 minutes).
       redis key: group_msg_duration_score, sorted set (member: groupId, score: number of users)
*/

// RediGroupMsgSeqMax is the redis record of a group's max message sequence.
type RediGroupMsgSeqMax struct {
	// Seq is the sequence number.
	Seq uint64
	// GroupUuid is the group ID.
	GroupUuid string
}

// NewSeq stores seq on the receiver and returns the receiver for chaining.
func (m *RediGroupMsgSeqMax) NewSeq(seq uint64) *RediGroupMsgSeqMax {
	m.Seq = seq
	return m
}

// RedisGroupMsgSeqMaxUser is the redis record of a user's max message
// sequence within a group.
type RedisGroupMsgSeqMaxUser struct {
	// GroupUuid is the group ID.
	GroupUuid string
	// ExternalId is the user's external ID.
	ExternalId string
	// Seq is the sequence number.
	Seq uint64
}

// NewSeq stores seq on the receiver and returns the receiver for chaining.
func (u *RedisGroupMsgSeqMaxUser) NewSeq(seq uint64) *RedisGroupMsgSeqMaxUser {
	u.Seq = seq
	return u
}

// RedisGroupMsgDurationUser is the redis record of a user speaking in a group
// during the tracked period.
type RedisGroupMsgDurationUser struct {
	// GroupUuid is the group ID.
	GroupUuid string
	// ExternalId is the user's external ID.
	ExternalId string
	// TimeStamp is the timestamp of the record.
	TimeStamp int64
}

// RedisGroupMsgDurationScore is a pending change to the number of speaking
// users in a group.
type RedisGroupMsgDurationScore struct {
	// GroupUuid is the group ID.
	GroupUuid string
	// AddCountUser is the number of users to add; a negative value decreases
	// the count.
	AddCountUser int
}

// reduce decrements the pending user-count delta by one.
func (s *RedisGroupMsgDurationScore) reduce() {
	s.AddCountUser--
}

// add increments the pending user-count delta by one and returns the receiver
// for chaining.
func (s *RedisGroupMsgDurationScore) add() *RedisGroupMsgDurationScore {
	s.AddCountUser++
	return s
}

// GroupUserMsgDurationDel aggregates the per-group score decrement for a batch
// of duration-user records that are being removed.
//
// For every record the score object of its group is decremented by one; the
// object is obtained from getRedisGroupMsgDurationScoreInit the first time a
// group is seen. The result holds one entry per distinct GroupUuid, in no
// particular order (it is collected from a map). The first error returned by
// getRedisGroupMsgDurationScoreInit aborts the aggregation and is returned
// unchanged together with a nil slice.
//
// NOTE(review): the records are presumably the expired ones; the only visible
// (disabled) caller passes the result of getExpireRedisGroupUserMsgDuration.
func GroupUserMsgDurationDel(redisGroupUserMsgDurations []*RedisGroupMsgDurationUser) ([]*RedisGroupMsgDurationScore, error) {
	scoreByGroup := make(map[string]*RedisGroupMsgDurationScore)
	for _, duration := range redisGroupUserMsgDurations {
		score, seen := scoreByGroup[duration.GroupUuid]
		if !seen {
			var err error
			score, err = getRedisGroupMsgDurationScoreInit(duration.GroupUuid)
			if err != nil {
				return nil, err
			}
			scoreByGroup[duration.GroupUuid] = score
		}
		score.reduce()
	}

	// Always return a non-nil slice, even for an empty batch.
	scores := make([]*RedisGroupMsgDurationScore, 0, len(scoreByGroup))
	for _, score := range scoreByGroup {
		scores = append(scores, score)
	}
	return scores, nil
}

// GroupMsg is the persisted form of a group message.
type GroupMsg struct {
	mysql.Entity
	*domain.Model   `gorm:"-"`
	CallbackCommand string
	GroupId         string
	Type            string // message type
	FromAccount     string // sender
	OperatorAccount string // account that issued the request
	Random          int    // random number
	MsgSeq          uint64
	MsgTime         uint64
	OnlineOnlyFlag  uint8
	MsgBody         string
}

// GetRecentSort returns the group UUIDs ordered by their speaking score,
// highest first.
//
// The ordering is served from a JSON snapshot stored in redis. When the
// snapshot key is missing (redis2.Nil) it is rebuilt: members with score 0 are
// removed from the score sorted set (best effort, failures are only logged),
// the whole set is read in descending score order, and the result is written
// back as the snapshot with a TTL of MSG_SORT_SNAP seconds taken from the
// group IM config. Every other redis or JSON error is wrapped with
// myerr.WrapErr and returned.
func GetRecentSort() ([]string, error) {
	ctx := context.Background()
	rdb := redisCli.GetRedis()

	snapshot, err := rdb.Get(ctx, redis_key.GetPrefixGroupMsgDurationScoreSnap()).Result()
	if err == nil {
		// Snapshot still valid: decode and return it.
		groupUuids := []string{}
		if err := json.Unmarshal([]byte(snapshot), &groupUuids); err != nil {
			return nil, myerr.WrapErr(err)
		}
		return groupUuids, nil
	}
	if err != redis2.Nil {
		return nil, myerr.WrapErr(err)
	}

	// Snapshot missing: rebuild it. Drop zero-score members first; this is
	// best effort, so a failure is only logged. Infof (not Infoln) is required
	// for the %v verb to be expanded.
	if _, err := rdb.ZRemRangeByScore(ctx, redis_key.GetPrefixGroupMsgDurationScore(), "0", "0").Result(); err != nil {
		mylogrus.MyLog.Infof("redis group_msg_duration_score ZRemRangeByScore err:%v", err)
	}

	// Equivalent to: ZREVRANGEBYSCORE <key> +inf -inf
	groupUuids, err := rdb.ZRevRangeByScore(ctx, redis_key.GetPrefixGroupMsgDurationScore(), &redis2.ZRangeBy{
		Min: "-inf",
		Max: "+inf",
	}).Result()
	if err != nil {
		return nil, myerr.WrapErr(err)
	}

	encoded, err := json.Marshal(groupUuids)
	if err != nil {
		return nil, myerr.WrapErr(err)
	}
	ttl := time.Duration(config.GetGroupImConfig().MSG_SORT_SNAP) * time.Second
	if err := rdb.SetEX(ctx, redis_key.GetPrefixGroupMsgDurationScoreSnap(), string(encoded), ttl).Err(); err != nil {
		return nil, myerr.WrapErr(err)
	}
	return groupUuids, nil
}

// NOTE(review): the event wiring below is disabled (commented out) and kept
// for reference only.
//
//func init() {
//	// Redis executes reads and writes on a single thread, and the statistics kept here are only
//	// incremented/decremented by n or fetched and processed as a whole set, so concurrent
//	// execution does not affect data consistency.
//	event.AddGroupMsgNewAsync(func(model *domain.Model, event *event.GroupMsgNewEvent) error {
//		model.Log.Infof("GroupMsgNewAsync groupMsg GroupUuid:%v, FromAccount:%v, MsgSeq:%v, MsgTime:%v", event.GroupUuid, event.FromAccount, event.MsgSeq, event.MsgTime)
//		// Max message sequence of the group in redis.
//		rediGroupMsgMaxSeq, err := getRediGroupMsgMaxSeqOrInit(event.GroupUuid)
//		if err != nil {
//			return err
//		}
//		if err := rediGroupMsgMaxSeq.NewSeq(event.MsgSeq).Persistent(); err != nil {
//			return err
//		}
//		/*		// Max message sequence of the user in the group.
//				redisGroupUserMsgMaxSeq, err := getRedisGroupUserMsgMaxSeqOrInit(event.GroupUuid, event.FromAccount)
//				if err != nil {
//					return err
//				}
//				if err := redisGroupUserMsgMaxSeq.Persistent(); err != nil {
//					return err
//				}*/
//		// Find the records that have reached their expiry time.
//		redisGroupUserMsgDurations, err := getExpireRedisGroupUserMsgDuration()
//		if err != nil {
//			return err
//		}
//		redisGroupMsgCountUsers, err := GroupUserMsgDurationDel(redisGroupUserMsgDurations)
//		for i := 0; i < len(redisGroupMsgCountUsers); i++ {
//			if err := redisGroupMsgCountUsers[i].Persistent(); err != nil {
//				return err
//			}
//		}
//		// Add one record.
//		i, err := initRedisGroupUserMsgDuration(event.GroupUuid, event.FromAccount).Persistent()
//		if err != nil {
//			return err
//		}
//		// If the record is new, increase the group's score by one.
//		if i > 0 {
//			redisGroupMsDurationScore, err := getRedisGroupMsgDurationScoreInit(event.GroupUuid)
//			if err != nil {
//				return err
//			}
//			if err := redisGroupMsDurationScore.add().Persistent(); err != nil {
//				return err
//			}
//		}
//		return nil
//	})
//	// Send message
//	// Send
//
//	// Lucky wheel created
//	event.AddLuckyWheelCreateAsync(func(model *domain.Model, event *event.LuckyWheelCreateEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelCreateAsync LuckyWheelId:%v, CreateUserId:%v, GroupUid:%v, LuckyWheelSeatId:%v", event.LuckyWheelId, event.CreateUserId, event.GroupUid, event.LuckyWheelSeatId)
//		/*		if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCreateAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelCreate); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCreateAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelCreate})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	// Lucky wheel play
//	event.AddLuckyWheelPlayAsync(func(model *domain.Model, event *event.LuckyWheelPlayEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelPlayAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelPlayAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelPlay); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelPlayAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelPlay})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	// Lucky wheel join
//	event.AddLuckyWheelJoinAsync(func(model *domain.Model, event *event.LuckyWheelJoinEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelJoinAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelJoinAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserJoin); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelJoinAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserJoin})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	// Lucky wheel timeout rollback
//	event.AddLuckyWheelTimeOutAsync(func(model *domain.Model, event *event.LuckyWheelTimeOutEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelTimeOutAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelTimeOutAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserTimeOut); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelTimeOutAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserTimeOut})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	// Lucky wheel cancelled by user
//	event.AddLuckyWheelCancelAsync(func(model *domain.Model, event *event.LuckyWheelCancelEvent) error {
//		model.Log.Infof("msg AddLuckyWheelCancelAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCancelAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserCancel); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCancelAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserCancel})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//}