diff --git a/main.go b/main.go index e279a55430164725e2d8455421742b8c82ebb509..53f4294c075808e75431bc5a791753c7f0842178 100644 --- a/main.go +++ b/main.go @@ -61,6 +61,7 @@ type BroadcastChanMsg struct { var ( userManager *manager.UserManager = nil termManager *manager.TerminalManager = nil + roomManager *manager.RoomManager = nil ) var logDir = "/var/log/hilo/" @@ -257,6 +258,43 @@ func (s *server) Transmit(ctx context.Context, in *userCenter.BizMessage) (*user return rsp, nil } +func (s *server) EnterRoom(ctx context.Context, in *userCenter.EnterRoomMessage) (*userCenter.EnterRoomMessageRsp, error) { + if err := roomManager.AddRoomUser(in.GetUid(), in.GetGroupId()); err != nil { + return nil, err + } + return &userCenter.EnterRoomMessageRsp{ + Status: 0, + }, nil +} + +func (s *server) LeaveRoom(ctx context.Context, in *userCenter.LeaveRoomMessage) (*userCenter.LeaveRoomMessageRsp, error) { + if err := roomManager.DelRoomUser(in.GetUid(), in.GetGroupId()); err != nil { + return nil, err + } + return &userCenter.LeaveRoomMessageRsp{ + Status: 0, + }, nil +} + +func (s *server) RoomHeartbeat(ctx context.Context, in *userCenter.RoomHeartbeatMessage) (*userCenter.RoomHeartbeatMessageRsp, error) { + if err := roomManager.UpdateRoomUser(in.GetUid(), in.GetGroupId()); err != nil { + return nil, err + } + return &userCenter.RoomHeartbeatMessageRsp{ + Status: 0, + }, nil +} + +func (s *server) GetLastRoomHeartbeat(ctx context.Context, in *userCenter.GetLastRoomHeartbeatMessage) (*userCenter.GetLastRoomHeartbeatMessageResp, error) { + ts, err := roomManager.GetLastRoomUserHeartbeat(in.GetUid(), in.GetGroupId()) + if err != nil { + return nil, err + } + return &userCenter.GetLastRoomHeartbeatMessageResp{ + Timestamp: ts, + }, nil +} + func realBroadcast(addr string, uids []uint64, msg *userCenter.BroadcastMessage) { mylogrus.MyLog.Infof("Broadcasting: Addr %s: users: %v", addr, uids) @@ -499,6 +537,10 @@ func main() { Ctx: context.Background(), RedisClient: rdb, } + roomManager = &manager.RoomManager{ + Ctx: context.Background(), + RedisClient: rdb, + } mylogrus.MyLog.Infof("Connected to Redis %s\n", redisAddress) diff --git a/manager/roomManager.go b/manager/roomManager.go new file mode 100644 index 0000000000000000000000000000000000000000..da21ab17254bd8472c0fbb0487ceb95d90711cbd --- /dev/null +++ b/manager/roomManager.go @@ -0,0 +1,79 @@ +package manager + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "hilo-userCenter/common/mylogrus" + "log" + "time" +) + +func getRoomUserKey(groupId string) string { + return fmt.Sprintf("room:%v", groupId) +} + +type RoomManager struct { + Ctx context.Context + RedisClient *redis.Client +} + +func (m *RoomManager) AddRoomUser(uid uint64, groupId string) error { + key := getRoomUserKey(groupId) + err := m.RedisClient.ZAdd(m.Ctx, key, &redis.Z{ + Score: float64(time.Now().Unix()), + Member: fmt.Sprintf("%d", uid), + }).Err() + if err != nil { + mylogrus.MyLog.Errorf("AddRoomUser fail:%v", err) + return nil + } + return err +} + +func (m *RoomManager) DelRoomUser(uid uint64, groupId string) error { + key := getRoomUserKey(groupId) + err := m.RedisClient.ZRem(m.Ctx, key, fmt.Sprintf("%d", uid)).Err() + if err != nil { + mylogrus.MyLog.Errorf("DelRoomUser fail:%v", err) + return nil + } + return err +} + +func (m *RoomManager) UpdateRoomUser(uid uint64, groupId string) error { + key := getRoomUserKey(groupId) + err := m.RedisClient.ZAdd(m.Ctx, key, &redis.Z{ + Score: float64(time.Now().Unix()), + Member: fmt.Sprintf("%d", uid), + }).Err() + if err != nil { + mylogrus.MyLog.Errorf("AddRoomUser fail:%v", err) + return nil + } + return err +} + +// 获取用户在房间的最后一次心跳 +func (m *RoomManager) GetLastRoomUserHeartbeat(uid uint64, groupId string) (int64, error) { + key := getRoomUserKey(groupId) + tx, err := m.RedisClient.ZScore(m.Ctx, key, fmt.Sprintf("%d", uid)).Result() + if err != nil && err != redis.Nil { + mylogrus.MyLog.Errorf("GetLastRoomUserHeartbeat fail:%v", err) + return 0, err + } + // redis nil means 0 + return int64(tx), nil +} + +func (m *RoomManager) GetAll() *map[string]string { + //ctx, _ := context.WithTimeout(m.Ctx, time.Millisecond*500) + //result := m.RedisClient.Get(ctx, field) + result, err := m.RedisClient.HGetAll(m.Ctx, user_key).Result() + if err != nil { + log.Printf("HGetAll error: %s\n", err.Error()) + return nil + } else { + return &result + } +} diff --git a/protocol/userCenter.proto b/protocol/userCenter.proto index 10535b37f523f5a71bbd43e81bddb2e5b2d338f2..d827270d8c6193829bfff50b036d7007417cf15c 100644 --- a/protocol/userCenter.proto +++ b/protocol/userCenter.proto @@ -85,6 +85,43 @@ message BizMessageRsp { uint32 status = 1; } +message EnterRoomMessage { + uint64 uid = 1; + string groupId = 2; +} + +message EnterRoomMessageRsp { + uint32 status = 1; +} + +message LeaveRoomMessage { + uint64 uid = 1; + string groupId = 2; +} + +message LeaveRoomMessageRsp { + uint32 status = 1; +} + +message RoomHeartbeatMessage { + uint64 uid = 1; + string groupId = 2; +} + +message RoomHeartbeatMessageRsp { + uint32 status = 1; +} + +// 获取房间内最后一次心跳 +message GetLastRoomHeartbeatMessage { + uint64 uid = 1; + string groupId = 2; +} + +message GetLastRoomHeartbeatMessageResp { + int64 timestamp = 1; +} + service Router { rpc route(RouteMessage) returns (RouteMessageRsp) {} rpc kickUser(KickMessage) returns (KickMessageRsp) {} @@ -96,4 +133,8 @@ service User { rpc multicast(MulticastMessage) returns (MulticastMessageRsp) {} rpc broadcast(BroadcastMessage) returns (BroadcastMessageRsp) {} rpc transmit(BizMessage) returns (BizMessageRsp) {} + rpc enterRoom(EnterRoomMessage) returns (EnterRoomMessageRsp) {} + rpc leaveRoom(LeaveRoomMessage) returns (LeaveRoomMessageRsp) {} + rpc roomHeartbeat(RoomHeartbeatMessage) returns (RoomHeartbeatMessageRsp) {} + rpc getLastRoomHeartbeat(GetLastRoomHeartbeatMessage) returns (GetLastRoomHeartbeatMessageResp) {} } \ No newline at end of file diff --git a/protocol/userProxy.proto b/protocol/userProxy.proto index 85e540e6ec8652e7067edd5db1d654e1eef13289..906b1d6d7006f76584df467f9e8e162481cc893d 100644 --- a/protocol/userProxy.proto +++ b/protocol/userProxy.proto @@ -411,4 +411,56 @@ message SvipUpgrade { User user = 1; uint32 svip_level = 2; string group_id = 3; +} + +/* id == 152 用户进房 上行 */ +message EnterRoom { + string group_id = 1; +} + +/* id == 153 用户离房 上行 */ +message LeaveRoom { + string group_id = 1; +} + +/* id == 154 房间心跳 上行 */ +message RoomHeartBeat { + string group_id = 1; +} + +/* id == 155 麦位变化 下行 */ +message GroupMicChange { + string seqId = 1; + string group_id = 2; + uint32 i = 3; + bool lock = 4; + bool forbid = 5; + bool micForbid = 6; + string externalId = 7; + uint32 agoraId = 8; + int64 timestamp = 9; + MicUserData user = 10; +} + +/* id == 156 麦位变化确认 上行 */ +message GroupMicChangeRsp { + string seqId = 1; +} + +message MicUserData { + uint64 id = 1; + string externalId = 2; + string avatar = 3; + string nick = 4; + uint32 sex = 5; + string code = 6; + bool isVip = 7; + uint32 noble = 8; + string headwearPicUrl = 9; + string headwearEffectUrl = 10; + string headwearReverseEffectUrl = 11; + uint32 svipLevel = 12; + string micEffect = 14; + string headwearIcon = 15; + Svip svip = 16; } \ No newline at end of file