Commit 5a78e911 authored by chenweijian's avatar chenweijian

user服

parent 79607c8d
Pipeline #1726 failed with stages
package event_s
import (
"errors"
"hilo-user/_const/enum/user_e"
"hilo-user/domain"
"hilo-user/domain/event/group_ev"
"hilo-user/domain/event/user_ev"
"hilo-user/domain/model/groupPower_m"
"hilo-user/domain/model/group_m"
"hilo-user/domain/model/user_m"
"hilo-user/domain/service/user_s"
"hilo-user/mycontext"
"hilo-user/req/jwt"
"hilo-user/resource/config"
"hilo-user/resource/mysql"
)
func EventInit() {
GroupInEvents()
GameEventInit()
GroupLeaveEvents()
ModelFuncInit()
}
// 用户进房事件
func GroupInEvents() {
group_ev.AddGroupInEventSync(func(model *domain.Model, event interface{}) error {
e, ok := event.(*group_ev.GroupInEvent)
if !ok {
model.Log.Errorf("AddGroupInEventSync event type err")
return nil
}
txGroupId, err := group_m.ToTxGroupId(model, e.GroupId)
if err != nil {
model.Log.Errorf("AddGroupInEventSync err:%v, imGroupId:%v", err, e.GroupId)
return nil
}
return user_s.NewGameService(mycontext.CreateMyContext(model.Cxt)).PushGameInfo("", e.ExternalId, txGroupId, 0)
})
}
// 用户退房事件
func GroupLeaveEvents() {
group_ev.AddGroupLeaveEventAsync(func(model *domain.Model, event interface{}) error {
e, ok := event.(*group_ev.GroupLeaveEvent)
if !ok || e == nil {
model.Log.Errorf("AddGroupLeaveEventAsync event type err")
return nil
}
txGroupId, err := group_m.ToTxGroupId(model, e.GroupId)
if err != nil {
model.Log.Errorf("ToTxGroupId fail:%v-%v", e.GroupId, err)
}
if err := user_s.NewGameService(model.MyContext).GameOpt(e.UserId, 0, "", e.ExternalId, "", "", txGroupId, user_e.GameOptExit, 0, false); err != nil {
model.Log.Warnf("AddGroupLeaveEventAsync GameOpt fail,e%v,err:%v", *e, err)
}
return nil
})
}
// 游戏上报事件
func GameEventInit() {
user_ev.AddReportGameInfoEventSync(func(model *domain.Model, event interface{}) error {
e, ok := event.(*user_ev.ReportGameInfoEvent)
if !ok {
model.Log.Errorf("AddReportGameInfoEventSync 消息类型错误!event:%+v", event)
return nil
}
switch e.ReportType {
case user_e.ReportTypeGameStart:
// 更新游戏信息
return user_s.NewGameService(model.MyContext).GameStart(e.GameStartObject)
//return user_m.GameStartUpdate(model, e.GameStartObject)
case user_e.ReportTypeGameSettle:
return user_s.NewGameService(model.MyContext).GameSettle(e.GameSettleObject)
}
return nil
})
}
func ModelFuncInit() {
group_m.GetGroupPowerNameByUserId = func(model *domain.Model, userId uint64) (uint64, string, error) {
return groupPower_m.GetUserGroupPower(model, userId)
}
// 编辑游戏-清理机器人
user_ev.AddGameEditEventAsync(func(model *domain.Model, event interface{}) error {
e, ok := event.(*user_ev.GameEditEvent)
if !ok {
return errors.New("GameEditEvent asset fail")
}
robots, err := user_m.GetRoomRobots(model, e.TxGroupId)
if err != nil {
model.Log.Errorf("GetRoomRobots fail:%v", err)
return err
}
model.Log.Infof("AddGameEditEventAsync event:%v,robots:%v", *e, robots)
var userIds []mysql.ID
for _, robot := range robots {
userIds = append(userIds, robot.UserId)
}
users, err := user_m.GetUserMapByIds(model, userIds)
if err != nil {
model.Log.Errorf("GetUserMapByIds fail:%v", err)
return err
}
userService := user_s.NewGameService(model.MyContext)
for i, robot := range robots {
user, ok := users[robot.UserId]
if !ok {
model.Log.Errorf("robot userId not exits:%v", robot)
continue
}
// filled
robots[i].GameOpt = userService.GameOpt // service func -> model func
robots[i].User = users[robot.UserId] // user info
robots[i].Token, _ = jwt.GenerateToken(user.ID, user.ExternalId,
config.GetConfigJWT().ISSUER_API) // jwt
// 离开
robots[i].Leave("GameEdit")
}
return nil
})
}
package event_s
import (
"encoding/json"
"golang.org/x/sync/errgroup"
"hilo-user/domain"
"hilo-user/domain/event/group_ev"
"hilo-user/domain/model/event_m"
"hilo-user/domain/service"
"hilo-user/mycontext"
"hilo-user/resource/mysql"
"runtime/debug"
)
// 每次处理500条
const BatchCount = 500
type GroupInEventService struct {
svc *service.Service
}
func NewGroupInEventService(myContext *mycontext.MyContext) *GroupInEventService {
svc := service.CreateService(myContext)
return &GroupInEventService{svc}
}
//
func (s *GroupInEventService) Consume() error {
defer func() {
if err := recover(); err != nil {
s.svc.Log.Errorf("ExceptionHandle GroupInEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack()))
}
}()
var model = domain.CreateModel(s.svc.CtxAndDb)
events, offset, err := event_m.FetchEventGroupIn(model, BatchCount)
if err != nil {
return err
}
var wg errgroup.Group
for k := range events {
cpEvent := &event_m.EventGroupIn{
Entity: mysql.Entity{
ID: events[k].ID,
CreatedTime: events[k].CreatedTime,
UpdatedTime: events[k].UpdatedTime,
},
Proto: events[k].Proto,
Payload: events[k].Payload,
Mark: events[k].Mark,
}
wg.Go(func() error {
if cpEvent.Mark == mysql.YES {
model.Log.Warnf("already consume msg :%v", cpEvent)
return nil
}
groupInEvent := new(group_ev.GroupInEvent)
if err := json.Unmarshal(cpEvent.Payload, groupInEvent); err != nil {
model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err)
return nil
}
// 发布事件
if err := group_ev.PublishGroupInEvent(model, groupInEvent); err != nil {
model.Log.Errorf("PublishGroupInEvent,event:%v,err:%v", cpEvent, err)
return err
}
// 标记已经处理
cpEvent.Model = model
err = cpEvent.MarkDone()
if err != nil {
model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err)
}
return err
})
}
err = wg.Wait()
if err != nil {
model.Log.Errorf("batch consume msg has fail,event,err:%v", err)
// 暂时先允许丢数据,继续mark offset
}
// 最后一次提交offset
if len(events) > 0 {
offset.MarkOffset = events[len(events)-1].ID
return offset.Persistence()
}
return nil
}
package event_s
import (
"encoding/json"
"golang.org/x/sync/errgroup"
"hilo-user/domain"
"hilo-user/domain/event/group_ev"
"hilo-user/domain/model/event_m"
"hilo-user/domain/service"
"hilo-user/mycontext"
"hilo-user/resource/mysql"
"runtime/debug"
)
type GroupLeaveEventService struct {
svc *service.Service
}
func NewGroupLeaveEventService(myContext *mycontext.MyContext) *GroupLeaveEventService {
svc := service.CreateService(myContext)
return &GroupLeaveEventService{svc}
}
//
func (s *GroupLeaveEventService) Consume() error {
defer func() {
if err := recover(); err != nil {
s.svc.Log.Errorf("ExceptionHandle GroupLeaveEventService Consume SYSTEM ACTION PANIC: %v, stack: %v", err, string(debug.Stack()))
}
}()
var model = domain.CreateModel(s.svc.CtxAndDb)
events, offset, err := event_m.FetchEventGroupLeave(model, BatchCount)
if err != nil {
return err
}
var wg errgroup.Group
for k := range events {
cpEvent := &event_m.EventGroupLeave{
Entity: mysql.Entity{
ID: events[k].ID,
CreatedTime: events[k].CreatedTime,
UpdatedTime: events[k].UpdatedTime,
},
Proto: events[k].Proto,
Payload: events[k].Payload,
Mark: events[k].Mark,
}
wg.Go(func() error {
if cpEvent.Mark == mysql.YES {
model.Log.Warnf("already consume msg :%v", cpEvent)
return nil
}
groupLeaveEvent := new(group_ev.GroupLeaveEvent)
if err := json.Unmarshal(cpEvent.Payload, groupLeaveEvent); err != nil {
model.Log.Errorf("json msg fail,event:%v,err:%v", cpEvent, err)
return nil
}
// 发布事件
if err := group_ev.PublishGroupLeaveEvent(model, groupLeaveEvent); err != nil {
model.Log.Errorf("PublishGroupLeaveEvent,event:%v,err:%v", cpEvent, err)
return err
}
// 标记已经处理
cpEvent.Model = model
err = cpEvent.MarkDone()
if err != nil {
model.Log.Errorf("consume msg fail,event:%v,err:%v", cpEvent, err)
}
return err
})
}
err = wg.Wait()
if err != nil {
model.Log.Errorf("batch consume msg has fail,event,err:%v", err)
// 暂时先允许丢数据,继续mark offset
}
// 最后一次提交offset
if len(events) > 0 {
offset.MarkOffset = events[len(events)-1].ID
return offset.Persistence()
}
return nil
}
This diff is collapsed.
package user_s
import (
"github.com/dgrijalva/jwt-go"
"hilo-user/myerr"
"hilo-user/myerr/bizerr"
"hilo-user/resource/config"
"hilo-user/resource/mysql"
"time"
)
// 载荷,增加用户别名
type Claims struct {
UserId uint64
ExternalId string
jwt.StandardClaims
}
// 生成App用的jwt token
// issuer 外面传
func generateGameJwtToken(userId uint64, externalId string, issuer string) (string, error) {
jwtConfig := config.GetConfigGameJWT()
duration, err := time.ParseDuration(jwtConfig.EXPIRE)
if err != nil {
return "", myerr.WrapErr(err)
}
expireTime := time.Now().Add(duration)
claims := Claims{
UserId: userId,
ExternalId: externalId,
StandardClaims: jwt.StandardClaims{
ExpiresAt: expireTime.UnixNano() / 1e6, //过期时间
Issuer: issuer, //签名的发行者
},
}
tokenClaims := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
token, err := tokenClaims.SignedString(getGameJWTSecret())
return token, myerr.WrapErr(err)
}
//解析jwt token
func ParseJwtToken(token, issuer string) (userId mysql.ID, externalId string, expiresAt int64, err error) {
tokenClaims, err := jwt.ParseWithClaims(token, &Claims{}, func(token *jwt.Token) (interface{}, error) {
return getGameJWTSecret(), nil
})
if err != nil {
return
}
if tokenClaims != nil {
claims, ok := tokenClaims.Claims.(*Claims)
if ok && tokenClaims.Valid {
if time.Now().Unix() > claims.ExpiresAt {
err = bizerr.GameTokenExpire
return
}
if claims.Issuer != issuer {
err = bizerr.GameTokenInvalid
return
}
// success
userId, externalId, expiresAt = claims.UserId, claims.ExternalId, claims.ExpiresAt
}
} else {
err = bizerr.GameTokenInvalid
}
return
}
func getGameJWTSecret() []byte {
return []byte(config.GetConfigGameJWT().SECRET)
}
package group_s
import (
"encoding/json"
"github.com/sirupsen/logrus"
"hilo-user/domain"
"hilo-user/domain/model/group_m"
"hilo-user/sdk/tencentyun"
"runtime/debug"
)
// 发送群信令。入参是内部imGroupId,这里做转换
func SendSignalMsg(model *domain.Model, imGroupId, txGroupId string, msg group_m.GroupSystemMsg, async bool) {
model.Log.WithField("imGroupId:", imGroupId)
model.Log.WithField("txGroupId:", txGroupId)
groupId := txGroupId
var err error
if len(groupId) == 0 {
groupId, err = group_m.ToTxGroupId(model, imGroupId)
if err != nil {
return
}
}
buffer, err := json.Marshal(msg)
if err == nil {
str := string(buffer)
model.Log.Infof("SendSignalMsg: %s, async = %v", str, async)
if async {
go func(logger *logrus.Entry) {
defer func() {
if r := recover(); r != nil {
//打印错误堆栈信息
logger.Errorf("SendSignalMsg SYSTEM ACTION PANIC: %v, stack: %v", r, string(debug.Stack()))
}
}()
if err = tencentyun.SendSystemMsg(logger, groupId, []string{}, str); err != nil {
logger.Errorf("SendSignalMsg aync failed for %s, msgId = %d, context:%v, err:%v", groupId, msg.MsgId, str, err)
} else {
logger.Infof("SendSignalMsg aync success for %s, msgId = %d, context:%v, err:%v", groupId, msg.MsgId, str)
}
}(model.Log)
} else if err = tencentyun.SendSystemMsg(model.Log, groupId, []string{}, str); err != nil {
model.Log.Errorf("SendSignalMsg sync failed for %s, msgId = %d, context:%v, err:%v", groupId, msg.MsgId, str, err)
} else {
model.Log.Infof("SendSignalMsg sync success for %s, msgId = %d, context:%v", groupId, msg.MsgId, str)
}
} else {
model.Log.Errorln("Marshall failure, msgId = %d : %s", msg.MsgId, err.Error())
}
}
......@@ -11,8 +11,8 @@ import (
)
const (
RegisterName = "hiloGame"
RegisterTag = "游戏中心"
RegisterName = "hiloUser"
RegisterTag = "用户中心"
)
// 异步注册到consul
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment