// groupMsg.go 12.6 KB

package group_m

import (
	"context"
	"encoding/json"
	"git.hilo.cn/hilo-common/domain"
	"git.hilo.cn/hilo-common/mylogrus"
	"git.hilo.cn/hilo-common/resource/config"
	"git.hilo.cn/hilo-common/resource/mysql"
	"git.hilo.cn/hilo-common/resource/redisCli"
	redis2 "github.com/go-redis/redis/v8"
	"hilo-group/_const/redis_key"
	"hilo-group/myerr"
	"time"
)

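/**
 * Group messages. Designed as a replacement for Tencent Cloud IM, taking both message performance and persistence into account.
 * Design:
	1: Persistence
		Database persistence (full persistence).
	2: Message performance. The message volume is very large, so reads cannot be served by querying the message database. Data is fetched primarily from the cache; the database is the source of truth when data has to be confirmed.
	3: Cached data: written asynchronously, tolerating message loss.
		1: Max message sequence number per group (increasing, no expiry, deleted when the group is dissolved). redis key: group_msg_seq_max_all, Map (key: groupUuid, value: sequence number)
		2: Max read message sequence number per user in a group (increasing, no expiry, deleted when the group is dissolved). redis key: group_msg_seq_max_{uuid}, Map (key: user name, value: sequence number)
		3: Users who spoke in a group within the last 5 minutes (grows and shrinks; removal happens at insert time, by checking whether the timestamp is still within 5 minutes). redis key: group_msg_duration_user, sorted set (key: groupId_userId, score: timestamp)
		4: Speaking-priority score of a group (based on the number of users) (grows and shrinks; triggered by: the number of speakers in the group within 5 minutes). redis key: group_msg_duration_score, sorted set (key: group ID, score: score (number of users))
*/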

// RediGroupMsgSeqMax is the Redis-side record of the largest message
// sequence number of a group.
type RediGroupMsgSeqMax struct {
	// Seq is the sequence number.
	Seq uint64
	// GroupUuid is the group ID.
	GroupUuid string
}

// NewSeq overwrites the stored sequence number with seq and returns the
// receiver, so the call can be chained.
func (m *RediGroupMsgSeqMax) NewSeq(seq uint64) *RediGroupMsgSeqMax {
	m.Seq = seq
	return m
}

// RedisGroupMsgSeqMaxUser is the Redis-side record of the largest message
// sequence number of a user within a group.
type RedisGroupMsgSeqMaxUser struct {
	// GroupUuid is the group ID.
	GroupUuid string
	// ExternalId is the user's ExternalId.
	ExternalId string
	// Seq is the sequence number.
	Seq uint64
}

// NewSeq overwrites the stored sequence number with seq and returns the
// receiver, so the call can be chained.
func (u *RedisGroupMsgSeqMaxUser) NewSeq(seq uint64) *RedisGroupMsgSeqMaxUser {
	u.Seq = seq
	return u
}

// RedisGroupMsgDurationUser is the Redis-side record of a user having spoken
// in a group during the tracked period.
type RedisGroupMsgDurationUser struct {
	// GroupUuid is the group ID.
	GroupUuid string
	// ExternalId is the user's ExternalId.
	ExternalId string
	// TimeStamp is a timestamp; its unit is not visible in this block.
	TimeStamp int64
}

// RedisGroupMsgDurationScore carries a change to the number of speaking
// users of a group.
type RedisGroupMsgDurationScore struct {
	// GroupUuid is the group ID.
	GroupUuid string
	// AddCountUser is the number of users to add; a negative value means a decrease.
	AddCountUser int
}

// reduce lowers AddCountUser by one.
func (s *RedisGroupMsgDurationScore) reduce() {
	s.AddCountUser--
}

// add raises AddCountUser by one and returns the receiver, so the call can
// be chained.
func (s *RedisGroupMsgDurationScore) add() *RedisGroupMsgDurationScore {
	s.AddCountUser++
	return s
}

// GroupUserMsgDurationDel aggregates a list of per-user speaking records that
// are being deleted into one score change per group.
//
// For every record, the score entry of the record's group is looked up (and
// created through getRedisGroupMsgDurationScoreInit the first time that group
// is seen) and reduced by one. The returned slice holds one entry per
// distinct group, in the order in which the groups first appear in
// redisGroupUserMsgDurations; it is empty (non-nil) when the input is empty.
//
// If creating a score entry fails, the error is returned as-is together with
// a nil slice.
func GroupUserMsgDurationDel(redisGroupUserMsgDurations []*RedisGroupMsgDurationUser) ([]*RedisGroupMsgDurationScore, error) {
	scoreByGroup := make(map[string]*RedisGroupMsgDurationScore)
	// Collected alongside the map so the result order is deterministic
	// (ranging over the map would yield a random order on every call).
	scores := []*RedisGroupMsgDurationScore{}

	for _, duration := range redisGroupUserMsgDurations {
		groupUuid := duration.GroupUuid
		score, seen := scoreByGroup[groupUuid]
		if !seen {
			var err error
			score, err = getRedisGroupMsgDurationScoreInit(groupUuid)
			if err != nil {
				return nil, err
			}
			scoreByGroup[groupUuid] = score
			scores = append(scores, score)
		}
		score.reduce()
	}
	return scores, nil
}

// GroupMsg is the persisted form of a group message.
type GroupMsg struct {
	mysql.Entity
	*domain.Model   `gorm:"-"`
	CallbackCommand string
	GroupId         string
	Type            string // message type
	FromAccount     string // sender
	OperatorAccount string // sender of the request
	Random          int    // random number
	MsgSeq          uint64
	MsgTime         uint64
	OnlineOnlyFlag  uint8
	MsgBody         string
}

//获取最近说话的排序
func GetRecentSort() ([]string, error) {
	//查找周期性是否存在
	if groupUuidsStr, err := redisCli.GetRedis().Get(context.Background(), redis_key.GetPrefixGroupMsgDurationScoreSnap()).Result(); err != nil {
		//没有,则重新生成
		if err == redis2.Nil {
			//移除得分为0的
			_, err := redisCli.GetRedis().ZRemRangeByScore(context.Background(), redis_key.GetPrefixGroupMsgDurationScore(), "0", "0").Result()
			if err != nil {
				//容错,只是打印错误
				mylogrus.MyLog.Infoln("redis group_msg_duration_score ZRemRangeByScore err:%v", err)
			}
			//倒叙获取所有的分数
			// ZREVRANGEBYSCORE salary +inf -inf
			if groupUuids, err := redisCli.GetRedis().ZRevRangeByScore(context.Background(), redis_key.GetPrefixGroupMsgDurationScore(), &redis2.ZRangeBy{
				Min: "-inf",
				Max: "+inf",
			}).Result(); err != nil {
				return nil, myerr.WrapErr(err)
			} else {
				//重新加入
				bytes, err := json.Marshal(groupUuids)
				if err != nil {
					return nil, myerr.WrapErr(err)
				}
				if err := redisCli.GetRedis().SetEX(context.Background(), redis_key.GetPrefixGroupMsgDurationScoreSnap(), string(bytes), time.Duration(config.GetGroupImConfig().MSG_SORT_SNAP)*time.Second).Err(); err != nil {
					return nil, myerr.WrapErr(err)
				} else {
					return groupUuids, nil
				}
			}
		} else {
			return nil, myerr.WrapErr(err)
		}
	} else {
		groupUuids := []string{}
		if err := json.Unmarshal([]byte(groupUuidsStr), &groupUuids); err != nil {
			return nil, myerr.WrapErr(err)
		}
		return groupUuids, nil
	}
}
//
//func init() {
//	//由于redis在执行数据读写过程中,是单线程,并且有数据的统计,是属于递增/递减 n,或者数据集直接拿出来畜栏里, 因此就是存在并发,也不影响数据的一致性。
//	event.AddGroupMsgNewAsync(func(model *domain.Model, event *event.GroupMsgNewEvent) error {
//		model.Log.Infof("GroupMsgNewAsync groupMsg GroupUuid:%v, FromAccount:%v, MsgSeq:%v, MsgTime:%v", event.GroupUuid, event.FromAccount, event.MsgSeq, event.MsgTime)
//		//redis中群组中最大的消息序列
//		rediGroupMsgMaxSeq, err := getRediGroupMsgMaxSeqOrInit(event.GroupUuid)
//		if err != nil {
//			return err
//		}
//		if err := rediGroupMsgMaxSeq.NewSeq(event.MsgSeq).Persistent(); err != nil {
//			return err
//		}
//		/*		//用户在群中的最大消息序列,
//				redisGroupUserMsgMaxSeq, err := getRedisGroupUserMsgMaxSeqOrInit(event.GroupUuid, event.FromAccount)
//				if err != nil {
//					return err
//				}
//				if err := redisGroupUserMsgMaxSeq.Persistent(); err != nil {
//					return err
//				}*/
//		//查找已到过期时间的消息
//		redisGroupUserMsgDurations, err := getExpireRedisGroupUserMsgDuration()
//		if err != nil {
//			return err
//		}
//		redisGroupMsgCountUsers, err := GroupUserMsgDurationDel(redisGroupUserMsgDurations)
//		for i := 0; i < len(redisGroupMsgCountUsers); i++ {
//			if err := redisGroupMsgCountUsers[i].Persistent(); err != nil {
//				return err
//			}
//		}
//		//增加一条记录,
//		i, err := initRedisGroupUserMsgDuration(event.GroupUuid, event.FromAccount).Persistent()
//		if err != nil {
//			return err
//		}
//		//如果是新增,则开始增加一条记录
//		if i > 0 {
//			redisGroupMsDurationScore, err := getRedisGroupMsgDurationScoreInit(event.GroupUuid)
//			if err != nil {
//				return err
//			}
//			if err := redisGroupMsDurationScore.add().Persistent(); err != nil {
//				return err
//			}
//		}
//		return nil
//	})
//	//发送消息
//	//发送
//
//	//幸运转盘创建
//	event.AddLuckyWheelCreateAsync(func(model *domain.Model, event *event.LuckyWheelCreateEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelCreateAsync LuckyWheelId:%v, CreateUserId:%v, GroupUid:%v, LuckyWheelSeatId:%v", event.LuckyWheelId, event.CreateUserId, event.GroupUid, event.LuckyWheelSeatId)
//		/*		if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCreateAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelCreate); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCreateAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelCreate})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//幸运转盘Play
//	event.AddLuckyWheelPlayAsync(func(model *domain.Model, event *event.LuckyWheelPlayEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelPlayAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelPlayAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelPlay); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelPlayAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelPlay})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//幸运转盘加入
//	event.AddLuckyWheelJoinAsync(func(model *domain.Model, event *event.LuckyWheelJoinEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelJoinAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelJoinAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserJoin); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelJoinAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserJoin})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//转盘超时回滚
//	event.AddLuckyWheelTimeOutAsync(func(model *domain.Model, event *event.LuckyWheelTimeOutEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelTimeOutAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelTimeOutAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserTimeOut); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelTimeOutAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserTimeOut})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//幸运转盘user取消
//	event.AddLuckyWheelCancelAsync(func(model *domain.Model, event *event.LuckyWheelCancelEvent) error {
//		model.Log.Infof("msg AddLuckyWheelCancelAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCancelAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserCancel); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCancelAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserCancel})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//}