diff --git a/domain/service/event_s/send_gift.go b/domain/service/event_s/send_gift.go index e9b7208567d0aaca9cb0adb9f8f8cb91e9b1f785..159955a5f487805344db337e029981d3f7409711 100644 --- a/domain/service/event_s/send_gift.go +++ b/domain/service/event_s/send_gift.go @@ -30,12 +30,13 @@ func (s *GiftSendEventService) Consume() error { } }() var model = domain.CreateModel(s.svc.CtxAndDb) - return model.Transaction(func(model *domain.Model) error { - events, offset, err := event_m.FetchEventGiftSend(model, BatchCount) - if err != nil { - return err - } - for k := range events { + events, offset, err := event_m.FetchEventGiftSend(model, BatchCount) + if err != nil { + return err + } + for k := range events { + // 一个事件一个事务 + err = model.Transaction(func(model *domain.Model) error { cpEvent := &event_m.EventGiftSend{ Entity: mysql.Entity{ ID: events[k].ID, @@ -48,12 +49,12 @@ func (s *GiftSendEventService) Consume() error { } if cpEvent.MarkHiloGroup == mysql.YES { model.Log.Warnf("already consume msg :%v", cpEvent) - continue + return nil } sendGiftEvent := new(gift_ev.SendGiftEvent) if err := json.Unmarshal(cpEvent.Payload, sendGiftEvent); err != nil { model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err) - continue + return nil } if err := gift_ev.PublishSendGiftEvent(model, sendGiftEvent); err != nil { model.Log.Errorf("PublishSendGiftEvent fail,event:%v,err:%v", cpEvent, err) @@ -66,16 +67,13 @@ func (s *GiftSendEventService) Consume() error { model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err) return err } - } - if err != nil { - model.Log.Errorf("batch consume msg has fail,event,err:%v", err) - // 暂时先允许丢数据,继续mark offset - } - // 最后一次提交offset - if len(events) > 0 { - offset.MarkOffset = events[len(events)-1].ID - return offset.Persistence() - } - return nil - }) + return nil + }) + } + // 最后一次提交offset + if len(events) > 0 { + offset.MarkOffset = events[len(events)-1].ID + return offset.Persistence() + } + return nil }