package event_s import ( "encoding/json" "git.hilo.cn/hilo-common/domain" "git.hilo.cn/hilo-common/mycontext" "git.hilo.cn/hilo-common/resource/mysql" "hilo-group/domain/event/gift_ev" "hilo-group/domain/model/event_m" "runtime/debug" ) // 每次处理500条 const BatchCount = 500 type GiftSendEventService struct { svc *domain.Service } func NewGiftSendEventService(myContext *mycontext.MyContext) *GiftSendEventService { svc := domain.CreateService(myContext) return &GiftSendEventService{svc} } // func (s *GiftSendEventService) Consume() error { defer func() { if err := recover(); err != nil { s.svc.Log.Errorf("ExceptionHandle GiftSendEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack())) } }() var model = domain.CreateModel(s.svc.CtxAndDb) return model.Transaction(func(model *domain.Model) error { events, offset, err := event_m.FetchEventGiftSend(model, BatchCount) if err != nil { return err } for k := range events { cpEvent := &event_m.EventGiftSend{ Entity: mysql.Entity{ ID: events[k].ID, CreatedTime: events[k].CreatedTime, UpdatedTime: events[k].UpdatedTime, }, Proto: events[k].Proto, Payload: events[k].Payload, MarkHiloGroup: events[k].MarkHiloGroup, } if cpEvent.MarkHiloGroup == mysql.YES { model.Log.Warnf("already consume msg :%v", cpEvent) continue } sendGiftEvent := new(gift_ev.SendGiftEvent) if err := json.Unmarshal(cpEvent.Payload, sendGiftEvent); err != nil { model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err) continue } // 标记已经处理,mark比publish事件提前,尽量避免异步事件重复执行 cpEvent.Model = model err = cpEvent.MarkDone() if err != nil { model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err) return err } if err := gift_ev.PublishSendGiftEvent(model, sendGiftEvent); err != nil { model.Log.Errorf("PublishSendGiftEvent fail,event:%v,err:%v", string(cpEvent.Payload), err) return err } } // 最后一次提交offset if len(events) > 0 { offset.MarkOffset = events[len(events)-1].ID return offset.Persistence() } return nil }) }