diff --git a/resource/consul/watch.go b/resource/consul/watch.go index 5c30899c463cbdc54f9d6dad8cc08d69c59ea837..e84e33356eaad7e0db0dd7f8bbee709f1ee57822 100644 --- a/resource/consul/watch.go +++ b/resource/consul/watch.go @@ -4,127 +4,40 @@ import ( "fmt" "git.hilo.cn/hilo-common/mylogrus" consulapi "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/api/watch" - "sync" + "time" ) -type ServiceCallback func(serviceStatus map[string]map[string][]string) // service->status->addrs - -// 定义watcher -type Watcher struct { - Address string // consul agent 的地址:"127.0.0.1:8500" - Wp *watch.Plan // 总的Services变化对应的Plan - watchers map[string]*watch.Plan // 对已经进行监控的service作个记录 - RWMutex *sync.RWMutex -} - -// 将consul新增的service加入,并监控 -func (w *Watcher) registerServiceWatcher(serviceName string, callback ServiceCallback) error { - // watch endpoint 的请求参数,具体见官方文档:https://www.consul.io/docs/dynamic-app-config/watches#service - wp, err := watch.Parse(map[string]interface{}{ - "type": "service", - "service": serviceName, - }) - if err != nil { - return err - } - - // 定义service变化后所执行的程序(函数)handler - wp.Handler = func(idx uint64, data interface{}) { - switch d := data.(type) { - case []*consulapi.ServiceEntry: - var serviceStatus = make(map[string]map[string][]string) - for _, i := range d { - if data, ok := serviceStatus[i.Service.Service]; ok { - data[i.Checks.AggregatedStatus()] = append(data[i.Checks.AggregatedStatus()], fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port)) - } else { - serviceStatus[i.Service.Service] = make(map[string][]string) - serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()] = append(serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()], - fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port)) - } +func RegisterWatcher(serviceName string, cb func(addr []string)) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + client, err := consulapi.NewClient(consulapi.DefaultConfig()) //非默认情况下需要设置实际的参数 + if err != nil { + mylogrus.MyLog.Errorf("RegisterToConsul Fail:%v", err) + continue } - // 回传到外面 - callback(serviceStatus) - mylogrus.MyLog.Infof("consul service status: %s", serviceStatus) - } - } - // 启动监控 - go wp.Run(w.Address) - // 对已启动监控的service作一个记录 - w.RWMutex.Lock() - w.watchers[serviceName] = wp - w.RWMutex.Unlock() - - return nil -} - -func newWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) (*Watcher, error) { - var options = map[string]interface{}{ - "type": watchType, - } - // 组装请求参数。(监控类型不同,其请求参数不同) - for k, v := range opts { - options[k] = v - } - - wp, err := watch.Parse(options) - if err != nil { - return nil, err - } - - w := &Watcher{ - Address: consulAddr, - Wp: wp, - watchers: make(map[string]*watch.Plan), - RWMutex: new(sync.RWMutex), - } - - wp.Handler = func(idx uint64, data interface{}) { - switch d := data.(type) { - // 这里只实现了对services的监控,其他监控的data类型判断参考:https://github.com/dmcsorley/avast/blob/master/consul.go - // services - case map[string][]string: - for i := range d { - // 如果该service已经加入到ConsulRegistry的services里监控了,就不再加入 或者i 为 "consul"的字符串 - // 为什么会多一个consul,参考官方文档services监听的返回值:https://www.consul.io/docs/dynamic-app-config/watches#services - if _, ok := w.watchers[i]; ok || i == "consul" { - continue - } - w.registerServiceWatcher(i, callback) + if client == nil { + mylogrus.MyLog.Errorf("Fail to get consul client.") + continue } - - // 从总的services变化中找到不再监控的service并停止 - w.RWMutex.RLock() - watches := w.watchers - w.RWMutex.RUnlock() - - // remove unknown services from watchers - for i, svc := range watches { - if _, ok := d[i]; !ok { - svc.Stop() - delete(watches, i) - } + cataLog := client.Catalog() + if cataLog == nil { + mylogrus.MyLog.Errorf("No catalog.") + continue + } + services, _, err := cataLog.Service(serviceName, "", nil) + if err != nil { + mylogrus.MyLog.Errorf("%v", err) + continue + } + var addr []string + for _, v := range services { + addr = append(addr, fmt.Sprintf("%s:%d", v.ServiceAddress, v.ServicePort)) } - default: - mylogrus.MyLog.Errorf("不能判断监控的数据类型: %v", &d) + mylogrus.MyLog.Infof("%s check addr%v", serviceName, addr) + cb(addr) } } - - return w, nil -} - -func RegisterWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) error { - return nil - w, err := newWatcher(watchType, opts, consulAddr, callback) - if err != nil { - mylogrus.MyLog.Error(err) - return err - } - defer w.Wp.Stop() - if err = w.Wp.Run(consulAddr); err != nil { - mylogrus.MyLog.Errorf("RegisterWatcher err: %v", err) - return err - } - - return nil } diff --git a/rpc/activity.go b/rpc/activity.go index 5487c9dd821bdbc6cae86f3a585c1a796b8d39f0..f538db145356dfc8c5331ab1f9566349e64ba976 100644 --- a/rpc/activity.go +++ b/rpc/activity.go @@ -9,7 +9,6 @@ import ( "git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/mysql" "git.hilo.cn/hilo-common/utils" - "github.com/hashicorp/consul/api" "math/rand" ) @@ -23,28 +22,11 @@ var activityServerHost = []string{defaultActivityServerAddr} func init() { go func() { - address := api.DefaultConfig().Address // 用consul api的default config - if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { - if statusAddrs, ok := serviceStatus[defaultActivityConsulName]; ok { - healthAddrs, _ := statusAddrs[api.HealthPassing] - l := len(healthAddrs) - if l > 0 { - mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultActivityConsulName, healthAddrs) - activityServerHost = healthAddrs - } else { - mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultActivityConsulName, defaultActivityServerAddr) - activityServerHost = []string{defaultActivityServerAddr} // 有其他问题都用默认的 - } - for status := range statusAddrs { - if status == api.HealthPassing { - continue - } - mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultActivityConsulName, status, statusAddrs[status]) - } + consul.RegisterWatcher(defaultActivityConsulName, func(addr []string) { + if len(addr) > 0 { + activityServerHost = addr } - }); err != nil { - mylogrus.MyLog.Errorf("启动 consul 的watch监控失败") - } + }) }() } diff --git a/rpc/finance.go b/rpc/finance.go index f2f923e3e01c613a9da18bb09b63ede7f89bfac7..564e36cfd9cc1247c93c60a55ad5c11940fcd37b 100644 --- a/rpc/finance.go +++ b/rpc/finance.go @@ -7,7 +7,6 @@ import ( "git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/mysql" - "github.com/hashicorp/consul/api" "math/rand" ) @@ -21,28 +20,11 @@ var financeServerHost = []string{defaultFinanceServerAddr} func init() { go func() { - address := api.DefaultConfig().Address // 用consul api的default config - if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { - if statusAddrs, ok := serviceStatus[defaultFinanceConsulName]; ok { - healthAddrs, _ := statusAddrs[api.HealthPassing] - l := len(healthAddrs) - if l > 0 { - mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultFinanceConsulName, healthAddrs) - financeServerHost = healthAddrs - } else { - mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultFinanceConsulName, defaultFinanceServerAddr) - financeServerHost = []string{defaultFinanceServerAddr} // 有其他问题都用默认的 - } - for status := range statusAddrs { - if status == api.HealthPassing { - continue - } - mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultFinanceConsulName, status, statusAddrs[status]) - } + consul.RegisterWatcher(defaultFinanceConsulName, func(addr []string) { + if len(addr) > 0 { + financeServerHost = addr } - }); err != nil { - mylogrus.MyLog.Errorf("启动 consul 的watch监控失败") - } + }) }() } diff --git a/rpc/group.go b/rpc/group.go index b43cf380a5526f13eaab80bd2ebb0bd4a7373ca9..8b7997a2e29a09747760ebcf9ebdb64fcf20f284 100644 --- a/rpc/group.go +++ b/rpc/group.go @@ -8,7 +8,6 @@ import ( "git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/mysql" - "github.com/hashicorp/consul/api" "math/rand" ) @@ -22,28 +21,11 @@ var groupServerHost = []string{defaultGroupServerAddr} func init() { go func() { - address := api.DefaultConfig().Address // 用consul api的default config - if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { - if statusAddrs, ok := serviceStatus[defaultGroupConsulName]; ok { - healthAddrs, _ := statusAddrs[api.HealthPassing] - l := len(healthAddrs) - if l > 0 { - mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultGroupConsulName, healthAddrs) - groupServerHost = healthAddrs - } else { - mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultGroupConsulName, defaultGroupServerScheme) - groupServerHost = []string{defaultGroupServerAddr} // 有其他问题都用默认的 - } - for status := range statusAddrs { - if status == api.HealthPassing { - continue - } - mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultGroupConsulName, status, statusAddrs[status]) - } + consul.RegisterWatcher(defaultGroupConsulName, func(addr []string) { + if len(addr) > 0 { + groupServerHost = addr } - }); err != nil { - mylogrus.MyLog.Errorf("启动 consul 的watch监控失败") - } + }) }() } diff --git a/rpc/user.go b/rpc/user.go index cccc9781f8e25fcfecc73e13b9a3e315fab5c154..8a5f18ec061f0e1a5c459cae4d48b5c81dee365a 100644 --- a/rpc/user.go +++ b/rpc/user.go @@ -8,7 +8,6 @@ import ( "git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/mysql" - "github.com/hashicorp/consul/api" "math/rand" ) @@ -22,28 +21,11 @@ var UserServerHost = []string{defaultUserServerAddr} func init() { go func() { - address := api.DefaultConfig().Address // 用consul api的default config - if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { - if statusAddrs, ok := serviceStatus[defaultUserConsulName]; ok { - healthAddrs, _ := statusAddrs[api.HealthPassing] - l := len(healthAddrs) - if l > 0 { - mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultUserConsulName, healthAddrs) - UserServerHost = healthAddrs - } else { - mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultUserConsulName, defaultUserServerAddr) - UserServerHost = []string{defaultUserServerAddr} // 有其他问题都用默认的 - } - for status := range statusAddrs { - if status == api.HealthPassing { - continue - } - mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultUserConsulName, status, statusAddrs[status]) - } + consul.RegisterWatcher(defaultUserConsulName, func(addr []string) { + if len(addr) > 0 { + UserServerHost = addr } - }); err != nil { - mylogrus.MyLog.Errorf("启动 consul 的watch监控失败") - } + }) }() } diff --git a/rpc/user_center.go b/rpc/user_center.go index 118f31236f0773f67c8b93c988329996e18c725b..e680dae65de3687528b12721421be7ff2ee7b5d2 100644 --- a/rpc/user_center.go +++ b/rpc/user_center.go @@ -109,28 +109,11 @@ func init() { mylogrus.MyLog.Infof("connect userCenterAddr:%v", userCenterAddr) resolver.Register(bd) go func() { - address := consulapi.DefaultConfig().Address // 用consul api的default config - if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { - if statusAddrs, ok := serviceStatus[userCenterConsulName]; ok { - healthAddrs, _ := statusAddrs[consulapi.HealthPassing] - l := len(healthAddrs) - if l > 0 { - mylogrus.MyLog.Infof("consul service update state:%v-%v", userCenterConsulName, healthAddrs) - bd.UpdateState(healthAddrs) // 更新新的注册名 - } else { - mylogrus.MyLog.Warnf("consul service update local state:%v-%v", userCenterConsulName, defaultUserCenterAddr) - bd.UpdateState([]string{defaultUserCenterAddr}) // 都没有健康的,使用默认本地回环的 - } - for status := range statusAddrs { - if status == consulapi.HealthPassing { - continue - } - mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", userCenterConsulName, status, statusAddrs[status]) - } + consul.RegisterWatcher(userCenterConsulName, func(addr []string) { + if len(addr) > 0 { + bd.UpdateState(addr) // 更新新的注册名 } - }); err != nil { - mylogrus.MyLog.Errorf("启动 consul 的watch监控失败") - } + }) }() //userCenterAddr := services[0].Address + ":" + strconv.Itoa(services[0].ServicePort)