group_in.go 2.25 KB
Newer Older
hujiebin's avatar
hujiebin committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
package event_s

import (
	"encoding/json"
	"git.hilo.cn/hilo-common/domain"
	"git.hilo.cn/hilo-common/mycontext"
	"git.hilo.cn/hilo-common/resource/mysql"
	"golang.org/x/sync/errgroup"
	"hilo-group/domain/event/group_ev"
	"hilo-group/domain/model/event_m"
	"runtime/debug"
)

// GroupInEventService consumes stored group-in events (see Consume).
type GroupInEventService struct {
	// svc is the domain service created in NewGroupInEventService; Consume
	// uses its Log for panic reporting and its CtxAndDb to build a model.
	svc *domain.Service
}

// NewGroupInEventService builds a GroupInEventService whose underlying
// domain service is created from myContext via domain.CreateService.
func NewGroupInEventService(myContext *mycontext.MyContext) *GroupInEventService {
	return &GroupInEventService{
		svc: domain.CreateService(myContext),
	}
}

// Consume fetches one batch of group-in events via event_m.FetchEventGroupIn,
// processes every event in its own goroutine, and then persists the offset of
// the last fetched event.
//
// For each event that is not already marked (MarkHiloGroup != mysql.YES) the
// payload is decoded into a group_ev.TxGroupInEvent, published through
// group_ev.PublishTxGroupInEvent, and then marked done. Events whose payload
// cannot be decoded are logged and skipped.
//
// Worker failures are only logged: the offset is advanced regardless, so a
// failed event is not retried by this method. The returned error is either
// the fetch error or the error from persisting the offset. A panic on the
// calling goroutine or in any worker is recovered and logged.
func (s *GroupInEventService) Consume() error {
	defer func() {
		if err := recover(); err != nil {
			s.svc.Log.Errorf("ExceptionHandle GroupInEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack()))
		}
	}()
	var model = domain.CreateModel(s.svc.CtxAndDb)
	events, offset, err := event_m.FetchEventGroupIn(model, BatchCount)
	if err != nil {
		return err
	}
	var wg errgroup.Group
	for k := range events {
		// Give every goroutine its own copy of the event so workers never
		// share (or mutate) the elements of the fetched slice.
		cpEvent := &event_m.EventGroupIn{
			Entity: mysql.Entity{
				ID:          events[k].ID,
				CreatedTime: events[k].CreatedTime,
				UpdatedTime: events[k].UpdatedTime,
			},
			Proto:         events[k].Proto,
			Payload:       events[k].Payload,
			Mark:          events[k].Mark,
			MarkHiloGroup: events[k].MarkHiloGroup,
		}
		wg.Go(func() error {
			// recover only works in the goroutine that panicked, so the
			// deferred recover at the top of Consume does not protect the
			// workers; without this one a panic here would crash the process.
			defer func() {
				if r := recover(); r != nil {
					model.Log.Errorf("ExceptionHandle GroupInEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", r, string(debug.Stack()))
				}
			}()
			if cpEvent.MarkHiloGroup == mysql.YES {
				model.Log.Warnf("already consume msg :%v", cpEvent)
				return nil
			}
			groupInEvent := new(group_ev.TxGroupInEvent)
			if err := json.Unmarshal(cpEvent.Payload, groupInEvent); err != nil {
				// An undecodable payload is logged and skipped rather than
				// reported as a worker failure.
				model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err)
				return nil
			}
			// Publish the event.
			if err := group_ev.PublishTxGroupInEvent(model, groupInEvent); err != nil {
				model.Log.Errorf("PublishTxGroupInEvent,event:%v,err:%v", cpEvent, err)
				return err
			}
			// Mark the event as processed. The error is kept local to this
			// goroutine: assigning to the enclosing function's err would be a
			// data race between workers.
			cpEvent.Model = model
			if err := cpEvent.MarkDone(); err != nil {
				model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err)
				return err
			}
			return nil
		})
	}
	if err := wg.Wait(); err != nil {
		model.Log.Errorf("batch consume msg has fail,event,err:%v", err)
		// For now data loss is tolerated: keep going and mark the offset.
	}
	// Commit the offset of the last event in the batch.
	if len(events) > 0 {
		offset.MarkOffset = events[len(events)-1].ID
		return offset.Persistence()
	}
	return nil
}