Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
H
hilo-userCenter
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
hujiebin
hilo-userCenter
Compare Revisions
master...feature/level-cast
Source
feature/level-cast
Select Git revision
...
Target
master
Select Git revision
Compare
Commits (3)
Update userProxy.proto
· 6b9e9fe0
hujiebin
authored
Sep 25, 2023
6b9e9fe0
feat:levelCast
· a5ba9f6a
hujiebin
authored
Sep 25, 2023
a5ba9f6a
Update userManager.go
· 9d8cff5b
hujiebin
authored
Sep 25, 2023
9d8cff5b
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
172 additions
and
64 deletions
+172
-64
main.go
main.go
+90
-3
userManager.go
manager/userManager.go
+29
-0
userCenter.proto
protocol/userCenter.proto
+11
-0
userProxy.proto
protocol/userProxy.proto
+42
-61
No files found.
main.go
View file @
9d8cff5b
...
...
@@ -40,12 +40,14 @@ const (
kickChanSize
=
500
broadcastChanSize
=
3500
areacastChanSize
=
3500
levelcastChanSize
=
3500
)
var
(
kickChan
chan
KickChanMsg
broadcastChan
chan
BroadcastChanMsg
areacastChan
chan
AreaChanMsg
levelcastChan
chan
LevelChanMsg
)
type
KickChanMsg
struct
{
...
...
@@ -65,6 +67,12 @@ type AreaChanMsg struct {
in
*
userCenter
.
AreaMessage
}
type
LevelChanMsg
struct
{
ProxyAddr
string
UserIds
[]
uint64
in
*
userCenter
.
LevelMessage
}
var
(
userManager
*
manager
.
UserManager
=
nil
termManager
*
manager
.
TerminalManager
=
nil
...
...
@@ -294,6 +302,56 @@ func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*use
return
&
userCenter
.
AreaMessageRsp
{
FailedUids
:
failed
},
nil
}
func
(
s
*
server
)
Levelcast
(
ctx
context
.
Context
,
in
*
userCenter
.
LevelMessage
)
(
*
userCenter
.
LevelMessageRsp
,
error
)
{
var
failed
[]
uint64
terminals
:=
termManager
.
GetAll
()
if
terminals
!=
nil
{
var
uids
[]
uint64
for
uidStr
:=
range
*
terminals
{
if
uid
,
_
:=
strconv
.
ParseUint
(
uidStr
,
10
,
64
);
uid
>
0
{
uids
=
append
(
uids
,
uid
)
}
}
// 处理分区用户
levelUserIds
:=
userManager
.
GetLevelUsers
(
uids
,
in
.
Level
)
if
len
(
levelUserIds
)
<=
0
{
return
&
userCenter
.
LevelMessageRsp
{
FailedUids
:
failed
},
nil
}
m
:=
make
(
map
[
string
][]
uint64
,
0
)
for
uid
:=
range
levelUserIds
{
ok
:=
false
addr
:=
userManager
.
GetUser
(
uid
)
if
addr
!=
nil
{
if
_
,
ok
:=
m
[
*
addr
];
!
ok
{
m
[
*
addr
]
=
make
([]
uint64
,
0
)
}
m
[
*
addr
]
=
append
(
m
[
*
addr
],
uid
)
ok
=
true
}
else
{
mylogrus
.
MyLog
.
Errorf
(
"Unknown user %d
\n
"
,
uid
)
}
if
!
ok
{
failed
=
append
(
failed
,
uid
)
}
}
for
addr
,
users
:=
range
m
{
const
sendBatchSize
=
5
for
i
:=
0
;
i
<
len
(
users
);
i
+=
sendBatchSize
{
end
:=
i
+
sendBatchSize
if
end
>
len
(
users
)
{
end
=
len
(
users
)
}
levelcastChan
<-
LevelChanMsg
{
ProxyAddr
:
addr
,
UserIds
:
users
[
i
:
end
],
in
:
in
,
}
}
}
}
return
&
userCenter
.
LevelMessageRsp
{
FailedUids
:
failed
},
nil
}
func
(
s
*
server
)
Transmit
(
ctx
context
.
Context
,
in
*
userCenter
.
BizMessage
)
(
*
userCenter
.
BizMessageRsp
,
error
)
{
mylogrus
.
MyLog
.
Infof
(
"Transmiting msgType = %d, uid = %d, payLoad: %s
\n
"
,
in
.
MsgType
,
in
.
Uid
,
in
.
PayLoad
)
...
...
@@ -384,6 +442,22 @@ func realAreacast(addr string, uids []uint64, msg *userCenter.AreaMessage) {
}
}
func
realLevelcast
(
addr
string
,
uids
[]
uint64
,
msg
*
userCenter
.
LevelMessage
)
{
for
_
,
uid
:=
range
uids
{
client
:=
manager
.
UserProxyMgr
.
MakeClient
(
addr
)
if
client
==
nil
{
mylogrus
.
MyLog
.
Errorf
(
"Failed in making client for %d, %s
\n
"
,
uid
,
addr
)
}
else
{
toRouterClient
:=
userCenter
.
NewRouterClient
(
client
)
status
,
err
:=
routeMessage
(
toRouterClient
,
uid
,
msg
.
MsgType
,
msg
.
PayLoad
)
if
err
!=
nil
{
mylogrus
.
MyLog
.
Errorf
(
"routeMessage uid = %d, msgType = %d, status = %d, %v"
,
uid
,
msg
.
MsgType
,
status
,
err
)
}
}
}
}
func
routeMessage
(
c
userCenter
.
RouterClient
,
uid
uint64
,
msgType
uint32
,
data
[]
byte
)
(
uint32
,
error
)
{
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
time
.
Second
*
3
)
defer
cancel
()
...
...
@@ -613,6 +687,7 @@ func main() {
kickChan
=
make
(
chan
KickChanMsg
,
kickChanSize
)
broadcastChan
=
make
(
chan
BroadcastChanMsg
,
broadcastChanSize
)
areacastChan
=
make
(
chan
AreaChanMsg
,
areacastChanSize
)
levelcastChan
=
make
(
chan
LevelChanMsg
,
levelcastChanSize
)
go
check
()
// 检查长度
for
i
:=
0
;
i
<
kickChanSize
;
i
++
{
go
func
(
n
int
)
{
...
...
@@ -629,6 +704,11 @@ func main() {
areacast
(
n
)
}(
i
)
}
for
i
:=
0
;
i
<
levelcastChanSize
;
i
++
{
go
func
(
n
int
)
{
levelcast
(
n
)
}(
i
)
}
fmt
.
Println
(
"Go RPC listening on "
,
port
)
lis
,
err
:=
net
.
Listen
(
"tcp4"
,
":"
+
strconv
.
Itoa
(
port
))
...
...
@@ -677,6 +757,12 @@ func areacast(n int) {
}
}
func
levelcast
(
n
int
)
{
for
msg
:=
range
levelcastChan
{
realLevelcast
(
msg
.
ProxyAddr
,
msg
.
UserIds
,
msg
.
in
)
}
}
var
lastDingTime
time
.
Time
var
dingIntervalMin
float64
=
5
// 5min 告警间隔
...
...
@@ -687,11 +773,12 @@ func check() {
for
{
select
{
case
<-
tick
.
C
:
l
,
l2
,
l3
:=
len
(
kickChan
),
len
(
broadcastChan
),
len
(
area
castChan
)
if
l
>=
monitorLength
||
l2
>=
monitorLength
||
l3
>=
monitorLength
{
l
,
l2
,
l3
,
l4
:=
len
(
kickChan
),
len
(
broadcastChan
),
len
(
areacastChan
),
len
(
level
castChan
)
if
l
>=
monitorLength
||
l2
>=
monitorLength
||
l3
>=
monitorLength
||
l4
>=
monitorLength
{
if
time
.
Now
()
.
Sub
(
lastDingTime
)
.
Minutes
()
>
dingIntervalMin
{
go
func
()
{
if
sErr
:=
dingding
.
SendDingRobot
(
dingding
.
ROBOTWEBHOOK
,
fmt
.
Sprintf
(
"userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d"
,
l
,
l2
,
l3
),
true
);
sErr
!=
nil
{
if
sErr
:=
dingding
.
SendDingRobot
(
dingding
.
ROBOTWEBHOOK
,
fmt
.
Sprintf
(
"userCenter通知延迟,队列长度:kickChan:%d,broadcastChan:%d,areacastChan:%d,levelcastChan:%d"
,
l
,
l2
,
l3
,
l4
),
true
);
sErr
!=
nil
{
mylogrus
.
MyLog
.
Errorf
(
"dingding msg fail:%v"
,
sErr
)
}
else
{
lastDingTime
=
time
.
Now
()
...
...
manager/userManager.go
View file @
9d8cff5b
...
...
@@ -135,3 +135,32 @@ func (m *UserManager) GetAreaUsers(userIds []uint64, area int8) map[uint64]UserT
}
return
res
}
// 获取财富等级大于某等级的用户
// 开区间
func
(
m
*
UserManager
)
GetLevelUsers
(
userIds
[]
uint64
,
wealthLevel
int32
)
map
[
uint64
]
UserTinyArea
{
res
:=
make
(
map
[
uint64
]
UserTinyArea
)
// 从db中读,暂时不缓存(几千个)
var
users
[]
UserTinyArea
if
err
:=
m
.
MysqlDB
.
Table
(
"user"
)
.
Joins
(
"JOIN match_wealth_user_score ON match_wealth_user_score.user_id = user.id"
)
.
Select
(
"user.id,external_id,sex,code,country,avatar"
)
.
Where
(
"user.id IN (?)"
,
userIds
)
.
Where
(
"match_wealth_user_score.grade > ?"
,
wealthLevel
)
.
Find
(
&
users
)
.
Error
;
err
!=
nil
{
mylogrus
.
MyLog
.
Errorf
(
"GetLevelUsers fail:%v"
,
err
)
return
res
}
for
_
,
u
:=
range
users
{
a
:=
m
.
GetArea
(
u
.
Country
)
res
[
u
.
ID
]
=
UserTinyArea
{
ID
:
u
.
ID
,
ExternalId
:
u
.
ExternalId
,
Sex
:
u
.
Sex
,
Code
:
u
.
Code
,
Country
:
u
.
Country
,
Area
:
a
,
Avatar
:
u
.
Avatar
,
}
}
return
res
}
protocol/userCenter.proto
View file @
9d8cff5b
...
...
@@ -132,6 +132,16 @@ message AreaMessageRsp {
repeated
uint64
failedUids
=
1
;
}
message
LevelMessage
{
int32
level
=
1
;
uint32
msgType
=
2
;
bytes
payLoad
=
3
;
}
message
LevelMessageRsp
{
repeated
uint64
failedUids
=
1
;
}
service
Router
{
rpc
route
(
RouteMessage
)
returns
(
RouteMessageRsp
)
{}
rpc
kickUser
(
KickMessage
)
returns
(
KickMessageRsp
)
{}
...
...
@@ -143,6 +153,7 @@ service User {
rpc
multicast
(
MulticastMessage
)
returns
(
MulticastMessageRsp
)
{}
rpc
broadcast
(
BroadcastMessage
)
returns
(
BroadcastMessageRsp
)
{}
rpc
areacast
(
AreaMessage
)
returns
(
AreaMessageRsp
)
{}
rpc
levelcast
(
LevelMessage
)
returns
(
LevelMessageRsp
)
{}
rpc
transmit
(
BizMessage
)
returns
(
BizMessageRsp
)
{}
rpc
enterRoom
(
EnterRoomMessage
)
returns
(
EnterRoomMessageRsp
)
{}
rpc
leaveRoom
(
LeaveRoomMessage
)
returns
(
LeaveRoomMessageRsp
)
{}
...
...
protocol/userProxy.proto
View file @
9d8cff5b
...
...
@@ -86,7 +86,6 @@ message MatchConfirm {
uint32
remoteAgoraId
=
6
;
uint32
callDuration
=
7
;
uint32
localAgoraId
=
8
;
uint32
diamondBalance
=
9
;
string
matchUniqueId
=
10
;
uint32
failType
=
11
;
}
...
...
@@ -97,62 +96,6 @@ message CallReady {
uint64
endTimestamp
=
2
;
uint64
callDuration
=
3
;
string
channelId
=
4
;
uint64
remainDiamond
=
5
;
}
/* id == 103 礼物加时 */
message
AddTimeGift
{
uint32
giftId
=
1
;
string
token
=
2
;
uint32
duration
=
3
;
uint64
endTimestamp
=
4
;
string
channelId
=
5
;
bool
isSender
=
6
;
uint32
giftNum
=
7
;
string
iconUrl
=
8
;
string
svgaUrl
=
9
;
string
senderAvatar
=
10
;
string
receiverAvatar
=
11
;
}
/* id == 104 免费加时 */
message
AddTimeFree
{
string
token
=
1
;
uint32
duration
=
2
;
uint64
endTimestamp
=
3
;
string
channelId
=
4
;
uint32
senderAgoraId
=
5
;
}
/* id == 105 退出 */
message
ConnectsQuit
{
uint64
from_user_id
=
1
;
}
/* id == 106 连接状态 */
message
ConnectStatus
{
uint64
from_user_id
=
1
;
float
user_diamonds
=
2
;
bool
diamonds_enough
=
3
;
}
/* id == 107 ??? */
message
ConnectsCall
{
uint64
from_user_id
=
1
;
string
rong_room_name
=
2
;
bool
is_join
=
3
;
}
/* id == 108 */
message
ConnectCommon
{
string
rong_room_name
=
1
;
uint64
from_user_id
=
2
;
string
extra
=
3
;
string
message
=
4
;
}
/* id == 109 召回授权弹框 */
message
RecallWindow
{
}
/* id == 110 | 132 视频发送 status:(1:接收到邀请, 2:接收到对方同意, 3:双方拒绝(还没接通), 4:对方挂断(接通后)diamondBalance 只有status=2,才出现)*/
...
...
@@ -165,17 +108,19 @@ message Video {
string
sendUserId
=
6
;
string
receiveUserId
=
7
;
uint32
status
=
8
;
uint32
diamondBalance
=
9
;
User
sendUser
=
10
;
}
/* id == 109 召回授权弹框 */
message
RecallWindow
{
}
/* id == 111 视频通话准备 */
message
VideoCallReady
{
uint64
startTimestamp
=
1
;
uint64
endTimestamp
=
2
;
uint64
callDuration
=
3
;
string
channelId
=
4
;
uint64
remainDiamond
=
5
;
}
/* id == 112 互相喜欢 */
...
...
@@ -213,6 +158,7 @@ message GlobalGiftBanner {
uint32
bannerType
=
14
;
// 类型:0.普通礼物 1.cp直接送礼 2.cp告白礼物
uint32
cpLevel
=
15
;
// cp等级
string
receiveUserAvatar
=
16
;
uint32
nobleLevel
=
17
;
// 贵族等级
}
/* id == 116 横幅的回应,用来测量RTT */
...
...
@@ -289,6 +235,7 @@ message GlobalBroadcast {
string
msg
=
6
;
string
groupId
=
7
;
uint32
senderNobleLevel
=
8
;
bool
isPinned
=
9
;
}
/* id == 124 全球消息 */
...
...
@@ -322,13 +269,11 @@ message VideoTimeMinuteSuccess {
uint32
senderAgoraId
=
5
;
string
videoUniqueId
=
6
;
bool
isSend
=
7
;
uint32
sendRemainDiamond
=
8
;
}
/* id == 129 1对1视频1分钟加时询问检查 */
message
VideoTimeMinuteCheck
{
string
videoUniqueId
=
1
;
uint32
diamond
=
2
;
string
uuid
=
3
;
}
...
...
@@ -369,6 +314,7 @@ message GlobalGameBanner {
uint64
diamond
=
4
;
string
bannerUrl
=
5
;
uint64
gameId
=
6
;
// 1.ludo 2.uno 3.dice 4.lucky wheel 5.lucky box 6.fruit 7.slot
bool
isPink
=
7
;
// 是否粉钻
}
/* id == 147 羊羊匹配成功 */
...
...
@@ -377,6 +323,11 @@ message SheepMatchSuccess {
User
user
=
2
;
User
otherUser
=
3
;
uint64
game_id
=
4
;
string
channelId
=
5
;
string
token
=
6
;
uint32
agoraId
=
7
;
uint32
provider
=
8
;
uint32
otherAgoraId
=
9
;
}
message
SheepGamePlayer
{
...
...
@@ -463,4 +414,34 @@ message MicUserData {
string
micEffect
=
14
;
string
headwearIcon
=
15
;
Svip
svip
=
16
;
}
/* id == 157 游戏大厅匹配成功 */
message
LobbyMatchSuccess
{
uint64
game_id
=
1
;
uint64
mode
=
2
;
string
group_id
=
3
;
User
user
=
4
;
User
otherUser
=
5
;
string
gameCode
=
6
;
}
/* id == 158 H5游戏静音 */
message
H5GameVoiceMute
{
}
/* id == 159 H5游戏打开语音 */
message
H5GameVoiceUnMute
{
}
/* id == 160 退出房间 */
message
QuitRoom
{
uint32
reason
=
1
;
// 原因1.被拉黑;2.被踢出
string
group_id
=
2
;
}
/* id == 161 国家管理员横幅 */
message
GlobalCountryMgrBanner
{
string
country
=
1
;
// 国家
User
user
=
2
;
// 用户信息
}
\ No newline at end of file