diff --git a/cron/cron.go b/cron/cron.go index 181ba0bbbcb84c2cf60f8a124a23578fdf57d7cf..76c11bc79950194ea1aa49c33afe2dfae53b184d 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -16,4 +16,5 @@ func Init() { mic_cron.OnMicCheck() // 检查上麦 group_cron.GroupPowerExpClear() // 清理家族经验/等级 group_cron.GroupPowerMonthRankAct() + group_cron.GroupInEventInit() // 进房事件 } diff --git a/cron/group_cron/group_in.go b/cron/group_cron/group_in.go new file mode 100644 index 0000000000000000000000000000000000000000..8581770f9097b7c106d5a04abd687e6b46969da9 --- /dev/null +++ b/cron/group_cron/group_in.go @@ -0,0 +1,34 @@ +package group_cron + +import ( + "git.hilo.cn/hilo-common/mycontext" + "git.hilo.cn/hilo-common/mylogrus" + "git.hilo.cn/hilo-common/resource/config" + "hilo-group/domain/service/event_s" + "time" +) + +// 进房事件 +func GroupInEventInit() { + if !config.IsMaster() { + return + } + mylogrus.MyLog.Infof("GroupInEventInit") + go func() { + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + for { + select { + case <-ticker.C: + //start := time.Now() + myCtx := mycontext.CreateMyContext(nil) + // 消费进房事件 + if err := event_s.NewGroupInEventService(myCtx).Consume(); err != nil { + myCtx.Log.Errorf("groupInEvent consume fail:%v", err) + } else { + //myCtx.Log.Infof("groupInEvent consume success,cost:%v", time.Now().Sub(start)) + } + } + } + }() +} diff --git a/domain/event/group_ev/tx_group_in.go b/domain/event/group_ev/tx_group_in.go new file mode 100644 index 0000000000000000000000000000000000000000..602810dab9b517c128afdaf79f1218c215baf229 --- /dev/null +++ b/domain/event/group_ev/tx_group_in.go @@ -0,0 +1,32 @@ +package group_ev + +import ( + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/resource/mysql" +) + +var txGroupInListen = new(domain.EventBase) + +// 进房事件 +type TxGroupInEvent struct { + GroupId string // imGroupId + UserId mysql.ID + ExternalId string + Nick string + Avatar string + IsMember bool //是否永久成员 + IsVip bool + NobleLevel uint16 +} + +func AddTxGroupInEventSync(callback func(model *domain.Model, event interface{}) error) { + domain.AddEventSync(txGroupInListen, callback) +} + +func AddTxGroupInEventAsync(callback func(model *domain.Model, event interface{}) error) { + domain.AddEventAsync(txGroupInListen, callback) +} + +func PublishTxGroupInEvent(model *domain.Model, event interface{}) error { + return domain.PublishEvent(txGroupInListen, model, event) +} diff --git a/domain/model/event_m/group_in.go b/domain/model/event_m/group_in.go new file mode 100644 index 0000000000000000000000000000000000000000..fdf9fc66935f24963b584add2bc3cf31d4f45afc --- /dev/null +++ b/domain/model/event_m/group_in.go @@ -0,0 +1,64 @@ +package event_m + +import ( + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/resource/mysql" + "gorm.io/gorm" +) + +// 进房事件消息 +type EventGroupIn struct { + mysql.Entity + *domain.Model `gorm:"-"` + Proto mysql.Type + Payload []byte + Mark mysql.YesNo + MarkHiloGroup mysql.YesNo +} + +func (EventGroupIn) TableName() string { + return "event_group_in" +} + +// 偏移值 +type EventGroupInOffsetHiloGroup struct { + mysql.Entity + *domain.Model `gorm:"-"` + MarkOffset mysql.ID +} + +// 获取当前偏移值 +func GroupOffset(model *domain.Model) (*EventGroupInOffsetHiloGroup, error) { + offset := new(EventGroupInOffsetHiloGroup) + if err := model.Db.WithContext(model).First(offset).Error; err != nil { + if err != gorm.ErrRecordNotFound { + model.Log.Errorf("Offset fail:%v", err) + return nil, err + } + // gorm.ErrRecordNotFound + } + offset.Model = model + return offset, nil +} + +// 批量获取进房事件 +func FetchEventGroupIn(model *domain.Model, limit int) ([]*EventGroupIn, *EventGroupInOffsetHiloGroup, error) { + offset, err := GroupOffset(model) + if err != nil { + return nil, nil, err + } + var events []*EventGroupIn + if err := model.Db.WithContext(model).Model(EventGroupIn{}). + Where("id > ?", offset.MarkOffset). + Order("id asc").Limit(limit).Find(&events).Error; err != nil { + model.Log.Errorf("FetchEventGroupIn fail:%v", err) + return nil, nil, err + } + return events, offset, nil +} + +// 标记已完成 +func (p *EventGroupIn) MarkDone() error { + p.MarkHiloGroup = mysql.YES + return p.Db.WithContext(p.Model).Model(EventGroupIn{}).Where("id = ?", p.ID).Update("mark_hilo_group", p.MarkHiloGroup).Limit(1).Error +} diff --git a/domain/model/event_m/repo.go b/domain/model/event_m/repo.go index d4870364601c75073d61e36c31fea06ecd715075..83d8c9d1fe3ade75ce98b1517f68fc12ca70f2dc 100644 --- a/domain/model/event_m/repo.go +++ b/domain/model/event_m/repo.go @@ -9,3 +9,7 @@ func (p *EventGiftSendOffsetHiloGroup) Persistence() error { func (p *EventGiftSend) Persistence() error { return model.Persistent(p.Db, p) } + +func (p *EventGroupInOffsetHiloGroup) Persistence() error { + return model.Persistent(p.Db, p) +} \ No newline at end of file diff --git a/domain/model/group_m/mic.go b/domain/model/group_m/mic.go index b33508bd542b3f100c1c7a0f270c52b2498b3cb0..25e340317ec1cf76814e7b23b28f211a724c92bb 100644 --- a/domain/model/group_m/mic.go +++ b/domain/model/group_m/mic.go @@ -98,8 +98,8 @@ func GetMicNumType(model *domain.Model, groupUuid string) (group_e.GroupMicNumTy } } -//6个小时 -const expireMinute = 60 * 60 * 12 +// 3天 +const expireMinute = 60 * 60 * 24 * 3 const micExpire = expireMinute * time.Second //麦位基本信息。 diff --git a/domain/model/group_m/micData.go b/domain/model/group_m/micData.go index 7d74c352fee4f116fbbceaf6e0ca677b2220183e..546458dc1cc0b358b8ba5dc1b564c71f7f57120b 100644 --- a/domain/model/group_m/micData.go +++ b/domain/model/group_m/micData.go @@ -259,9 +259,9 @@ func MicAllRPush(model *domain.Model, groupUid string, externalId string) error } // - micContents, err := getMicAllContent(model, groupUid) + micContents, err := GetMicAllContent(model, groupUid) if err != nil { - model.Log.Errorf("MicChangeRPush MicAllRPush getMicAllContent err:%+v, micContents:%v groupUuid:%v, externalId:%v", err, micContents, groupUid, externalId) + model.Log.Errorf("MicChangeRPush MicAllRPush GetMicAllContent err:%+v, micContents:%v groupUuid:%v, externalId:%v", err, micContents, groupUid, externalId) return err } for _, micContent := range micContents { @@ -319,7 +319,7 @@ func MicRPush(model *domain.Model, txGroupId string, msg GroupSystemMsg) error { } //得使用旧的imGroupId -func getMicAllContent(model *domain.Model, groupUid string) ([]MicContent, error) { +func GetMicAllContent(model *domain.Model, groupUid string) ([]MicContent, error) { txGroupId, err := ToTxGroupId(model, groupUid) if err != nil { @@ -340,6 +340,9 @@ func getMicAllContent(model *domain.Model, groupUid string) ([]MicContent, error userIds := make([]uint64, 0, len(micUsers)) for _, r := range micUsers { userIds = append(userIds, r.UserId) + if r.CpUserId > 0 { + userIds = append(userIds, r.CpUserId) + } } model.Log.Infof("MicChangeRPush getMicAllContent groupUid:%v, userIds:%+v", groupUid, userIds) @@ -350,6 +353,13 @@ func getMicAllContent(model *domain.Model, groupUid string) ([]MicContent, error micContents := make([]MicContent, 0, len(mics)) for _, r := range mics { + var micEffect string + cpUserId := micUserMap[r.I].CpUserId + micUserData := micUserDataMap[micUserMap[r.I].UserId] + if cpUserId > 0 { + micEffect = "https://image.whoisamy.shop/hilo/resource/svga/mic_effect_cp.svga" + micUserData.MicEffect = micEffect + } micContents = append(micContents, MicContent{ GroupId: txGroupId, I: r.I, @@ -359,7 +369,7 @@ func getMicAllContent(model *domain.Model, groupUid string) ([]MicContent, error ExternalId: micUserMap[r.I].ExternalId, AgoraId: uint32(micUserMap[r.I].UserId), Timestamp: time.Now().UnixNano(), - User: micUserDataMap[micUserMap[r.I].UserId], + User: micUserData, }) } return micContents, nil @@ -448,6 +458,7 @@ func getMicUserDatas(model *domain.Model, userIds []uint64) (map[uint64]*MicUser return nil, err } svips, _ := rpc.MGetUserSvip(model, userIds) + cpRelations, _ := rpc.MGetUserCpRelations(model, userIds) micUserDataMap := map[uint64]*MicUserData{} for _, id := range userIds { @@ -461,11 +472,11 @@ func getMicUserDatas(model *domain.Model, userIds []uint64) (map[uint64]*MicUser } var headwearPicUrl string var headwearEffectUrl string - var reverseHeadwearEffectUrl string + var headwearReverseEffectUrl string if headwearUser, flag := headwearMap[id]; flag { headwearPicUrl = resHeadwearMap[headwearUser.HeadwearId].PicUrl headwearEffectUrl = resHeadwearMap[headwearUser.HeadwearId].EffectUrl - reverseHeadwearEffectUrl = resHeadwearMap[headwearUser.HeadwearId].ReverseEffectUrl + headwearReverseEffectUrl = resHeadwearMap[headwearUser.HeadwearId].ReverseEffectUrl } micUserDataMap[id] = &MicUserData{ Id: user.ID, @@ -478,9 +489,10 @@ func getMicUserDatas(model *domain.Model, userIds []uint64) (map[uint64]*MicUser NobleLeave: nobleMap[id], HeadwearPicUrl: headwearPicUrl, HeadwearEffectUrl: headwearEffectUrl, - HeadwearReverseEffectUrl: reverseHeadwearEffectUrl, + HeadwearReverseEffectUrl: headwearReverseEffectUrl, SvipLevel: svips[id].SvipLevel, Svip: rpc.CopySimpleSvip(svips[id]), + HeadwearIcon: cpRelations[user.ID].CpUserAvatar, } } return micUserDataMap, nil diff --git a/domain/service/event_s/event_init.go b/domain/service/event_s/event_init.go index 80447d9334dea82c7141d048b4dab4380ac2e8de..1361f260d890dd1d293b55dccd67931de6a99008 100644 --- a/domain/service/event_s/event_init.go +++ b/domain/service/event_s/event_init.go @@ -34,8 +34,9 @@ func EventInit() { GroupEvents() GroupImMass() GroupTheme() - SendGift() // 送礼事件 - OnMic() // 在麦上事件 + SendGift() // 送礼事件 + OnMic() // 在麦上事件 + GroupInMicChangeEvent() // 用户进房推送mic位置信息 } func GroupSupportEvents() { diff --git a/domain/service/event_s/group_in.go b/domain/service/event_s/group_in.go new file mode 100644 index 0000000000000000000000000000000000000000..ac4705ae6b52ba8ad055edd3e4366b3e791aba5d --- /dev/null +++ b/domain/service/event_s/group_in.go @@ -0,0 +1,83 @@ +package event_s + +import ( + "encoding/json" + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/mycontext" + "git.hilo.cn/hilo-common/resource/mysql" + "golang.org/x/sync/errgroup" + "hilo-group/domain/event/group_ev" + "hilo-group/domain/model/event_m" + "runtime/debug" +) + +type GroupInEventService struct { + svc *domain.Service +} + +func NewGroupInEventService(myContext *mycontext.MyContext) *GroupInEventService { + svc := domain.CreateService(myContext) + return &GroupInEventService{svc} +} + +// +func (s *GroupInEventService) Consume() error { + defer func() { + if err := recover(); err != nil { + s.svc.Log.Errorf("ExceptionHandle GroupInEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack())) + } + }() + var model = domain.CreateModel(s.svc.CtxAndDb) + events, offset, err := event_m.FetchEventGroupIn(model, BatchCount) + if err != nil { + return err + } + var wg errgroup.Group + for k := range events { + cpEvent := &event_m.EventGroupIn{ + Entity: mysql.Entity{ + ID: events[k].ID, + CreatedTime: events[k].CreatedTime, + UpdatedTime: events[k].UpdatedTime, + }, + Proto: events[k].Proto, + Payload: events[k].Payload, + Mark: events[k].Mark, + MarkHiloGroup: events[k].MarkHiloGroup, + } + wg.Go(func() error { + if cpEvent.MarkHiloGroup == mysql.YES { + model.Log.Warnf("already consume msg :%v", cpEvent) + return nil + } + groupInEvent := new(group_ev.TxGroupInEvent) + if err := json.Unmarshal(cpEvent.Payload, groupInEvent); err != nil { + model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err) + return nil + } + // 发布事件 + if err := group_ev.PublishTxGroupInEvent(model, groupInEvent); err != nil { + model.Log.Errorf("PublishTxGroupInEvent,event:%v,err:%v", cpEvent, err) + return err + } + // 标记已经处理 + cpEvent.Model = model + err = cpEvent.MarkDone() + if err != nil { + model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err) + } + return err + }) + } + err = wg.Wait() + if err != nil { + model.Log.Errorf("batch consume msg has fail,event,err:%v", err) + // 暂时先允许丢数据,继续mark offset + } + // 最后一次提交offset + if len(events) > 0 { + offset.MarkOffset = events[len(events)-1].ID + return offset.Persistence() + } + return nil +} diff --git a/domain/service/event_s/mic_change.go b/domain/service/event_s/mic_change.go new file mode 100644 index 0000000000000000000000000000000000000000..30b473f00362019cb06ad83a95f1ea78ac791d71 --- /dev/null +++ b/domain/service/event_s/mic_change.go @@ -0,0 +1,94 @@ +package event_s + +import ( + "encoding/json" + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/protocol/userProxy" + "git.hilo.cn/hilo-common/rpc" + uuid "github.com/satori/go.uuid" + "hilo-group/domain/event/group_ev" + "hilo-group/domain/model/group_m" + "hilo-group/myerr/bizerr" +) + +// 用户进房后推送mic信息 +func GroupInMicChangeEvent() { + group_ev.AddTxGroupInEventAsync(func(model *domain.Model, event interface{}) error { + e, ok := event.(*group_ev.TxGroupInEvent) + if !ok { + return bizerr.InvalidParameter + } + return MicAllRPush(model, e.GroupId, e.UserId, e.ExternalId) + }) +} + +func MicAllRPush(model *domain.Model, imGroupId string, userId uint64, externalId string) error { + model.Log.Infof("MicChangeRPush MicAllRPush begin groupUuid:%v, externalId:%v", imGroupId, externalId) + + txGroupId, err := group_m.ToTxGroupId(model, imGroupId) + if err != nil { + model.Log.Errorf("ToImGroupId fail:%v", err) + return err + } + // + micContents, err := group_m.GetMicAllContent(model, imGroupId) + if err != nil { + model.Log.Errorf("MicChangeRPush MicAllRPush getMicAllContent err:%+v, micContents:%v groupUuid:%v, externalId:%v", err, micContents, txGroupId, externalId) + return err + } + for _, micContent := range micContents { + //麦上是默认值,就不用推 + if micContent.Forbid == false && micContent.User == nil && micContent.AgoraId == 0 && micContent.Lock == false && micContent.ExternalId == "" && micContent.MicForbid == false { + model.Log.Infof("MicChangeRPush MicAllRPush default micContent:%v, groupUuid:%v, externalId:%v, micContent:%+v", micContent, txGroupId, externalId, micContent) + continue + } + micContentStr, err := json.Marshal(micContent) + if err != nil { + continue + } + var micExternalId string + var micUserData *userProxy.MicUserData + if micContent.User != nil { + micExternalId = micContent.ExternalId + micUserData = &userProxy.MicUserData{ + Id: micContent.User.Id, + ExternalId: micContent.User.ExternalId, + Avatar: micContent.User.Avatar, + Nick: micContent.User.Nick, + Sex: uint32(micContent.User.Sex), + Code: micContent.User.Code, + IsVip: micContent.User.IsVip, + Noble: uint32(micContent.User.NobleLeave), + HeadwearPicUrl: micContent.User.HeadwearPicUrl, + HeadwearEffectUrl: micContent.User.HeadwearEffectUrl, + HeadwearReverseEffectUrl: micContent.User.HeadwearReverseEffectUrl, + SvipLevel: uint32(micContent.User.SvipLevel), + MicEffect: micContent.User.MicEffect, + HeadwearIcon: micContent.User.HeadwearIcon, + Svip: nil, + } + if micContent.User.Svip.SvipLevel > 0 || len(micContent.User.Svip.Privileges) > 0 { + micUserData.Svip = &userProxy.Svip{ + SvipLevel: uint64(micContent.User.SvipLevel), + } + for _, v := range micContent.User.Svip.Privileges { + micUserData.Svip.Privileges = append(micUserData.Svip.Privileges, &userProxy.SvipPrivilege{ + Type: int32(v.Type), + CanSwitch: v.CanSwitch, + UserSwitch: v.UserSwitch, + MysteryCode: v.MysteryCode, + }) + } + } + } + seqId := uuid.NewV4().String() + + if err := rpc.SendSocketMicChange(seqId, userId, micExternalId, txGroupId, uint32(micContent.I), micContent.Lock, micContent.Forbid, micContent.MicForbid, micContent.AgoraId, micUserData); err != nil { + model.Log.Errorf("MicAllRPush err:%+v, groupUuid:%v, externalId:%v, micContent:%+v", err, txGroupId, externalId, string(micContentStr)) + } else { + model.Log.Infof("MicChangeRPush MicAllRPush Marshal micContent, groupUuid:%v, externalId:%v, micContent:%+v", txGroupId, externalId, string(micContentStr)) + } + } + model.Log.Infof("MicAllRPush end groupUuid:%v, externalId:%v", txGroupId, externalId) + return nil +} diff --git a/go.mod b/go.mod index b08ac132bbdfa140b4e43eddf74a718643ae49f7..0224f6bac35923055c2c6c661ab3267527ccc14c 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,7 @@ require ( github.com/tencentyun/tls-sig-api-v2-golang v1.0.0 // indirect github.com/ugorji/go/codec v1.1.7 // indirect golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect + golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect golang.org/x/text v0.3.6 // indirect golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect diff --git a/go.sum b/go.sum index 589d196488fc37938fe91bc5857e004fb2acc526..cdde2bb016e63bf18760c146739d03b81d2b327b 100644 --- a/go.sum +++ b/go.sum @@ -401,6 +401,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/route/group_r/group_mic.go b/route/group_r/group_mic.go index c85a3002d55ec13876905411c02101baa4b3f4e6..8484b949e7ce5988fa0e166772f32dc5b5d09631 100644 --- a/route/group_r/group_mic.go +++ b/route/group_r/group_mic.go @@ -550,12 +550,13 @@ func GroupMicIn(c *gin.Context) (*mycontext.MyContext, error) { return myContext, err } if micUser != nil { - if iStr == "" { + // 同房间且上麦序列为空 + if iStr == "" && micUser.GroupUuid == groupUuid { // 非切麦操作 return myContext, bizerr.GroupMicUserHasIn } // 切换麦位,先下麦 - if err := group_mic_s.NewGroupPowerService(myContext).GroupMicLeave(groupUuid, micUser.I, userId, externalId); err != nil { + if err := group_mic_s.NewGroupPowerService(myContext).GroupMicLeave(micUser.GroupUuid, micUser.I, userId, externalId); err != nil { return myContext, err } }