Commit 1864cc93 authored by hujiebin's avatar hujiebin

开启定时任务

parent 718e349d
package gift_e
import "git.hilo.cn/hilo-common/resource/mysql"
type GiftOperateSceneType mysql.Type
const (
//匹配声网中场景
MatchVedioSceneType GiftOperateSceneType = 1
// 私聊
PriveChatSceneType GiftOperateSceneType = 2
//1对1视频
VideoSceneType GiftOperateSceneType = 3
//群组
GroupSceneType GiftOperateSceneType = 4
)
type ResGiftAvatarType = mysql.Type
const (
SendGiftCpGiftAvatarType ResGiftAvatarType = 1 //周CP
MonthlyWealthGiftAvatarType ResGiftAvatarType = 2 //月冠财富榜
MonthlyCharmGiftAvatarType ResGiftAvatarType = 3 //月冠魅力榜
MonthlyPayGiftAvatarType ResGiftAvatarType = 4 //月冠充值榜
WeekStarGiftAvatarType ResGiftAvatarType = 5 //周星榜
CountryStarGiftAvatarType ResGiftAvatarType = 6 // 国家之星
)
type GiftPrivateRecordType = mysql.Type
const (
PrivateRecord GiftPrivateRecordType = 1
VideoTradeUnion GiftPrivateRecordType = 2
PrivateRecordBag GiftPrivateRecordType = 3
)
type GiftColumnType = uint16
const (
GiftColumnGift GiftColumnType = 1 // 礼物
GiftColumnRomance GiftColumnType = 2 // 浪漫
GiftColumnCountry GiftColumnType = 3 // 国家
GiftColumnCustom GiftColumnType = 4 // 定制
)
type GiftTagType = uint16
const (
GiftTagMedal GiftTagType = 1 // 勋章礼物
GiftTagWeeklyStar GiftTagType = 2 // 周星礼物
)
type GiftEntryType = uint16
const (
GiftEntryWeeklyStar GiftEntryType = 1 // 周星活动入口
GiftEntryWeeklyCp GiftEntryType = 2 // 周CP活动入口
GiftEntryMedal GiftEntryType = 3 // 勋章激活动入口
GiftEntryCountryStar GiftEntryType = 4 // 国家之星活动入口
)
package cron
import (
"git.hilo.cn/hilo-common/resource/config"
"hilo-user/cron/gift_cron"
)
func Init() {
if !config.IsMaster() {
return
}
gift_cron.SendGiftEventInit() // 礼物消息
gift_cron.GiftRemark() // 礼物消息补偿
}
package gift_cron
import (
"git.hilo.cn/hilo-common/domain"
"git.hilo.cn/hilo-common/resource/config"
"github.com/robfig/cron"
"hilo-user/domain/model/event_m"
)
// 补偿检查mark=2
func GiftRemark() {
c := cron.New()
// 每5分钟
spec := "0 */5 * * * ?"
if !config.AppIsRelease() {
spec = "0 * * * * ?" // 测服1分钟
}
_ = c.AddFunc(spec, func() {
var model = domain.CreateModelNil()
unmarks, err := event_m.FetchUnMarkEvents(model, 10)
if err != nil {
model.Log.Errorf("FetchUnMarkEvents fail:%v", err)
return
}
for _, unmark := range unmarks {
model.Log.Infof("FetchUnMarkEvent start:%v", unmark.ID)
// 重新入库
if err := event_m.AddUnMarkEvent(model, &event_m.EventGiftSend{
Proto: unmark.Proto,
Payload: unmark.Payload,
MarkHiloGroup: unmark.MarkHiloGroup,
MarkHiloUser: unmark.MarkHiloUser,
Mark: unmark.Mark, // 不能影响别的服务的mark状态
}); err != nil {
model.Log.Errorf("FetchUnMarkEvent add unmark fail:%v", err)
continue
}
// 旧的标记已处理
unmark.Model = model
if err := unmark.MarkDone(); err != nil {
model.Log.Errorf("FetchUnMarkEvent mark fail:%v", err)
}
model.Log.Infof("FetchUnMarkEvent success:%v", unmark.ID)
}
})
c.Start()
}
package gift_cron
import (
"git.hilo.cn/hilo-common/mycontext"
"git.hilo.cn/hilo-common/mylogrus"
"hilo-user/domain/service/event_s"
"time"
)
// 送礼事件
func SendGiftEventInit() {
mylogrus.MyLog.Infof("SendGiftEventInit")
go func() {
ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
for {
select {
case <-ticker.C:
//start := time.Now()
myCtx := mycontext.CreateMyContext(nil)
// 消费送礼事件
if err := event_s.NewGiftSendEventService(myCtx).Consume(); err != nil {
myCtx.Log.Errorf("eventServcie consume fail:%v", err)
} else {
//myCtx.Log.Infof("eventServcie consume success,cost:%v", time.Now().Sub(start))
}
}
}
}()
}
package gift_ev
import (
"git.hilo.cn/hilo-common/domain"
"git.hilo.cn/hilo-common/resource/mysql"
"hilo-user/_const/enum/gift_e"
)
var sendGiftListen = new(domain.EventBase)
// 送礼事件
type SendGiftEvent struct {
SendUserId mysql.ID
ReceiveUserIds []mysql.ID
ResGift EventResGift
GiftOperateIds []mysql.ID
GiftN mysql.Num
SceneType gift_e.GiftOperateSceneType
SceneUid mysql.Str
NoDiamondConsume bool // 不要消费钻石
TotalConsume uint64 // 房间的总消费额
}
type EventResGift struct {
ID mysql.ID
Name mysql.Str
IconUrl mysql.Str
SvgaUrl mysql.Str
MusicUrl mysql.Str
DiamondNum mysql.Num
BeanNum mysql.Num
ReceiveDiamondNum mysql.Num
Second mysql.Num
N mysql.Num
GroupBroadcast bool
Cp bool
Together bool
Status mysql.UserYesNo
GiftType mysql.Type
}
func AddSendGiftEventSync(callback func(model *domain.Model, event interface{}) error) {
domain.AddEventSync(sendGiftListen, callback)
}
func AddSendGiftEventAsync(callback func(model *domain.Model, event interface{}) error) {
domain.AddEventAsync(sendGiftListen, callback)
}
func PublishSendGiftEvent(model *domain.Model, event interface{}) error {
return domain.PublishEvent(sendGiftListen, model, event)
}
package event_m
import "hilo-user/domain/model"
func (p *EventGiftSendOffsetHiloUser) Persistence() error {
return model.Persistent(p.Db, p)
}
func (p *EventGiftSend) Persistence() error {
return model.Persistent(p.Db, p)
}
package event_m
import (
"git.hilo.cn/hilo-common/domain"
"git.hilo.cn/hilo-common/resource/mysql"
"github.com/pkg/errors"
"gorm.io/gorm"
"time"
)
// 送礼事件消息
type EventGiftSend struct {
mysql.Entity
*domain.Model `gorm:"-"`
Proto mysql.Type
Payload []byte
Mark mysql.YesNo
MarkHiloGroup mysql.YesNo
MarkHiloUser mysql.YesNo
}
func (EventGiftSend) TableName() string {
return "event_gift_send"
}
// 偏移值
type EventGiftSendOffsetHiloUser struct {
mysql.Entity
*domain.Model `gorm:"-"`
MarkOffset mysql.ID
}
// 获取当前偏移值
func Offset(model *domain.Model) (*EventGiftSendOffsetHiloUser, error) {
offset := new(EventGiftSendOffsetHiloUser)
if err := model.Db.WithContext(model).First(offset).Error; err != nil {
if err != gorm.ErrRecordNotFound {
model.Log.Errorf("Offset fail:%v", err)
return nil, err
}
// gorm.ErrRecordNotFound
}
offset.Model = model
return offset, nil
}
// 批量获取送礼
func FetchEventGiftSend(model *domain.Model, limit int) ([]*EventGiftSend, *EventGiftSendOffsetHiloUser, error) {
offset, err := Offset(model)
if err != nil {
return nil, nil, err
}
var events []*EventGiftSend
if err := model.Db.WithContext(model).Model(EventGiftSend{}).
Where("id > ?", offset.MarkOffset).
Order("id asc").Limit(limit).Find(&events).Error; err != nil {
model.Log.Errorf("FetchEventGiftSend fail:%v", err)
return nil, nil, err
}
return events, offset, nil
}
// 标记已完成
func (p *EventGiftSend) MarkDone() error {
p.MarkHiloUser = mysql.YES
result := p.Db.WithContext(p.Model).Model(EventGiftSend{}).Where("id = ?", p.ID).Update("mark_hilo_user", p.MarkHiloUser).Limit(1)
if result.Error != nil {
return result.Error
}
if result.RowsAffected <= 0 {
p.Model.Log.Errorf("MarkDone row affeced 0")
return errors.New("row affect 0")
}
return nil
}
// 查询过去1小时-5分钟前未mark的事件
// 用来补偿
// 记得加limit
func FetchUnMarkEvents(model *domain.Model, limit int) ([]*EventGiftSend, error) {
offset, err := Offset(model)
if err != nil {
return nil, err
}
t := time.Now().Add(-time.Minute * 5)
lt := t.Add(-time.Hour)
var events []*EventGiftSend
if err := model.Db.WithContext(model).Model(EventGiftSend{}).
Where("mark_hilo_user = ?", mysql.NO).
Where("id <= ?", offset.MarkOffset).
Where("created_time < ?", t).
Where("created_time > ?", lt).
Order("id asc").Limit(limit).Find(&events).Error; err != nil {
model.Log.Errorf("FetchUnMarkEvents fail:%v", err)
return nil, err
}
return events, nil
}
// 补偿加上unmark的event
func AddUnMarkEvent(model *domain.Model, event *EventGiftSend) error {
return model.Db.WithContext(model).Create(event).Error
}
package event_s
import (
"encoding/json"
"git.hilo.cn/hilo-common/domain"
"git.hilo.cn/hilo-common/mycontext"
"git.hilo.cn/hilo-common/resource/mysql"
"hilo-user/domain/event/gift_ev"
"hilo-user/domain/model/event_m"
"runtime/debug"
)
// 每次处理500条
const BatchCount = 500
type GiftSendEventService struct {
svc *domain.Service
}
func NewGiftSendEventService(myContext *mycontext.MyContext) *GiftSendEventService {
svc := domain.CreateService(myContext)
return &GiftSendEventService{svc}
}
//
func (s *GiftSendEventService) Consume() error {
defer func() {
if err := recover(); err != nil {
s.svc.Log.Errorf("ExceptionHandle GiftSendEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack()))
}
}()
var model = domain.CreateModel(s.svc.CtxAndDb)
return model.Transaction(func(model *domain.Model) error {
events, offset, err := event_m.FetchEventGiftSend(model, BatchCount)
if err != nil {
return err
}
for k := range events {
cpEvent := &event_m.EventGiftSend{
Entity: mysql.Entity{
ID: events[k].ID,
CreatedTime: events[k].CreatedTime,
UpdatedTime: events[k].UpdatedTime,
},
Proto: events[k].Proto,
Payload: events[k].Payload,
MarkHiloUser: events[k].MarkHiloUser,
}
if cpEvent.MarkHiloUser == mysql.YES {
model.Log.Warnf("already consume msg :%v", cpEvent)
continue
}
sendGiftEvent := new(gift_ev.SendGiftEvent)
if err := json.Unmarshal(cpEvent.Payload, sendGiftEvent); err != nil {
model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err)
continue
}
// 标记已经处理,mark比publish事件提前,尽量避免异步事件重复执行
cpEvent.Model = model
err = cpEvent.MarkDone()
if err != nil {
model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err)
return err
}
if err := gift_ev.PublishSendGiftEvent(model, sendGiftEvent); err != nil {
model.Log.Errorf("PublishSendGiftEvent fail,event:%v,err:%v", string(cpEvent.Payload), err)
return err
}
}
// 最后一次提交offset
if len(events) > 0 {
offset.MarkOffset = events[len(events)-1].ID
return offset.Persistence()
}
return nil
})
}
......@@ -64,6 +64,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
......
......@@ -202,6 +202,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
......
......@@ -3,6 +3,7 @@ package main
import (
"fmt"
"git.hilo.cn/hilo-common/resource/consul"
"hilo-user/cron"
"hilo-user/domain/service/event_s"
"hilo-user/route"
)
......@@ -14,7 +15,7 @@ const (
)
func main() {
//cron.Init() // 开启定时任务
cron.Init() // 开启定时任务
event_s.EventInit() // 注册事件(内部事件+mysql拟kafka)
r := route.InitRouter() // 注册路由
consul.RegisterToConsul(PORT, RegisterName, RegisterTag) // 服务注册
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment