1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package group_m
import (
"context"
"encoding/json"
"git.hilo.cn/hilo-common/domain"
"git.hilo.cn/hilo-common/mylogrus"
"git.hilo.cn/hilo-common/resource/config"
"git.hilo.cn/hilo-common/resource/mysql"
"git.hilo.cn/hilo-common/resource/redisCli"
redis2 "github.com/go-redis/redis/v8"
"hilo-group/_const/redis_key"
"hilo-group/myerr"
"time"
)
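/**
 * Group messages. Designed as a replacement for Tencent Cloud IM, taking both message performance and persistence into account.
 * Design:
 1: Persistence
    Database persistence (every message is persisted).
 2: Message performance. The message volume is very large, so reads must not be served by querying the message database. Data is fetched primarily from the cache; the database is authoritative when data has to be confirmed.
 3: Cached data: written asynchronously; losing cached message data is tolerated.
    1: Maximum message sequence number per group (increasing, no expiry, deleted when the group is dissolved). redis key: group_msg_seq_max_all Map(key: groupUuid, value: sequence number)
    2: Maximum read message sequence number per user in a group (increasing, no expiry, deleted when the group is dissolved). redis key: group_msg_seq_max_{uuid} Map(key: user name, value: sequence number)
    3: Users who spoke in a group within the last 5 minutes (entries are added and removed; removal happens at insert time, by checking whether the timestamp is still within 5 minutes). redis key: group_msg_duration_user sorted set (key: groupId_userId, score: timestamp)
    4: Group speaking-priority score, based on the number of users (increases and decreases; trigger: the amount of speaking in the group within 5 minutes). redis key: group_msg_duration_score sorted set (key: groupId, score: score (number of users))
*/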
// RediGroupMsgSeqMax is the Redis-side record of a group's maximum message
// sequence number.
type RediGroupMsgSeqMax struct {
	Seq       uint64 // sequence number
	GroupUuid string // group ID
}

// NewSeq stores seq as the current sequence number and returns the receiver
// so that calls can be chained.
func (m *RediGroupMsgSeqMax) NewSeq(seq uint64) *RediGroupMsgSeqMax {
	m.Seq = seq
	return m
}
// RedisGroupMsgSeqMaxUser is the Redis-side record of the maximum message
// sequence number of one user within one group.
type RedisGroupMsgSeqMaxUser struct {
	GroupUuid  string // group ID
	ExternalId string // ExternalId of the user
	Seq        uint64 // sequence number
}

// NewSeq stores seq as the user's current sequence number and returns the
// receiver so that calls can be chained.
func (u *RedisGroupMsgSeqMaxUser) NewSeq(seq uint64) *RedisGroupMsgSeqMaxUser {
	u.Seq = seq
	return u
}
// RedisGroupMsgDurationUser is the Redis-side record of a user having spoken
// in a group during the tracked period.
type RedisGroupMsgDurationUser struct {
	GroupUuid  string // group ID
	ExternalId string // ExternalId of the user
	TimeStamp  int64  // timestamp (unit not visible here — TODO confirm)
}
// RedisGroupMsgDurationScore carries a change to the number of speaking users
// counted for a group.
type RedisGroupMsgDurationScore struct {
	GroupUuid    string // group ID
	AddCountUser int    // number of users to add; a negative value means a decrease
}

// reduce lowers the pending user-count change by one.
func (s *RedisGroupMsgDurationScore) reduce() {
	s.AddCountUser--
}

// add raises the pending user-count change by one and returns the receiver so
// that calls can be chained.
func (s *RedisGroupMsgDurationScore) add() *RedisGroupMsgDurationScore {
	s.AddCountUser++
	return s
}
// GroupUserMsgDurationDel turns a list of speaker records into one score
// change per group.
//
// Despite the name, nothing is deleted here. For every record the score object
// of the record's group is decremented once via reduce(). The score object for
// a group is obtained from getRedisGroupMsgDurationScoreInit the first time
// that group is seen (presumably starting with AddCountUser == 0 — confirm
// against that helper), and is reused for all later records of the same group.
//
// Returns one *RedisGroupMsgDurationScore per distinct GroupUuid, in
// unspecified order (map iteration). The slice is empty but non-nil when the
// input is empty. The first error returned by
// getRedisGroupMsgDurationScoreInit aborts the call and is returned unchanged.
func GroupUserMsgDurationDel(redisGroupUserMsgDurations []*RedisGroupMsgDurationUser) ([]*RedisGroupMsgDurationScore, error) {
	scoreByGroup := make(map[string]*RedisGroupMsgDurationScore)
	for _, duration := range redisGroupUserMsgDurations {
		score, seen := scoreByGroup[duration.GroupUuid]
		if !seen {
			var err error
			score, err = getRedisGroupMsgDurationScoreInit(duration.GroupUuid)
			if err != nil {
				return nil, err
			}
			scoreByGroup[duration.GroupUuid] = score
		}
		score.reduce()
	}

	// Size the result up front and range over the values directly instead of
	// looking every key up a second time.
	scores := make([]*RedisGroupMsgDurationScore, 0, len(scoreByGroup))
	for _, score := range scoreByGroup {
		scores = append(scores, score)
	}
	return scores, nil
}
// GroupMsg is the persisted form of a group message.
type GroupMsg struct {
	mysql.Entity
	// Model carries the gorm:"-" tag, so gorm does not map it to a column.
	*domain.Model `gorm:"-"`

	CallbackCommand string
	GroupId         string
	// Type is the message type.
	Type string
	// FromAccount is the sender.
	FromAccount string
	// OperatorAccount is the sender of the request.
	OperatorAccount string
	// Random is a random number.
	Random         int
	MsgSeq         uint64
	MsgTime        uint64
	OnlineOnlyFlag uint8
	MsgBody        string
}
// GetRecentSort returns the group UUIDs ordered by their recent-speaking
// score, highest score first.
//
// The ordering is served from a JSON snapshot stored in Redis under
// redis_key.GetPrefixGroupMsgDurationScoreSnap(). When that key does not exist
// (redis2.Nil) the snapshot is rebuilt:
//
//  1. members whose score is exactly 0 are removed from the sorted set at
//     redis_key.GetPrefixGroupMsgDurationScore(); a failure here is only
//     logged, it does not abort the call;
//  2. all remaining members are read in descending score order;
//  3. the list is stored as JSON under the snapshot key with a TTL of
//     config.GetGroupImConfig().MSG_SORT_SNAP seconds.
//
// Every other Redis or JSON error — including a failure to write the new
// snapshot — is returned wrapped with myerr.WrapErr, together with a nil
// slice.
func GetRecentSort() ([]string, error) {
	ctx := context.Background()
	rdb := redisCli.GetRedis()
	snapKey := redis_key.GetPrefixGroupMsgDurationScoreSnap()

	snapshot, err := rdb.Get(ctx, snapKey).Result()
	if err == nil {
		// Snapshot hit: decode and return it as-is.
		var groupUuids []string
		if err := json.Unmarshal([]byte(snapshot), &groupUuids); err != nil {
			return nil, myerr.WrapErr(err)
		}
		return groupUuids, nil
	}
	if err != redis2.Nil {
		return nil, myerr.WrapErr(err)
	}

	// Snapshot miss: rebuild it from the live score set.
	scoreKey := redis_key.GetPrefixGroupMsgDurationScore()

	// Drop groups whose score fell to 0. Best effort: log and carry on.
	if _, err := rdb.ZRemRangeByScore(ctx, scoreKey, "0", "0").Result(); err != nil {
		// Infof, not Infoln: the message contains a %v directive.
		mylogrus.MyLog.Infof("redis group_msg_duration_score ZRemRangeByScore err:%v", err)
	}

	// Equivalent to: ZREVRANGEBYSCORE <key> +inf -inf
	groupUuids, err := rdb.ZRevRangeByScore(ctx, scoreKey, &redis2.ZRangeBy{
		Min: "-inf",
		Max: "+inf",
	}).Result()
	if err != nil {
		return nil, myerr.WrapErr(err)
	}

	payload, err := json.Marshal(groupUuids)
	if err != nil {
		return nil, myerr.WrapErr(err)
	}
	ttl := time.Duration(config.GetGroupImConfig().MSG_SORT_SNAP) * time.Second
	if err := rdb.SetEX(ctx, snapKey, string(payload), ttl).Err(); err != nil {
		return nil, myerr.WrapErr(err)
	}
	return groupUuids, nil
}
//
//func init() {
// //由于redis在执行数据读写过程中,是单线程,并且有数据的统计,是属于递增/递减 n,或者数据集直接拿出来畜栏里, 因此就是存在并发,也不影响数据的一致性。
// event.AddGroupMsgNewAsync(func(model *domain.Model, event *event.GroupMsgNewEvent) error {
// model.Log.Infof("GroupMsgNewAsync groupMsg GroupUuid:%v, FromAccount:%v, MsgSeq:%v, MsgTime:%v", event.GroupUuid, event.FromAccount, event.MsgSeq, event.MsgTime)
// //redis中群组中最大的消息序列
// rediGroupMsgMaxSeq, err := getRediGroupMsgMaxSeqOrInit(event.GroupUuid)
// if err != nil {
// return err
// }
// if err := rediGroupMsgMaxSeq.NewSeq(event.MsgSeq).Persistent(); err != nil {
// return err
// }
// /* //用户在群中的最大消息序列,
// redisGroupUserMsgMaxSeq, err := getRedisGroupUserMsgMaxSeqOrInit(event.GroupUuid, event.FromAccount)
// if err != nil {
// return err
// }
// if err := redisGroupUserMsgMaxSeq.Persistent(); err != nil {
// return err
// }*/
// //查找已到过期时间的消息
// redisGroupUserMsgDurations, err := getExpireRedisGroupUserMsgDuration()
// if err != nil {
// return err
// }
// redisGroupMsgCountUsers, err := GroupUserMsgDurationDel(redisGroupUserMsgDurations)
// for i := 0; i < len(redisGroupMsgCountUsers); i++ {
// if err := redisGroupMsgCountUsers[i].Persistent(); err != nil {
// return err
// }
// }
// //增加一条记录,
// i, err := initRedisGroupUserMsgDuration(event.GroupUuid, event.FromAccount).Persistent()
// if err != nil {
// return err
// }
// //如果是新增,则开始增加一条记录
// if i > 0 {
// redisGroupMsDurationScore, err := getRedisGroupMsgDurationScoreInit(event.GroupUuid)
// if err != nil {
// return err
// }
// if err := redisGroupMsDurationScore.add().Persistent(); err != nil {
// return err
// }
// }
// return nil
// })
// //发送消息
// //发送
//
// //幸运转盘创建
// event.AddLuckyWheelCreateAsync(func(model *domain.Model, event *event.LuckyWheelCreateEvent) error {
// model.Log.Infof("groupMsg AddLuckyWheelCreateAsync LuckyWheelId:%v, CreateUserId:%v, GroupUid:%v, LuckyWheelSeatId:%v", event.LuckyWheelId, event.CreateUserId, event.GroupUid, event.LuckyWheelSeatId)
// /* if err != nil {
// model.Log.Errorf("msg AddLuckyWheelCreateAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
// }
// if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelCreate); err != nil {
// model.Log.Errorf("msg AddLuckyWheelCreateAsync SendLuckyWheel err:%v", err)
// }*/
// context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelCreate})
// sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
// MsgId: group_enum.GroupLuckyWheel,
// Content: string(context),
// }, true)
// return nil
// })
//
// //幸运转盘Play
// event.AddLuckyWheelPlayAsync(func(model *domain.Model, event *event.LuckyWheelPlayEvent) error {
// model.Log.Infof("groupMsg AddLuckyWheelPlayAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
// /* userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
// if err != nil {
// model.Log.Errorf("msg AddLuckyWheelPlayAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
// }
// if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelPlay); err != nil {
// model.Log.Errorf("msg AddLuckyWheelPlayAsync SendLuckyWheel err:%v", err)
// }*/
// context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelPlay})
// sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
// MsgId: group_enum.GroupLuckyWheel,
// Content: string(context),
// }, true)
// return nil
// })
//
// //幸运转盘加入
// event.AddLuckyWheelJoinAsync(func(model *domain.Model, event *event.LuckyWheelJoinEvent) error {
// model.Log.Infof("groupMsg AddLuckyWheelJoinAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
// /* userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
// if err != nil {
// model.Log.Errorf("msg AddLuckyWheelJoinAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
// }
// if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserJoin); err != nil {
// model.Log.Errorf("msg AddLuckyWheelJoinAsync SendLuckyWheel err:%v", err)
// }*/
// context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserJoin})
// sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
// MsgId: group_enum.GroupLuckyWheel,
// Content: string(context),
// }, true)
// return nil
// })
//
// //转盘超时回滚
// event.AddLuckyWheelTimeOutAsync(func(model *domain.Model, event *event.LuckyWheelTimeOutEvent) error {
// model.Log.Infof("groupMsg AddLuckyWheelTimeOutAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
// /* userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
// if err != nil {
// model.Log.Errorf("msg AddLuckyWheelTimeOutAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
// }
// if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserTimeOut); err != nil {
// model.Log.Errorf("msg AddLuckyWheelTimeOutAsync SendLuckyWheel err:%v", err)
// }*/
// context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserTimeOut})
// sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
// MsgId: group_enum.GroupLuckyWheel,
// Content: string(context),
// }, true)
// return nil
// })
//
// //幸运转盘user取消
// event.AddLuckyWheelCancelAsync(func(model *domain.Model, event *event.LuckyWheelCancelEvent) error {
// model.Log.Infof("msg AddLuckyWheelCancelAsync LuckyWheelId:%v, GroupUid:%v", event.LuckyWheelId, event.GroupUid)
// /* userIds, err := group_m.RoomLivingExistsUserId(event.GroupUid)
// if err != nil {
// model.Log.Errorf("msg AddLuckyWheelCancelAsync RoomLivingExistsUserId err:%v, groupUid:%v", err, event.GroupUid)
// }
// if err := consul.SendLuckyWheel(event.GroupUid, userIds, consul.LuckyWheelUserCancel); err != nil {
// model.Log.Errorf("msg AddLuckyWheelCancelAsync SendLuckyWheel err:%v", err)
// }*/
// context, _ := json.Marshal(map[string]interface{}{"type": consul.LuckyWheelUserCancel})
// sendSignalMsg(model, event.GroupUid, GroupSystemMsg{
// MsgId: group_enum.GroupLuckyWheel,
// Content: string(context),
// }, true)
// return nil
// })
//}