groupMsg.go 12.6 KB
Newer Older
hujiebin's avatar
hujiebin committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
package group_m

import (
	"context"
	"encoding/json"
	"git.hilo.cn/hilo-common/domain"
	"git.hilo.cn/hilo-common/mylogrus"
	"git.hilo.cn/hilo-common/resource/config"
	"git.hilo.cn/hilo-common/resource/mysql"
	"git.hilo.cn/hilo-common/resource/redisCli"
	redis2 "github.com/go-redis/redis/v8"
	"hilo-group/_const/redis_key"
	"hilo-group/myerr"
	"time"
)

/**
 * 消息。基于。替代腾讯云的IM的设计。同时考虑消息的性能以及持久化
 * 设计:
	1:持久化
		数据库持久化(全量持久化)
	2:消息性能。由于消息数据量很大。不能基于消息数据库查询。数据获取以缓存为主,数据确认以数据库数据为准。
	3:缓存数据:异步并接受消息丢失的容错
		1:群组最大消息序列号(递增,无过期,删除基于:群组解散)。redis key:group_msg_seq_max_all Map(key:groupUuid, value:序列号)
		2:群组用户最大读消息序列号(递增,无过期,删除基于:群组解散) redis key:group_msg_seq_max_{uuid} Map(key:用户名字, value:序列号)
		3:群组5分钟内发言的用户(有增有减,删除时间在于增加时候,判断时间是否在5分钟内) redis key:group_msg_duration_user sort set(key:群组Id_用户ID,score:时间戳)
     4:群组优先级发言的得分(依据于用户数)。(有增有减少,触发时机:群组5分钟内的发言数量) redis key:group_msg_duration_score sortSet (key:群组Id, score:得分(用户数))
*/

//redis的群组最大消息序列号
type RediGroupMsgSeqMax struct {
	//序列号
	Seq uint64
	//群组ID
	GroupUuid string
}

//设置新的序列号
func (rediGroupMsgSeqMax *RediGroupMsgSeqMax) NewSeq(seq uint64) *RediGroupMsgSeqMax {
	rediGroupMsgSeqMax.Seq = seq
	return rediGroupMsgSeqMax
}

//redis的群组用户最大消息序列号
type RedisGroupMsgSeqMaxUser struct {
	//群组ID
	GroupUuid string
	//用户ExternalId
	ExternalId string
	//序列号
	Seq uint64
}

//redis的群组用户最大消息序列号
func (redisGroupMsgSeqMaxUser *RedisGroupMsgSeqMaxUser) NewSeq(seq uint64) *RedisGroupMsgSeqMaxUser {
	redisGroupMsgSeqMaxUser.Seq = seq
	return redisGroupMsgSeqMaxUser
}

//redis的群组期间用户发言
type RedisGroupMsgDurationUser struct {
	//群组ID
	GroupUuid string
	//用户ExternalId
	ExternalId string
	//时间戳
	TimeStamp int64
}

//群组中发言的数量
type RedisGroupMsgDurationScore struct {
	//群组ID
	GroupUuid string
	//用户的数量, 负数减少
	AddCountUser int
}

//减少次数
func (redisGroupMsgDurationScore *RedisGroupMsgDurationScore) reduce() {
	redisGroupMsgDurationScore.AddCountUser = redisGroupMsgDurationScore.AddCountUser - 1
}

//增加次数
func (redisGroupMsgDurationScore *RedisGroupMsgDurationScore) add() *RedisGroupMsgDurationScore {
	redisGroupMsgDurationScore.AddCountUser = redisGroupMsgDurationScore.AddCountUser + 1
	return redisGroupMsgDurationScore
}

//删除
func GroupUserMsgDurationDel(redisGroupUserMsgDurations []*RedisGroupMsgDurationUser) ([]*RedisGroupMsgDurationScore, error) {
	groupMsgCountUserMap := map[string]*RedisGroupMsgDurationScore{}
	for i := 0; i < len(redisGroupUserMsgDurations); i++ {
		if redisGroupMsgCountUser, ok := groupMsgCountUserMap[redisGroupUserMsgDurations[i].GroupUuid]; ok {
			redisGroupMsgCountUser.reduce()
		} else {
			redisGroupMsgCountUser, err := getRedisGroupMsgDurationScoreInit(redisGroupUserMsgDurations[i].GroupUuid)
			if err != nil {
				return nil, err
			}
			redisGroupMsgCountUser.reduce()
			groupMsgCountUserMap[redisGroupUserMsgDurations[i].GroupUuid] = redisGroupMsgCountUser
		}
	}
	//for _, v := range redisGroupUserMsgDurations {
	//	if redisGroupMsgCountUser, ok := groupMsgCountUserMap[v.GroupUuid]; ok {
	//		redisGroupMsgCountUser.reduce()
	//	} else {
	//		redisGroupMsgCountUser, err := getRedisGroupMsgDurationScoreInit(v.GroupUuid)
	//		if err != nil {
	//			return nil, err
	//		}
	//		redisGroupMsgCountUser.reduce()
	//		groupMsgCountUserMap[v.GroupUuid] = redisGroupMsgCountUser
	//	}
	//}
	groupMsgCountUserList := []*RedisGroupMsgDurationScore{}
	for k, _ := range groupMsgCountUserMap {
		groupMsgCountUserList = append(groupMsgCountUserList, groupMsgCountUserMap[k])
	}
	return groupMsgCountUserList, nil
}

//持久化,群组消息
type GroupMsg struct {
	mysql.Entity
	*domain.Model   `gorm:"-"`
	CallbackCommand string
	GroupId         string
	Type            string //类型
	FromAccount     string //发送者
	OperatorAccount string //请求的发送者
	Random          int    //随机数
	MsgSeq          uint64
	MsgTime         uint64
	OnlineOnlyFlag  uint8
	MsgBody         string
}

//获取最近说话的排序
func GetRecentSort() ([]string, error) {
	//查找周期性是否存在
	if groupUuidsStr, err := redisCli.GetRedis().Get(context.Background(), redis_key.GetPrefixGroupMsgDurationScoreSnap()).Result(); err != nil {
		//没有,则重新生成
		if err == redis2.Nil {
			//移除得分为0的
			_, err := redisCli.GetRedis().ZRemRangeByScore(context.Background(), redis_key.GetPrefixGroupMsgDurationScore(), "0", "0").Result()
			if err != nil {
				//容错,只是打印错误
				mylogrus.MyLog.Infoln("redis group_msg_duration_score ZRemRangeByScore err:%v", err)
			}
			//倒叙获取所有的分数
			// ZREVRANGEBYSCORE salary +inf -inf
			if groupUuids, err := redisCli.GetRedis().ZRevRangeByScore(context.Background(), redis_key.GetPrefixGroupMsgDurationScore(), &redis2.ZRangeBy{
				Min: "-inf",
				Max: "+inf",
			}).Result(); err != nil {
				return nil, myerr.WrapErr(err)
			} else {
				//重新加入
				bytes, err := json.Marshal(groupUuids)
				if err != nil {
					return nil, myerr.WrapErr(err)
				}
				if err := redisCli.GetRedis().SetEX(context.Background(), redis_key.GetPrefixGroupMsgDurationScoreSnap(), string(bytes), time.Duration(config.GetGroupImConfig().MSG_SORT_SNAP)*time.Second).Err(); err != nil {
					return nil, myerr.WrapErr(err)
				} else {
					return groupUuids, nil
				}
			}
		} else {
			return nil, myerr.WrapErr(err)
		}
	} else {
		groupUuids := []string{}
		if err := json.Unmarshal([]byte(groupUuidsStr), &groupUuids); err != nil {
			return nil, myerr.WrapErr(err)
		}
		return groupUuids, nil
	}
}
//
//func init() {
//	//由于redis在执行数据读写过程中,是单线程,并且有数据的统计,是属于递增/递减 n,或者数据集直接拿出来畜栏里, 因此就是存在并发,也不影响数据的一致性。
//	event.AddGroupMsgNewAsync(func(model *domain.Model, event *event.GroupMsgNewEvent) error {
//		model.Log.Infof("GroupMsgNewAsync groupMsg GroupUuid:%v, FromAccount:%v, MsgSeq:%v, MsgTime:%v", event.GroupUuid, event.FromAccount, event.MsgSeq, event.MsgTime)
//		//redis中群组中最大的消息序列
//		rediGroupMsgMaxSeq, err := getRediGroupMsgMaxSeqOrInit(event.GroupUuid)
//		if err != nil {
//			return err
//		}
//		if err := rediGroupMsgMaxSeq.NewSeq(event.MsgSeq).Persistent(); err != nil {
//			return err
//		}
//		/*		//用户在群中的最大消息序列,
//				redisGroupUserMsgMaxSeq, err := getRedisGroupUserMsgMaxSeqOrInit(event.GroupUuid, event.FromAccount)
//				if err != nil {
//					return err
//				}
//				if err := redisGroupUserMsgMaxSeq.Persistent(); err != nil {
//					return err
//				}*/
//		//查找已到过期时间的消息
//		redisGroupUserMsgDurations, err := getExpireRedisGroupUserMsgDuration()
//		if err != nil {
//			return err
//		}
//		redisGroupMsgCountUsers, err := GroupUserMsgDurationDel(redisGroupUserMsgDurations)
//		for i := 0; i < len(redisGroupMsgCountUsers); i++ {
//			if err := redisGroupMsgCountUsers[i].Persistent(); err != nil {
//				return err
//			}
//		}
//		//增加一条记录,
//		i, err := initRedisGroupUserMsgDuration(event.GroupUuid, event.FromAccount).Persistent()
//		if err != nil {
//			return err
//		}
//		//如果是新增,则开始增加一条记录
//		if i > 0 {
//			redisGroupMsDurationScore, err := getRedisGroupMsgDurationScoreInit(event.GroupUuid)
//			if err != nil {
//				return err
//			}
//			if err := redisGroupMsDurationScore.add().Persistent(); err != nil {
//				return err
//			}
//		}
//		return nil
//	})
//	//发送消息
//	//发送
//
//	//幸运转盘创建
//	event.AddLuckyWheelCreateAsync(func(model *domain.Model, event *event.LuckyWheelCreateEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelCreateAsync LuckyWheelId:%v, CreateUserId:%v, GroupUid:%v, LuckyWheelSeatId:%v", event.LuckyWheelId, event.CreateUserId, event.GroupUid, event.LuckyWheelSeatId)
//		/*		if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCreateAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelCreate); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCreateAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelCreate})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//幸运转盘Play
//	event.AddLuckyWheelPlayAsync(func(model *domain.Model, event *event.LuckyWheelPlayEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelPlayAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelPlayAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelPlay); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelPlayAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelPlay})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//幸运转盘加入
//	event.AddLuckyWheelJoinAsync(func(model *domain.Model, event *event.LuckyWheelJoinEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelJoinAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelJoinAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserJoin); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelJoinAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserJoin})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//转盘超时回滚
//	event.AddLuckyWheelTimeOutAsync(func(model *domain.Model, event *event.LuckyWheelTimeOutEvent) error {
//		model.Log.Infof("groupMsg AddLuckyWheelTimeOutAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelTimeOutAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserTimeOut); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelTimeOutAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserTimeOut})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//
//	//幸运转盘user取消
//	event.AddLuckyWheelCancelAsync(func(model *domain.Model, event *event.LuckyWheelCancelEvent) error {
//		model.Log.Infof("msg AddLuckyWheelCancelAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
//		/*		userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
//				if err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCancelAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
//				}
//				if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserCancel); err != nil {
//					model.Log.Errorf("msg AddLuckyWheelCancelAsync SendLuckyWheel err:%v", err)
//				}*/
//		context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserCancel})
//		sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
//			MsgId:   group_enum.GroupLuckyWheel,
//			Content: string(context),
//		}, true)
//		return nil
//	})
//}