package rpc import ( "context" "fmt" "git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/protocol/userCenter" "git.hilo.cn/hilo-common/resource/config" "git.hilo.cn/hilo-common/resource/consul" consulapi "github.com/hashicorp/consul/api" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "time" ) const ( Port = 50060 ) var userClient userCenter.UserClient var kacp = keepalive.ClientParameters{ Time: 10 * time.Second, // send pings every 10 seconds if there is no activity Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead PermitWithoutStream: true, // send pings even without active streams } var ( defaultUserCenterAddr = "127.0.0.1:50040" // userCenter default addr userCenterAddr = defaultUserCenterAddr userCenterConsulName = "userCenter" ) // grpc服务发现 type Builder struct { addrs map[string][]string cc resolver.ClientConn } func (b *Builder) Scheme() string { return "uc" // userCenter } type Resolver struct { } func (r Resolver) ResolveNow(options resolver.ResolveNowOptions) {} func (r Resolver) Close() {} func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { r := &Resolver{} paths := b.addrs[target.URL.Path] addrs := make([]resolver.Address, len(paths)) for i, s := range paths { addrs[i] = resolver.Address{Addr: s} } cc.UpdateState(resolver.State{Addresses: addrs}) b.cc = cc return r, nil } func (b *Builder) UpdateState(addrs []string) { as := make([]resolver.Address, len(addrs)) for i, s := range addrs { as[i] = resolver.Address{Addr: s} } b.cc.UpdateState(resolver.State{Addresses: as}) } func init() { client, err := consulapi.NewClient(consulapi.DefaultConfig()) //非默认情况下需要设置实际的参数 mylogrus.MyLog.Infoln(client, err) if err != nil { mylogrus.MyLog.Fatalln(err) } if client == nil { mylogrus.MyLog.Fatalln("Fail to get consul client.") } // 本地环境下不需要userCenter if config.AppIsLocal() { mylogrus.MyLog.Infoln("userCenter is not required in local env.") return } // 服务发现 bd := &Builder{addrs: map[string][]string{"/api": {userCenterAddr}}} cataLog := client.Catalog() if cataLog == nil { mylogrus.MyLog.Fatalln("No catalog.") } services, _, err := cataLog.Service("userCenter", "", nil) if err != nil { mylogrus.MyLog.Fatalln(err) } if len(services) == 0 { mylogrus.MyLog.Fatalln("userCenter not found.") } var addrs []string for _, s := range services { addrs = append(addrs, fmt.Sprintf("%s:%d", s.ServiceAddress, s.ServicePort)) } if len(addrs) > 0 { bd = &Builder{addrs: map[string][]string{"/api": addrs}} userCenterAddr = "uc:///api" } mylogrus.MyLog.Infof("userCenterAddr:%v,addr:%v", userCenterAddr, addrs) resolver.Register(bd) go func() { address := consulapi.DefaultConfig().Address // 用consul api的default config if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { if statusAddrs, ok := serviceStatus[userCenterConsulName]; ok { healthAddrs, _ := statusAddrs[consulapi.HealthPassing] l := len(healthAddrs) if l > 0 { mylogrus.MyLog.Infof("consul service update state:%v-%v", userCenterConsulName, healthAddrs) bd.UpdateState(healthAddrs) // 更新新的注册名 } else { mylogrus.MyLog.Warnf("consul service update local state:%v-%v", userCenterConsulName, defaultUserCenterAddr) bd.UpdateState([]string{defaultUserCenterAddr}) // 都没有健康的,使用默认本地回环的 } for status := range statusAddrs { if status == consulapi.HealthPassing { continue } mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", userCenterConsulName, status, statusAddrs[status]) } } }); err != nil { mylogrus.MyLog.Errorf("启动 consul 的watch监控失败") } }() //userCenterAddr := services[0].Address + ":" + strconv.Itoa(services[0].ServicePort) //mylogrus.MyLog.Printf("Choose userCenter %s, %s, weights: %v\n", services[0].ID, userCenterAddr, services[0].ServiceWeights) // Set up a connection to the userCenter. conn, err := grpc.Dial(userCenterAddr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy":"%s"}`, "round_robin"))) if err != nil { mylogrus.MyLog.Fatalf("did not connect: %v", err) } //defer conn.Close() userClient = userCenter.NewUserClient(conn) if userClient == nil { mylogrus.MyLog.Fatalln("userClient null") } } func multicast(uids []uint64, msgType uint32, data []byte) ([]uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() rsp, err := userClient.Multicast(ctx, &userCenter.MulticastMessage{ Uids: uids, MsgType: msgType, PayLoad: data, }) if err != nil { mylogrus.MyLog.Errorf("Multicast message failed %s", err.Error()) } if rsp != nil { mylogrus.MyLog.Infof("Multicast message res:%+v", rsp) return rsp.FailedUids, err } else { return []uint64{}, err } } //广播 func broadcast(msgType uint32, data []byte) ([]uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() rsp, err := userClient.Broadcast(ctx, &userCenter.BroadcastMessage{ MsgType: msgType, PayLoad: data, }) if err != nil { mylogrus.MyLog.Errorf("broadcast message failed %s", err.Error()) } if rsp != nil { mylogrus.MyLog.Infof("broadcast message res:%v", rsp) return rsp.FailedUids, err } else { return []uint64{}, err } }