From a8f8734918dbe7e524874d0d4e7e8c6c12857ae1 Mon Sep 17 00:00:00 2001 From: hujiebin Date: Thu, 28 Mar 2024 16:46:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:broadcast=20=E6=9C=89=E8=AF=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 88a4dd1..513bcc8 100644 --- a/main.go +++ b/main.go @@ -2,8 +2,10 @@ package main import ( "context" + "errors" "flag" "fmt" + "github.com/golang/protobuf/proto" "gorm.io/gorm/schema" "net" "net/url" @@ -196,7 +198,7 @@ func (s *server) Multicast(ctx context.Context, in *userCenter.MulticastMessage) return &userCenter.MulticastMessageRsp{FailedUids: failed}, nil } -func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { +func (s *server) BroadcastOld(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { //mylogrus.MyLog.Infof("Broadcasting msgType = %d, size = %d\n", in.MsgType, len(in.PayLoad)) failed := []uint64{} @@ -249,6 +251,32 @@ func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) return &userCenter.BroadcastMessageRsp{FailedUids: failed}, nil } +func (s *server) Broadcast(ctx context.Context, in *userCenter.BroadcastMessage) (*userCenter.BroadcastMessageRsp, error) { + redisKey := "service:userSocket" + ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{ + Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳 + Max: "+inf", + }).Result() + if err != nil { + failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userSocket", err) + mylogrus.MyLog.Errorf(failMsg) + _ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true) + return nil, err + } + if len(ipPorts) <= 0 { + failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userSocket", err) + mylogrus.MyLog.Errorf(failMsg) + _ = dingding.SendDingRobot(dingding.ROBOTWEBHOOK, failMsg, true) + return nil, errors.New(failMsg) + } + data, _ := proto.Marshal(in) + for _, ip := range ipPorts { + queue := "broadcast:" + ip + rdbCluster.RPush(context.Background(), queue, data) + } + return &userCenter.BroadcastMessageRsp{FailedUids: nil}, nil +} + func (s *server) Areacast(ctx context.Context, in *userCenter.AreaMessage) (*userCenter.AreaMessageRsp, error) { var failed []uint64 terminals := termManager.GetAll() @@ -558,11 +586,13 @@ type HiloConfigs struct { Value string } +var rdbCluster *redis.Client + func main() { flag.Parse() // init redis cluster - rdbCluster := redis.NewClient(&redis.Options{ + rdbCluster = redis.NewClient(&redis.Options{ Addr: appConfig.GetConfigRedis().REDIS_CLUSTER_HOST, Password: appConfig.GetConfigRedis().REDIS_CLUSTER_PASSWORD, }) -- 2.22.0