From 3411619604417106a8b36a932a24a5b29b752bbd Mon Sep 17 00:00:00 2001 From: hujiebin Date: Tue, 7 Mar 2023 15:04:52 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E4=B8=80=E4=B8=AA=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E4=B8=80=E4=B8=AA=E4=BA=8B=E5=8A=A1=E5=87=8F=E5=B0=91?= =?UTF-8?q?=E9=94=81=E7=B2=92=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- domain/service/event_s/send_gift.go | 38 ++++++++++++++--------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/domain/service/event_s/send_gift.go b/domain/service/event_s/send_gift.go index e9b7208..159955a 100644 --- a/domain/service/event_s/send_gift.go +++ b/domain/service/event_s/send_gift.go @@ -30,12 +30,13 @@ func (s *GiftSendEventService) Consume() error { } }() var model = domain.CreateModel(s.svc.CtxAndDb) - return model.Transaction(func(model *domain.Model) error { - events, offset, err := event_m.FetchEventGiftSend(model, BatchCount) - if err != nil { - return err - } - for k := range events { + events, offset, err := event_m.FetchEventGiftSend(model, BatchCount) + if err != nil { + return err + } + for k := range events { + // 一个事件一个事务 + err = model.Transaction(func(model *domain.Model) error { cpEvent := &event_m.EventGiftSend{ Entity: mysql.Entity{ ID: events[k].ID, @@ -48,12 +49,12 @@ func (s *GiftSendEventService) Consume() error { } if cpEvent.MarkHiloGroup == mysql.YES { model.Log.Warnf("already consume msg :%v", cpEvent) - continue + return nil } sendGiftEvent := new(gift_ev.SendGiftEvent) if err := json.Unmarshal(cpEvent.Payload, sendGiftEvent); err != nil { model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err) - continue + return nil } if err := gift_ev.PublishSendGiftEvent(model, sendGiftEvent); err != nil { model.Log.Errorf("PublishSendGiftEvent fail,event:%v,err:%v", cpEvent, err) @@ -66,16 +67,13 @@ func (s *GiftSendEventService) Consume() error { model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err) return err } - } - if err != nil { - model.Log.Errorf("batch consume msg has fail,event,err:%v", err) - // 暂时先允许丢数据,继续mark offset - } - // 最后一次提交offset - if len(events) > 0 { - offset.MarkOffset = events[len(events)-1].ID - return offset.Persistence() - } - return nil - }) + return nil + }) + } + // 最后一次提交offset + if len(events) > 0 { + offset.MarkOffset = events[len(events)-1].ID + return offset.Persistence() + } + return nil } -- 2.22.0