package event_s

import (
	"encoding/json"
	"fmt"
	"runtime/debug"

	"golang.org/x/sync/errgroup"
	"hilo-user/domain"
	"hilo-user/domain/event/group_ev"
	"hilo-user/domain/model/event_m"
	"hilo-user/domain/service"
	"hilo-user/mycontext"
	"hilo-user/resource/mysql"
)

// BatchCount is the number of events requested from the store per Consume
// call (500 per batch).
const BatchCount = 500

// GroupInEventService consumes stored "group in" events and republishes them
// through group_ev.
type GroupInEventService struct {
	svc *service.Service
}

// NewGroupInEventService builds a GroupInEventService backed by a service
// created from myContext.
func NewGroupInEventService(myContext *mycontext.MyContext) *GroupInEventService {
	svc := service.CreateService(myContext)
	return &GroupInEventService{svc}
}

// Consume fetches one batch of group-in events (requested with BatchCount),
// handles each event in its own goroutine, and then persists the consume
// offset.
//
// For every event that is not already marked as consumed, the payload is
// decoded from JSON, published via group_ev.PublishGroupInEvent, and the event
// is marked done. Events whose payload cannot be decoded are logged and
// skipped without failing the batch.
//
// The offset is advanced to the ID of the last fetched event even when some
// events failed; losing those events is currently accepted. The returned error
// is either the fetch error or the error from persisting the offset. A panic
// on the calling goroutine is logged and recovered, in which case nil is
// returned.
func (s *GroupInEventService) Consume() error {
	defer func() {
		if err := recover(); err != nil {
			s.svc.Log.Errorf("ExceptionHandle GroupInEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack()))
		}
	}()

	var model = domain.CreateModel(s.svc.CtxAndDb)
	events, offset, err := event_m.FetchEventGroupIn(model, BatchCount)
	if err != nil {
		return err
	}

	var wg errgroup.Group
	for k := range events {
		// Give each goroutine its own copy of the event so workers never
		// share the slice element.
		cpEvent := &event_m.EventGroupIn{
			Entity: mysql.Entity{
				ID:          events[k].ID,
				CreatedTime: events[k].CreatedTime,
				UpdatedTime: events[k].UpdatedTime,
			},
			Proto:   events[k].Proto,
			Payload: events[k].Payload,
			Mark:    events[k].Mark,
		}
		wg.Go(func() (retErr error) {
			// The recover deferred by Consume only covers the calling
			// goroutine; without this guard a panic in a worker would
			// terminate the whole process.
			defer func() {
				if r := recover(); r != nil {
					model.Log.Errorf("ExceptionHandle GroupInEventService Consume worker PANIC: %v, stack: %v", r, string(debug.Stack()))
					retErr = fmt.Errorf("consume group in event %v: panic: %v", cpEvent.ID, r)
				}
			}()

			if cpEvent.Mark == mysql.YES {
				model.Log.Warnf("already consume msg :%v", cpEvent)
				return nil
			}
			groupInEvent := new(group_ev.GroupInEvent)
			if err := json.Unmarshal(cpEvent.Payload, groupInEvent); err != nil {
				// A malformed payload is logged and skipped on purpose so
				// it does not fail the batch.
				model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err)
				return nil
			}
			// Publish the event.
			if err := group_ev.PublishGroupInEvent(model, groupInEvent); err != nil {
				model.Log.Errorf("PublishGroupInEvent,event:%v,err:%v", cpEvent, err)
				return err
			}
			// Mark the event as processed. The error is kept local to this
			// goroutine: assigning to Consume's own err from every worker
			// would be a data race.
			cpEvent.Model = model
			if err := cpEvent.MarkDone(); err != nil {
				model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err)
				return err
			}
			return nil
		})
	}
	if err := wg.Wait(); err != nil {
		model.Log.Errorf("batch consume msg has fail,event,err:%v", err)
		// Data loss is tolerated for now: keep going and advance the offset.
	}

	// Commit the offset once, at the ID of the last fetched event.
	if len(events) > 0 {
		offset.MarkOffset = events[len(events)-1].ID
		return offset.Persistence()
	}
	return nil
}