Commit 5b736484 authored by hujiebin's avatar hujiebin

feat:同步所有事件行为,对batchCount事务

parent 62315818
......@@ -30,13 +30,12 @@ func (s *GiftSendEventService) Consume() error {
}
}()
var model = domain.CreateModel(s.svc.CtxAndDb)
events, offset, err := event_m.FetchEventGiftSend(model, BatchCount)
if err != nil {
return err
}
for k := range events {
// 一个事件一个事务
err = model.Transaction(func(model *domain.Model) error {
return model.Transaction(func(model *domain.Model) error {
events, offset, err := event_m.FetchEventGiftSend(model, BatchCount)
if err != nil {
return err
}
for k := range events {
cpEvent := &event_m.EventGiftSend{
Entity: mysql.Entity{
ID: events[k].ID,
......@@ -49,12 +48,12 @@ func (s *GiftSendEventService) Consume() error {
}
if cpEvent.MarkHiloGroup == mysql.YES {
model.Log.Warnf("already consume msg :%v", cpEvent)
return nil
continue
}
sendGiftEvent := new(gift_ev.SendGiftEvent)
if err := json.Unmarshal(cpEvent.Payload, sendGiftEvent); err != nil {
model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err)
return nil
continue
}
// 标记已经处理,mark比publish事件提前,尽量避免异步事件重复执行
cpEvent.Model = model
......@@ -67,13 +66,12 @@ func (s *GiftSendEventService) Consume() error {
model.Log.Errorf("PublishSendGiftEvent fail,event:%v,err:%v", string(cpEvent.Payload), err)
return err
}
return nil
})
}
// 最后一次提交offset
if len(events) > 0 {
offset.MarkOffset = events[len(events)-1].ID
return offset.Persistence()
}
return nil
}
// 最后一次提交offset
if len(events) > 0 {
offset.MarkOffset = events[len(events)-1].ID
return offset.Persistence()
}
return nil
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment