Commit 34116196 authored by hujiebin's avatar hujiebin

feat:一个事件一个事务减少锁粒度

parent e8deba7d
......@@ -30,12 +30,13 @@ func (s *GiftSendEventService) Consume() error {
}
}()
var model = domain.CreateModel(s.svc.CtxAndDb)
return model.Transaction(func(model *domain.Model) error {
events, offset, err := event_m.FetchEventGiftSend(model, BatchCount)
if err != nil {
return err
}
for k := range events {
events, offset, err := event_m.FetchEventGiftSend(model, BatchCount)
if err != nil {
return err
}
for k := range events {
// 一个事件一个事务
err = model.Transaction(func(model *domain.Model) error {
cpEvent := &event_m.EventGiftSend{
Entity: mysql.Entity{
ID: events[k].ID,
......@@ -48,12 +49,12 @@ func (s *GiftSendEventService) Consume() error {
}
if cpEvent.MarkHiloGroup == mysql.YES {
model.Log.Warnf("already consume msg :%v", cpEvent)
continue
return nil
}
sendGiftEvent := new(gift_ev.SendGiftEvent)
if err := json.Unmarshal(cpEvent.Payload, sendGiftEvent); err != nil {
model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err)
continue
return nil
}
if err := gift_ev.PublishSendGiftEvent(model, sendGiftEvent); err != nil {
model.Log.Errorf("PublishSendGiftEvent fail,event:%v,err:%v", cpEvent, err)
......@@ -66,16 +67,13 @@ func (s *GiftSendEventService) Consume() error {
model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err)
return err
}
}
if err != nil {
model.Log.Errorf("batch consume msg has fail,event,err:%v", err)
// 暂时先允许丢数据,继续mark offset
}
// 最后一次提交offset
if len(events) > 0 {
offset.MarkOffset = events[len(events)-1].ID
return offset.Persistence()
}
return nil
})
return nil
})
}
// 最后一次提交offset
if len(events) > 0 {
offset.MarkOffset = events[len(events)-1].ID
return offset.Persistence()
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment