diff --git a/_const/enum/gift_e/enum.go b/_const/enum/gift_e/enum.go new file mode 100644 index 0000000000000000000000000000000000000000..0a7aeb8bcce5b69bcc492766a8182d13cd502831 --- /dev/null +++ b/_const/enum/gift_e/enum.go @@ -0,0 +1,60 @@ +package gift_e + +import "git.hilo.cn/hilo-common/resource/mysql" + +type GiftOperateSceneType mysql.Type + +const ( + //匹配声网中场景 + MatchVedioSceneType GiftOperateSceneType = 1 + // 私聊 + PriveChatSceneType GiftOperateSceneType = 2 + //1对1视频 + VideoSceneType GiftOperateSceneType = 3 + //群组 + GroupSceneType GiftOperateSceneType = 4 +) + +type ResGiftAvatarType = mysql.Type + +const ( + SendGiftCpGiftAvatarType ResGiftAvatarType = 1 //周CP + MonthlyWealthGiftAvatarType ResGiftAvatarType = 2 //月冠财富榜 + MonthlyCharmGiftAvatarType ResGiftAvatarType = 3 //月冠魅力榜 + MonthlyPayGiftAvatarType ResGiftAvatarType = 4 //月冠充值榜 + WeekStarGiftAvatarType ResGiftAvatarType = 5 //周星榜 + CountryStarGiftAvatarType ResGiftAvatarType = 6 // 国家之星 +) + +type GiftPrivateRecordType = mysql.Type + +const ( + PrivateRecord GiftPrivateRecordType = 1 + VideoTradeUnion GiftPrivateRecordType = 2 + PrivateRecordBag GiftPrivateRecordType = 3 +) + +type GiftColumnType = uint16 + +const ( + GiftColumnGift GiftColumnType = 1 // 礼物 + GiftColumnRomance GiftColumnType = 2 // 浪漫 + GiftColumnCountry GiftColumnType = 3 // 国家 + GiftColumnCustom GiftColumnType = 4 // 定制 +) + +type GiftTagType = uint16 + +const ( + GiftTagMedal GiftTagType = 1 // 勋章礼物 + GiftTagWeeklyStar GiftTagType = 2 // 周星礼物 +) + +type GiftEntryType = uint16 + +const ( + GiftEntryWeeklyStar GiftEntryType = 1 // 周星活动入口 + GiftEntryWeeklyCp GiftEntryType = 2 // 周CP活动入口 + GiftEntryMedal GiftEntryType = 3 // 勋章激活动入口 + GiftEntryCountryStar GiftEntryType = 4 // 国家之星活动入口 +) diff --git a/cron/cron.go b/cron/cron.go new file mode 100644 index 0000000000000000000000000000000000000000..68198344fd177f999a7f256be51059c1fc7d84e8 --- /dev/null +++ b/cron/cron.go @@ -0,0 +1,14 @@ +package cron + +import ( + "git.hilo.cn/hilo-common/resource/config" + "hilo-user/cron/gift_cron" +) + +func Init() { + if !config.IsMaster() { + return + } + gift_cron.SendGiftEventInit() // 礼物消息 + gift_cron.GiftRemark() // 礼物消息补偿 +} diff --git a/cron/gift_cron/remark.go b/cron/gift_cron/remark.go new file mode 100644 index 0000000000000000000000000000000000000000..8f692b8050c80900697afa5d4ffbfe3096af3eda --- /dev/null +++ b/cron/gift_cron/remark.go @@ -0,0 +1,47 @@ +package gift_cron + +import ( + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/resource/config" + "github.com/robfig/cron" + "hilo-user/domain/model/event_m" +) + +// 补偿检查mark=2 +func GiftRemark() { + c := cron.New() + // 每5分钟 + spec := "0 */5 * * * ?" + if !config.AppIsRelease() { + spec = "0 * * * * ?" // 测服1分钟 + } + _ = c.AddFunc(spec, func() { + var model = domain.CreateModelNil() + unmarks, err := event_m.FetchUnMarkEvents(model, 10) + if err != nil { + model.Log.Errorf("FetchUnMarkEvents fail:%v", err) + return + } + for _, unmark := range unmarks { + model.Log.Infof("FetchUnMarkEvent start:%v", unmark.ID) + // 重新入库 + if err := event_m.AddUnMarkEvent(model, &event_m.EventGiftSend{ + Proto: unmark.Proto, + Payload: unmark.Payload, + MarkHiloGroup: unmark.MarkHiloGroup, + MarkHiloUser: unmark.MarkHiloUser, + Mark: unmark.Mark, // 不能影响别的服务的mark状态 + }); err != nil { + model.Log.Errorf("FetchUnMarkEvent add unmark fail:%v", err) + continue + } + // 旧的标记已处理 + unmark.Model = model + if err := unmark.MarkDone(); err != nil { + model.Log.Errorf("FetchUnMarkEvent mark fail:%v", err) + } + model.Log.Infof("FetchUnMarkEvent success:%v", unmark.ID) + } + }) + c.Start() +} diff --git a/cron/gift_cron/send_gift.go b/cron/gift_cron/send_gift.go new file mode 100644 index 0000000000000000000000000000000000000000..05608513b28a32a3d213a245a959ff5a7ff84c31 --- /dev/null +++ b/cron/gift_cron/send_gift.go @@ -0,0 +1,31 @@ +package gift_cron + +import ( + "git.hilo.cn/hilo-common/mycontext" + "git.hilo.cn/hilo-common/mylogrus" + "hilo-user/domain/service/event_s" + "time" +) + +// 送礼事件 +func SendGiftEventInit() { + + mylogrus.MyLog.Infof("SendGiftEventInit") + go func() { + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + for { + select { + case <-ticker.C: + //start := time.Now() + myCtx := mycontext.CreateMyContext(nil) + // 消费送礼事件 + if err := event_s.NewGiftSendEventService(myCtx).Consume(); err != nil { + myCtx.Log.Errorf("eventServcie consume fail:%v", err) + } else { + //myCtx.Log.Infof("eventServcie consume success,cost:%v", time.Now().Sub(start)) + } + } + } + }() +} diff --git a/domain/event/gift_ev/send_gift.go b/domain/event/gift_ev/send_gift.go new file mode 100644 index 0000000000000000000000000000000000000000..bae70bdb54cd4af1ed22c94acc7d92ee551daf05 --- /dev/null +++ b/domain/event/gift_ev/send_gift.go @@ -0,0 +1,52 @@ +package gift_ev + +import ( + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/resource/mysql" + "hilo-user/_const/enum/gift_e" +) + +var sendGiftListen = new(domain.EventBase) + +// 送礼事件 +type SendGiftEvent struct { + SendUserId mysql.ID + ReceiveUserIds []mysql.ID + ResGift EventResGift + GiftOperateIds []mysql.ID + GiftN mysql.Num + SceneType gift_e.GiftOperateSceneType + SceneUid mysql.Str + NoDiamondConsume bool // 不要消费钻石 + TotalConsume uint64 // 房间的总消费额 +} + +type EventResGift struct { + ID mysql.ID + Name mysql.Str + IconUrl mysql.Str + SvgaUrl mysql.Str + MusicUrl mysql.Str + DiamondNum mysql.Num + BeanNum mysql.Num + ReceiveDiamondNum mysql.Num + Second mysql.Num + N mysql.Num + GroupBroadcast bool + Cp bool + Together bool + Status mysql.UserYesNo + GiftType mysql.Type +} + +func AddSendGiftEventSync(callback func(model *domain.Model, event interface{}) error) { + domain.AddEventSync(sendGiftListen, callback) +} + +func AddSendGiftEventAsync(callback func(model *domain.Model, event interface{}) error) { + domain.AddEventAsync(sendGiftListen, callback) +} + +func PublishSendGiftEvent(model *domain.Model, event interface{}) error { + return domain.PublishEvent(sendGiftListen, model, event) +} diff --git a/domain/model/event_m/repo.go b/domain/model/event_m/repo.go new file mode 100644 index 0000000000000000000000000000000000000000..3e2fd70aec00dab750a714c240a48cffd32fe0c3 --- /dev/null +++ b/domain/model/event_m/repo.go @@ -0,0 +1,11 @@ +package event_m + +import "hilo-user/domain/model" + +func (p *EventGiftSendOffsetHiloUser) Persistence() error { + return model.Persistent(p.Db, p) +} + +func (p *EventGiftSend) Persistence() error { + return model.Persistent(p.Db, p) +} diff --git a/domain/model/event_m/send_gift.go b/domain/model/event_m/send_gift.go new file mode 100644 index 0000000000000000000000000000000000000000..19318d1dfae2a756f8b675996945893378dbcbf0 --- /dev/null +++ b/domain/model/event_m/send_gift.go @@ -0,0 +1,103 @@ +package event_m + +import ( + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/resource/mysql" + "github.com/pkg/errors" + "gorm.io/gorm" + "time" +) + +// 送礼事件消息 +type EventGiftSend struct { + mysql.Entity + *domain.Model `gorm:"-"` + Proto mysql.Type + Payload []byte + Mark mysql.YesNo + MarkHiloGroup mysql.YesNo + MarkHiloUser mysql.YesNo +} + +func (EventGiftSend) TableName() string { + return "event_gift_send" +} + +// 偏移值 +type EventGiftSendOffsetHiloUser struct { + mysql.Entity + *domain.Model `gorm:"-"` + MarkOffset mysql.ID +} + +// 获取当前偏移值 +func Offset(model *domain.Model) (*EventGiftSendOffsetHiloUser, error) { + offset := new(EventGiftSendOffsetHiloUser) + if err := model.Db.WithContext(model).First(offset).Error; err != nil { + if err != gorm.ErrRecordNotFound { + model.Log.Errorf("Offset fail:%v", err) + return nil, err + } + // gorm.ErrRecordNotFound + } + offset.Model = model + return offset, nil +} + +// 批量获取送礼 +func FetchEventGiftSend(model *domain.Model, limit int) ([]*EventGiftSend, *EventGiftSendOffsetHiloUser, error) { + offset, err := Offset(model) + if err != nil { + return nil, nil, err + } + var events []*EventGiftSend + if err := model.Db.WithContext(model).Model(EventGiftSend{}). + Where("id > ?", offset.MarkOffset). + Order("id asc").Limit(limit).Find(&events).Error; err != nil { + model.Log.Errorf("FetchEventGiftSend fail:%v", err) + return nil, nil, err + } + return events, offset, nil +} + +// 标记已完成 +func (p *EventGiftSend) MarkDone() error { + p.MarkHiloUser = mysql.YES + result := p.Db.WithContext(p.Model).Model(EventGiftSend{}).Where("id = ?", p.ID).Update("mark_hilo_user", p.MarkHiloUser).Limit(1) + if result.Error != nil { + return result.Error + } + if result.RowsAffected <= 0 { + p.Model.Log.Errorf("MarkDone row affeced 0") + return errors.New("row affect 0") + } + return nil +} + +// 查询过去1小时-5分钟前未mark的事件 +// 用来补偿 +// 记得加limit +func FetchUnMarkEvents(model *domain.Model, limit int) ([]*EventGiftSend, error) { + offset, err := Offset(model) + if err != nil { + return nil, err + } + t := time.Now().Add(-time.Minute * 5) + lt := t.Add(-time.Hour) + var events []*EventGiftSend + if err := model.Db.WithContext(model).Model(EventGiftSend{}). + Where("mark_hilo_user = ?", mysql.NO). + Where("id <= ?", offset.MarkOffset). + Where("created_time < ?", t). + Where("created_time > ?", lt). + Order("id asc").Limit(limit).Find(&events).Error; err != nil { + model.Log.Errorf("FetchUnMarkEvents fail:%v", err) + return nil, err + } + return events, nil +} + +// 补偿加上unmark的event +func AddUnMarkEvent(model *domain.Model, event *EventGiftSend) error { + return model.Db.WithContext(model).Create(event).Error +} diff --git a/domain/service/event_s/send_gift.go b/domain/service/event_s/send_gift.go new file mode 100644 index 0000000000000000000000000000000000000000..d740c976f69c3ccf02be5d68d2e108aa8f96697e --- /dev/null +++ b/domain/service/event_s/send_gift.go @@ -0,0 +1,77 @@ +package event_s + +import ( + "encoding/json" + "git.hilo.cn/hilo-common/domain" + "git.hilo.cn/hilo-common/mycontext" + "git.hilo.cn/hilo-common/resource/mysql" + "hilo-user/domain/event/gift_ev" + "hilo-user/domain/model/event_m" + "runtime/debug" +) + +// 每次处理500条 +const BatchCount = 500 + +type GiftSendEventService struct { + svc *domain.Service +} + +func NewGiftSendEventService(myContext *mycontext.MyContext) *GiftSendEventService { + svc := domain.CreateService(myContext) + return &GiftSendEventService{svc} +} + +// +func (s *GiftSendEventService) Consume() error { + defer func() { + if err := recover(); err != nil { + s.svc.Log.Errorf("ExceptionHandle GiftSendEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack())) + } + }() + var model = domain.CreateModel(s.svc.CtxAndDb) + return model.Transaction(func(model *domain.Model) error { + events, offset, err := event_m.FetchEventGiftSend(model, BatchCount) + if err != nil { + return err + } + for k := range events { + cpEvent := &event_m.EventGiftSend{ + Entity: mysql.Entity{ + ID: events[k].ID, + CreatedTime: events[k].CreatedTime, + UpdatedTime: events[k].UpdatedTime, + }, + Proto: events[k].Proto, + Payload: events[k].Payload, + MarkHiloUser: events[k].MarkHiloUser, + } + if cpEvent.MarkHiloUser == mysql.YES { + model.Log.Warnf("already consume msg :%v", cpEvent) + continue + } + sendGiftEvent := new(gift_ev.SendGiftEvent) + if err := json.Unmarshal(cpEvent.Payload, sendGiftEvent); err != nil { + model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err) + continue + } + // 标记已经处理,mark比publish事件提前,尽量避免异步事件重复执行 + cpEvent.Model = model + err = cpEvent.MarkDone() + if err != nil { + model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err) + return err + } + if err := gift_ev.PublishSendGiftEvent(model, sendGiftEvent); err != nil { + model.Log.Errorf("PublishSendGiftEvent fail,event:%v,err:%v", string(cpEvent.Payload), err) + return err + } + } + // 最后一次提交offset + if len(events) > 0 { + offset.MarkOffset = events[len(events)-1].ID + return offset.Persistence() + } + return nil + }) +} diff --git a/go.mod b/go.mod index a74969e3efc833de5ae0ba250a2497fdf1cbb793..1c2ca0b2ab09661a3d183e77d8d88a8d5d4fe889 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 // indirect + github.com/robfig/cron v1.2.0 // indirect github.com/ugorji/go/codec v1.1.7 // indirect golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect diff --git a/go.sum b/go.sum index cb456a754920b732a74bed3a266b5f60e457645e..1c180d30f95fe2cc498edf367e70f9df5d60e1cc 100644 --- a/go.sum +++ b/go.sum @@ -202,6 +202,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= diff --git a/main.go b/main.go index 073ab09f6e454e270e7b804c84ca41c605ef9cea..2c2a8983c209103e19b2fd8f62fc3611a6ddc059 100755 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "git.hilo.cn/hilo-common/resource/consul" + "hilo-user/cron" "hilo-user/domain/service/event_s" "hilo-user/route" ) @@ -14,7 +15,7 @@ const ( ) func main() { - //cron.Init() // 开启定时任务 + cron.Init() // 开启定时任务 event_s.EventInit() // 注册事件(内部事件+mysql拟kafka) r := route.InitRouter() // 注册路由 consul.RegisterToConsul(PORT, RegisterName, RegisterTag) // 服务注册