Commit 952ddc73 authored by hujiebin's avatar hujiebin

feat:proto试试看

parent 9c815741
.idea
*.xlsx
/protocol/biz/
/protocol/userProxy/
/protocol/video/
/protocol/userCenter/
\ No newline at end of file
proto:
protoc --go_out=./ --go-grpc_out=. ./protocol/*.proto
\ No newline at end of file
......@@ -10,6 +10,7 @@ require (
github.com/fatih/color v1.9.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/golang/protobuf v1.5.0 // indirect
github.com/hashicorp/consul/api v1.7.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.0 // indirect
......@@ -35,7 +36,12 @@ require (
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.479 // indirect
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/ims v1.0.479 // indirect
github.com/tencentyun/tls-sig-api-v2-golang v1.0.0 // indirect
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.42.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.63.2 // indirect
gorm.io/driver/mysql v1.4.3 // indirect
gorm.io/gorm v1.23.8 // indirect
......
This diff is collapsed.
syntax = "proto3";
package biz;
option go_package = "protocol/biz";
/* id = 1 */
message BizMessage {
uint32 type = 2;
string payLoad = 3;
}
/* id = 2 */
message BizMessageRsp {
uint32 status = 1;
}
service Transmitter {
rpc process(BizMessage) returns (BizMessageRsp) {}
}
\ No newline at end of file
syntax = "proto3";
package userCenter;
option go_package = "protocol/userCenter";
/* id = 1 */
message RouteMessage {
uint64 uid = 1;
uint32 msgType = 2;
bytes payLoad = 3;
}
/* id = 2 */
message RouteMessageRsp {
uint32 status = 1;
}
/* id = 3 */
message LoginMessage {
string proxyAddr = 1; // userProxy的地址:ip:port
string token = 2;
string clientAddr = 3; // 客户端地址(websocket):ip:port
}
/* id = 4 */
message LoginMessageRsp {
uint32 status = 1;
uint64 uid = 2;
}
/* id = 5 */
message LogoutMessage {
string clientAddr = 1; // 客户端地址(websocket):ip:port
uint64 uid = 2;
}
/* id = 6 */
message LogoutMessageRsp {
uint32 status = 1;
}
/* id = 7 */
message MulticastMessage {
repeated uint64 uids = 1;
uint32 msgType = 2;
bytes payLoad = 3;
}
/* id = 8 */
message MulticastMessageRsp {
repeated uint64 failedUids = 1;
}
/* id = 9 */
message KickMessage {
uint64 uid = 1;
string addr = 2;
}
/* id = 10 */
message KickMessageRsp {
uint32 status = 1;
}
/* id = 11 */
message BroadcastMessage {
uint32 msgType = 2;
bytes payLoad = 3;
}
/* id = 12 */
message BroadcastMessageRsp {
repeated uint64 failedUids = 1;
}
/* id = 13 */
message BizMessage {
uint64 uid = 1;
uint32 msgType = 2;
string payLoad = 3;
}
/* id = 14 */
message BizMessageRsp {
uint32 status = 1;
}
service Router {
rpc route(RouteMessage) returns (RouteMessageRsp) {}
rpc kickUser(KickMessage) returns (KickMessageRsp) {}
}
service User {
rpc login(LoginMessage) returns (LoginMessageRsp) {}
rpc logout(LogoutMessage) returns (LogoutMessageRsp) {}
rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {}
rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {}
rpc transmit(BizMessage) returns (BizMessageRsp) {}
}
\ No newline at end of file
syntax = "proto3";
package userProxy;
option go_package = "protocol/userProxy";
/* user*/
message User {
uint64 id = 1;
string externalId = 2;
string nick = 3;
string avatar = 4;
string country = 5;
string countryIcon = 6;
uint64 birthday = 7;
bool isVip = 8;
bool isLike = 9;
bool isLikeMe = 10;
}
/* Svip*/
message Svip {
uint64 svipLevel = 1;
repeated SvipPrivilege privileges = 2;
}
message SvipPrivilege {
int32 type = 1;
bool canSwitch = 2;
bool userSwitch = 3;
string mysteryCode = 4;
}
/* id = 1 登录*/
message Login {
string token = 1;
}
/* id = 2 登录的回应 */
message LoginRsp {
uint32 status = 1;
}
/* id = 3 客户端心跳 */
message HeartBeat {
string externalUid = 1;
}
/* id = 4 客户端心跳的回应 */
message HeartBeatRsp {
uint32 status = 1;
}
/* id = 7 客户端上行消息 */
message BizRequest {
uint32 type = 1;
string payLoad = 2;
}
/* id = 8 客户端上行消息的应答 */
message BizResponse {
uint32 status = 1;
}
/* id == 100 | 140 匹配结果通知 waitDuration:开始/下一个时间 matchUniqueId:匹配一对的唯一标识码, status:是否是落单 singleWaitTimeInSec:单方等待连接最长时间 dualWaitTimeInSec:双方连接中最长时间*/
message MatchSuccess {
string localUserId = 1;
string remoteUserId = 2;
uint32 waitDuration = 3;
string matchUniqueId = 4;
bool status = 5;
uint32 singleWaitTimeInSec = 6;
uint32 dualWaitTimeInSec = 7;
User remoteUser = 8;
}
/* id == 101 匹配后用户选择结果通知, failType: 只有status=2 才有值,其它为0,failType=1:等待时间到了,拒绝 failType=2:主动拒绝 */
message MatchConfirm {
uint32 status = 1;
string channelId = 2;
string token = 3;
string localUserId = 4;
string remoteUserId = 5;
uint32 remoteAgoraId = 6;
uint32 callDuration = 7;
uint32 localAgoraId = 8;
uint32 diamondBalance = 9;
string matchUniqueId = 10;
uint32 failType = 11;
}
/* id == 102 视频通话准备 */
message CallReady {
uint64 startTimestamp = 1;
uint64 endTimestamp = 2;
uint64 callDuration = 3;
string channelId = 4;
uint64 remainDiamond = 5;
}
/* id == 103 礼物加时 */
message AddTimeGift {
uint32 giftId = 1;
string token = 2;
uint32 duration = 3;
uint64 endTimestamp = 4;
string channelId = 5;
bool isSender = 6;
uint32 giftNum = 7;
string iconUrl = 8;
string svgaUrl = 9;
string senderAvatar = 10;
string receiverAvatar = 11;
}
/* id == 104 免费加时 */
message AddTimeFree {
string token = 1;
uint32 duration = 2;
uint64 endTimestamp = 3;
string channelId = 4;
uint32 senderAgoraId = 5;
}
/* id == 105 退出 */
message ConnectsQuit {
uint64 from_user_id = 1;
}
/* id == 106 连接状态 */
message ConnectStatus {
uint64 from_user_id = 1;
float user_diamonds = 2;
bool diamonds_enough = 3;
}
/* id == 107 ??? */
message ConnectsCall {
uint64 from_user_id = 1;
string rong_room_name = 2;
bool is_join = 3;
}
/* id == 108 */
message ConnectCommon {
string rong_room_name = 1;
uint64 from_user_id = 2;
string extra = 3;
string message = 4;
}
/* id == 109 召回授权弹框 */
message RecallWindow {
}
/* id == 110 | 132 视频发送 status:(1:接收到邀请, 2:接收到对方同意, 3:双方拒绝(还没接通), 4:对方挂断(接通后)diamondBalance 只有status=2,才出现)*/
message Video {
string videoUniqueId = 1;
string channelId = 2;
uint32 localAgoraId = 3;
uint32 remoteAgoraId = 4;
string agoraToken = 5;
string sendUserId = 6;
string receiveUserId = 7;
uint32 status = 8;
uint32 diamondBalance = 9;
User sendUser = 10;
}
/* id == 111 视频通话准备 */
message VideoCallReady {
uint64 startTimestamp = 1;
uint64 endTimestamp = 2;
uint64 callDuration = 3;
string channelId = 4;
uint64 remainDiamond = 5;
}
/* id == 112 互相喜欢 */
message LikeEach {
string remoteUserId = 1;
}
/* id == 113 喜欢我 */
message LikeMe {
string remoteUserId = 1;
string remoteNick = 2;
string channelId = 3;
}
/* id == 114 日常进入app,获取钻石 */
message DailyInAppDiamond {
uint32 diamondNum = 1;
}
/* id == 115 横幅 */
message GlobalGiftBanner {
uint32 bannerLevel = 1;
uint64 giftId = 2;
uint32 giftNum = 3;
string sendUserId = 4;
string receiveUserId = 5;
string groupId = 6;
string sendUserCode = 7;
string sendUserAvatar = 8;
string sendUserNick = 9;
string receiveUserNick = 10;
string giftPicUrl = 11;
Svip svip = 12;
Svip receiveSvip = 13;
}
/* id == 116 横幅的回应,用来测量RTT */
message GlobalGiftBannerRsp {
uint32 bannerLevel = 1;
uint64 giftId = 2;
uint32 giftNum = 3;
string sendUserId = 4;
string receiveUserId = 5;
string groupId = 6;
}
/*id==117 幸运转盘通知,客户端重新拉取查询, type:客户端不用理*/
message LuckyWheel {
string groupId = 1;
uint32 type = 2;
}
/* id == 118 幸运转盘获胜者全服广播 */
message LuckyWheelBanner {
uint32 diamondNum = 1;
string sendUserId = 2;
string groupId = 3;
string nick = 4;
string code = 5;
string avatar = 6;
Svip svip = 7;
}
/* id == 119 幸运转盘钻石变化 */
message LuckyWheelDiamondChange {
string groupId = 1;
}
/* id == 120 服务器配置变更 */
message ConfigChange {
uint32 type = 1;
}
/* id == 121 全局火箭横幅 */
message GlobalRocketNotice {
string groupId = 1;
string period = 2;
uint32 round = 3;
uint32 stage = 4;
string topUserIcon = 5;
string nick = 6;
string code = 7;
string avatar = 8;
Svip svip = 9;
}
/* id == 122 群发功能弹窗 */
message GroupSendNotice {
string senderExtId = 1;
string senderCode = 2;
uint32 senderSex = 3;
string senderAvatar = 4;
string text = 5;
string groupName = 6;
string groupCode = 7;
string groupAvatar = 8;
uint32 userInNum = 9; // 最近进入房间的人数
string groupId = 10;
}
/* id == 123 全球消息 */
message GlobalBroadcast {
string senderExtId = 1;
string senderCode = 2;
uint32 senderSex = 3;
string senderAvatar = 4;
string senderNick = 5;
string msg = 6;
string groupId = 7;
uint32 senderNobleLevel = 8;
}
/* id == 124 全球消息 */
message MicTaskFinish {
string userId = 1;
uint32 diamond = 2;
}
/* id == 125 水果机开奖通知 */
message FruitMachine {
string date = 1;
uint32 round = 2;
}
/* id == 126 贵族变化 */
message NobleChange {
}
/* id == 127 加入群组成功 */
message JoinGroup {
string groupId = 1;
string externalId = 2;
}
/* id == 128 1对1视频1分钟加时成功 */
message VideoTimeMinuteSuccess {
string token = 1;
uint32 duration = 2;
uint64 endTimestamp = 3;
string channelId = 4;
uint32 senderAgoraId = 5;
string videoUniqueId = 6;
bool isSend = 7;
uint32 sendRemainDiamond = 8;
}
/* id == 129 1对1视频1分钟加时询问检查 */
message VideoTimeMinuteCheck {
string videoUniqueId = 1;
uint32 diamond = 2;
string uuid = 3;
}
/* id == 130 1对1视频,错过 */
message VideoMiss {
uint32 totalNum = 1;
}
/* id == 131 进房,群组活动信息 */
message GroupActivity {
string ActivityId = 1;// id
uint64 StartAt = 2; // 开始时间戳,东八区时间戳
uint64 EndAt = 3; // 结束时间戳,东八区时间戳
string Banner = 4; // banner url
int32 AcType = 5; // 类型1.游戏2.比赛3.排队4.诗歌
string Theme = 6; // 活动主题
int32 PersonNum = 7; // 订阅人数
bool IsSubscribe = 8; // 我是否订阅该活动
string GroupId = 9; // 群id
}
/* id == 144 邀请用户成为房间会员 */
message RoomInviteMember {
string group_id = 1;
}
\ No newline at end of file
syntax = "proto3";
package video;
option go_package = "protocol/video";
/* id = 1 视频加时 */
message VideoMinuteConfirm {
string videoUid = 1;
string externalId = 2;
}
/* id = 2 视频加时的应答 */
message VideoMinuteConfirmRsp {
uint32 status = 1;
}
package rpc
import (
"encoding/json"
"fmt"
"git.hilo.cn/hilo-common/mylogrus"
"git.hilo.cn/hilo-common/resource/mysql"
"strconv"
"time"
)
type TypeRpc uint8
const (
//MatchConfirm TypeRpc = 101
//CallReady TypeRpc = 102
//AddTimeGift TypeRpc = 103
//AddTimeFree TypeRpc = 104
//RecallWindow TypeRpc = 109
//Video TypeRpc = 110
//VideoCallReady TypeRpc = 111
)
type RpcLog struct {
ID uint64 `gorm:"primary_key"`
Type TypeRpc
UserId string
Msg string
Err string
FailUids string
}
func (RpcLog) TableName() string {
month := time.Now().Format("200601")
return fmt.Sprintf("rpc_log_%s", month)
}
func AddRpcLog(t TypeRpc, userId uint64, msg string, failUids []uint64, err error) {
errStr := ""
if err != nil {
errStr = err.Error()
}
failUidStr, _ := json.Marshal(failUids)
logRpc := RpcLog{
Type: t,
UserId: strconv.FormatUint(userId, 10),
Msg: msg,
Err: errStr,
FailUids: string(failUidStr[:]),
}
if e := mysql.Db.Table(RpcLog{}.TableName()).Create(&logRpc).Error; e != nil {
mylogrus.MyLog.Errorf("log rpc save fail, err:%v", e)
}
}
func AddRpcLogs(t TypeRpc, userIds []uint64, msg string, failUids []uint64, err error) {
errStr := ""
if err != nil {
errStr = err.Error()
}
failUidStr, _ := json.Marshal(failUids)
userIdStr, _ := json.Marshal(userIds)
logRpc := RpcLog{
Type: t,
UserId: string(userIdStr[:]),
Msg: msg,
Err: errStr,
FailUids: string(failUidStr[:]),
}
if e := mysql.Db.Table(RpcLog{}.TableName()).Create(&logRpc).Error; e != nil {
mylogrus.MyLog.Errorf("log rpc save fail, err:%v", e)
}
}
package rpc
import (
"context"
"fmt"
"git.hilo.cn/hilo-common/mylogrus"
"git.hilo.cn/hilo-common/protocol/userCenter"
"git.hilo.cn/hilo-common/resource/config"
"git.hilo.cn/hilo-common/resource/consul"
consulapi "github.com/hashicorp/consul/api"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"time"
)
const (
Port = 50060
)
var userClient userCenter.UserClient
var kacp = keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}
var (
defaultUserCenterAddr = "127.0.0.1:50040" // userCenter default addr
userCenterAddr = defaultUserCenterAddr
userCenterConsulName = "userCenter"
)
// grpc服务发现
type Builder struct {
addrs map[string][]string
cc resolver.ClientConn
}
func (b *Builder) Scheme() string {
return "uc" // userCenter
}
type Resolver struct {
}
func (r Resolver) ResolveNow(options resolver.ResolveNowOptions) {}
func (r Resolver) Close() {}
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &Resolver{}
paths := b.addrs[target.URL.Path]
addrs := make([]resolver.Address, len(paths))
for i, s := range paths {
addrs[i] = resolver.Address{Addr: s}
}
cc.UpdateState(resolver.State{Addresses: addrs})
b.cc = cc
return r, nil
}
func (b *Builder) UpdateState(addrs []string) {
as := make([]resolver.Address, len(addrs))
for i, s := range addrs {
as[i] = resolver.Address{Addr: s}
}
b.cc.UpdateState(resolver.State{Addresses: as})
}
func init() {
client, err := consulapi.NewClient(consulapi.DefaultConfig()) //非默认情况下需要设置实际的参数
mylogrus.MyLog.Infoln(client, err)
if err != nil {
mylogrus.MyLog.Fatalln(err)
}
if client == nil {
mylogrus.MyLog.Fatalln("Fail to get consul client.")
}
// 本地环境下不需要userCenter
if config.AppIsLocal() {
mylogrus.MyLog.Infoln("userCenter is not required in local env.")
return
}
// 服务发现
bd := &Builder{addrs: map[string][]string{"/api": {userCenterAddr}}}
cataLog := client.Catalog()
if cataLog == nil {
mylogrus.MyLog.Fatalln("No catalog.")
}
services, _, err := cataLog.Service("userCenter", "", nil)
if err != nil {
mylogrus.MyLog.Fatalln(err)
}
if len(services) == 0 {
mylogrus.MyLog.Fatalln("userCenter not found.")
}
var addrs []string
for _, s := range services {
addrs = append(addrs, fmt.Sprintf("%s:%d", s.ServiceAddress, s.ServicePort))
}
if len(addrs) > 0 {
bd = &Builder{addrs: map[string][]string{"/api": addrs}}
userCenterAddr = "uc:///api"
}
mylogrus.MyLog.Infof("userCenterAddr:%v,addr:%v", userCenterAddr, addrs)
resolver.Register(bd)
go func() {
address := consulapi.DefaultConfig().Address // 用consul api的default config
if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) {
if statusAddrs, ok := serviceStatus[userCenterConsulName]; ok {
healthAddrs, _ := statusAddrs[consulapi.HealthPassing]
l := len(healthAddrs)
if l > 0 {
mylogrus.MyLog.Infof("consul service update state:%v-%v", userCenterConsulName, healthAddrs)
bd.UpdateState(healthAddrs) // 更新新的注册名
} else {
mylogrus.MyLog.Warnf("consul service update local state:%v-%v", userCenterConsulName, defaultUserCenterAddr)
bd.UpdateState([]string{defaultUserCenterAddr}) // 都没有健康的,使用默认本地回环的
}
for status := range statusAddrs {
if status == consulapi.HealthPassing {
continue
}
mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", userCenterConsulName, status, statusAddrs[status])
}
}
}); err != nil {
mylogrus.MyLog.Errorf("启动 consul 的watch监控失败")
}
}()
//userCenterAddr := services[0].Address + ":" + strconv.Itoa(services[0].ServicePort)
//mylogrus.MyLog.Printf("Choose userCenter %s, %s, weights: %v\n", services[0].ID, userCenterAddr, services[0].ServiceWeights)
// Set up a connection to the userCenter.
conn, err := grpc.Dial(userCenterAddr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(kacp),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy":"%s"}`, "round_robin")))
if err != nil {
mylogrus.MyLog.Fatalf("did not connect: %v", err)
}
//defer conn.Close()
userClient = userCenter.NewUserClient(conn)
if userClient == nil {
mylogrus.MyLog.Fatalln("userClient null")
}
}
func multicast(uids []uint64, msgType uint32, data []byte) ([]uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
rsp, err := userClient.Multicast(ctx, &userCenter.MulticastMessage{
Uids: uids,
MsgType: msgType,
PayLoad: data,
})
if err != nil {
mylogrus.MyLog.Errorf("Multicast message failed %s", err.Error())
}
if rsp != nil {
mylogrus.MyLog.Infof("Multicast message res:%+v", rsp)
return rsp.FailedUids, err
} else {
return []uint64{}, err
}
}
//广播
func broadcast(msgType uint32, data []byte) ([]uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
rsp, err := userClient.Broadcast(ctx, &userCenter.BroadcastMessage{
MsgType: msgType,
PayLoad: data,
})
if err != nil {
mylogrus.MyLog.Errorf("broadcast message failed %s", err.Error())
}
if rsp != nil {
mylogrus.MyLog.Infof("broadcast message res:%v", rsp)
return rsp.FailedUids, err
} else {
return []uint64{}, err
}
}
package rpc
import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"time"
)
const (
MsgTypeLogin = 1 + iota
MsgTypeLoginRsp
MsgTypeHeartBeat
MsgTypeHeartBeatRsp
MsgTypeKickUser
MsgTypeBiz = 7
MsgTypeBizRsp
)
const (
MsgTypeMatchSuccess = 100 + iota
MsgTypeMatchConfirm
MsgTypeCallReady
MsgTypeAddTimeGift
MsgTypeAddTimeFree
)
const (
MsgTypeRecallWindow = 109
MsgTypeVideo = 110 // 1v1视频-v1-黄钻
MsgTypeVideoCallReady = 111
MsgTypeLikeEach = 112
MsgTypeLikeMe = 113
MsgTypeDailyInAppDiamond = 114
MsgTypeGlobalGiftBanner = 115
MsgTypeGlobalGiftBannerRsp = 116
MsgTypeLuckyWheel = 117 //已丢弃
MsgTypeLuckyWheelBanner = 118
MsgTypeDiamondChange = 119 // Kludge:事实上是通用的钻石变更通知了
MsgTypeConfigChange = 120 // 配置变更通知
MsgTypeGlobalRocketNotice = 121 // 火箭全局横幅
MsgTypeGroupChatNotice = 122 // 群发消息弹窗
MsgTypeGlobalBroadcast = 123 // 群发消息弹窗
MsgMicTaskFinish = 124 //麦上任务完成
MsgFruitMachine = 125 // 水果机开奖
MsgTypeNobleChange = 126 // 贵族变更
MsgTypeJoinGroup = 127 // 加入群组成功
MsgTypeVideoTimeMinuteSuccess = 128 //1对1视频加时成功
MsgTypeVideoTimeMinuteCheck = 129 // 1对1视频加时检查
MsgTypeVideoMiss = 130 //1对1视频错过
MsgTypeRoomGroupActivity = 131 //进房,群组活动推送
MsgTypeVideoV2 = 132 // 1v1视频-v2-粉钻
MsgTypeVideoV2TimeMinuteCheck = 133 // 1v1视频-v2-加时检查
MsgTypeVideoV2CallReady = 134 // 1v1视频-v2-callReady
MsgTypeVideoV2TimeMinuteSuccess = 135 // 1v1视频-v2-加时成功
MsgTypeMatchV2Success = 140 // 匹配-v2-成功
MsgTypeMatchV2Confirm = 141 // 匹配-v2-确认
MsgTypeMatchV2CallReady = 142 // 匹配-v2-callReady
MsgTypeMatchV2AddTimeGift = 143 // 匹配-v2-送礼加时长
)
const (
RoomBannerChange = 1 // 房间banner变更
GiftConfigChange = 2 // 礼物配置变更
OpenScreenChange = 3 // 开屏配置变更
MatchConfigChange = 4 // 匹配配置发生了变化
)
func EncodeMessage(msgType uint32, serialNum uint64, userdata []byte) []byte {
msg := make([]byte, 26)
dataLen := len(userdata)
binary.BigEndian.PutUint16(msg, 1)
binary.BigEndian.PutUint32(msg[2:], msgType)
binary.BigEndian.PutUint64(msg[6:], serialNum)
binary.BigEndian.PutUint64(msg[14:], uint64(time.Now().UnixNano()/1000))
binary.BigEndian.PutUint32(msg[22:], uint32(dataLen))
msg = append(msg, userdata...)
checkSum := crc32.ChecksumIEEE(msg)
msg = append(msg, 0, 0, 0, 0)
binary.BigEndian.PutUint32(msg[26+dataLen:], checkSum)
return msg
}
func DecodeMessage(message []byte) (uint32, uint64, uint64, []byte, error) {
length := len(message)
// 保证消息至少有26bytes
if length >= 26 {
//version := binary.BigEndian.Uint16(message[0:2])
msgType := binary.BigEndian.Uint32(message[2:6])
msgId := binary.BigEndian.Uint64(message[6:14])
timeStamp := binary.BigEndian.Uint64(message[14:22])
dataLen := binary.BigEndian.Uint32(message[22:26])
//log.Printf("DecodeMessage version = %d, msgType = %d, msgId = %d, timeStamp = %d, dataLen = %d\n", version, msgType, msgId, timeStamp, dataLen)
// 保证ws消息至少有msgLen长
if uint32(length) >= dataLen+30 {
pbData := message[26 : dataLen+26]
checksum := binary.BigEndian.Uint32(message[dataLen+26 : dataLen+30])
//fmt.Printf("pbData size = %d, checksum = %d\n", len(pbData), checksum)
myCheckSum := crc32.ChecksumIEEE(message[0 : dataLen+26])
if checksum != myCheckSum {
return 0, msgId, timeStamp, nil, errors.New("checksum error")
}
return msgType, msgId, timeStamp, pbData, nil
} else {
fmt.Printf("payload too short length = %d, msgType = %d\n", length, msgType)
return 0, msgId, timeStamp, nil, errors.New("payload too short")
}
} else {
fmt.Printf("message too short for header %d\n ", length)
return 0, 0, 0, nil, errors.New("message too short")
}
}
package rpc
import (
"encoding/json"
"git.hilo.cn/hilo-common/mylogrus"
"git.hilo.cn/hilo-common/protocol/userProxy"
"google.golang.org/protobuf/proto"
)
func SendFruitMachine(date string, round uint32) error {
msg := &userProxy.FruitMachine{
Date: date,
Round: round,
}
if buffer, err := proto.Marshal(msg); err == nil {
rspUids, err := broadcast(MsgFruitMachine, buffer)
//记录socket,注意闭包问题
go func(userId uint64, msg *userProxy.FruitMachine, rspUids []uint64, err error) {
buf, _ := json.Marshal(msg)
AddRpcLog(MsgFruitMachine, userId, string(buf[:]), rspUids, err)
}(0, msg, rspUids, err)
if err != nil {
mylogrus.MyLog.Errorf("grpc SendFruitMachine send fail")
return err
} else {
mylogrus.MyLog.Info("grpc SendFruitMachine send success")
}
} else {
return err
}
return nil
}
func SendGlobalRocketNotice(groupId string, period string, round uint32, stage uint32, fromUserId uint64, topUserIcon string, nick string, code string, avatar string) error {
msg := &userProxy.GlobalRocketNotice{
GroupId: groupId,
Period: period,
Round: round,
Stage: stage,
TopUserIcon: topUserIcon,
Nick: nick,
Code: code,
Avatar: avatar,
}
if buffer, err := proto.Marshal(msg); err == nil {
rspUids, err := broadcast(MsgTypeGlobalRocketNotice, buffer)
//记录socket,注意闭包问题
go func(userId uint64, msg *userProxy.GlobalRocketNotice, rspUids []uint64, err error) {
buf, _ := json.Marshal(msg)
AddRpcLog(MsgTypeGlobalRocketNotice, userId, string(buf[:]), rspUids, err)
}(fromUserId, msg, rspUids, err)
if err != nil {
mylogrus.MyLog.Errorf("grpc GlobalRocketNotice send fail")
return err
} else {
mylogrus.MyLog.Info("grpc GlobalRocketNotice send success")
}
} else {
return err
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment