Commit 6cfd5ba2 authored by hujiebin's avatar hujiebin

feat:自己写Watch

parent 317e4c2b
...@@ -4,127 +4,40 @@ import ( ...@@ -4,127 +4,40 @@ import (
"fmt" "fmt"
"git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/mylogrus"
consulapi "github.com/hashicorp/consul/api" consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch" "time"
"sync"
) )
type ServiceCallback func(serviceStatus map[string]map[string][]string) // service->status->addrs func RegisterWatcher(serviceName string, cb func(addr []string)) {
ticker := time.NewTicker(time.Minute)
// 定义watcher defer ticker.Stop()
type Watcher struct { for {
Address string // consul agent 的地址:"127.0.0.1:8500" select {
Wp *watch.Plan // 总的Services变化对应的Plan case <-ticker.C:
watchers map[string]*watch.Plan // 对已经进行监控的service作个记录 client, err := consulapi.NewClient(consulapi.DefaultConfig()) //非默认情况下需要设置实际的参数
RWMutex *sync.RWMutex if err != nil {
} mylogrus.MyLog.Errorf("RegisterToConsul Fail:%v", err)
continue
// 将consul新增的service加入,并监控
func (w *Watcher) registerServiceWatcher(serviceName string, callback ServiceCallback) error {
// watch endpoint 的请求参数,具体见官方文档:https://www.consul.io/docs/dynamic-app-config/watches#service
wp, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": serviceName,
})
if err != nil {
return err
}
// 定义service变化后所执行的程序(函数)handler
wp.Handler = func(idx uint64, data interface{}) {
switch d := data.(type) {
case []*consulapi.ServiceEntry:
var serviceStatus = make(map[string]map[string][]string)
for _, i := range d {
if data, ok := serviceStatus[i.Service.Service]; ok {
data[i.Checks.AggregatedStatus()] = append(data[i.Checks.AggregatedStatus()], fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
} else {
serviceStatus[i.Service.Service] = make(map[string][]string)
serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()] = append(serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()],
fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
}
} }
// 回传到外面 if client == nil {
callback(serviceStatus) mylogrus.MyLog.Errorf("Fail to get consul client.")
mylogrus.MyLog.Infof("consul service status: %s", serviceStatus) continue
}
}
// 启动监控
go wp.Run(w.Address)
// 对已启动监控的service作一个记录
w.RWMutex.Lock()
w.watchers[serviceName] = wp
w.RWMutex.Unlock()
return nil
}
func newWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) (*Watcher, error) {
var options = map[string]interface{}{
"type": watchType,
}
// 组装请求参数。(监控类型不同,其请求参数不同)
for k, v := range opts {
options[k] = v
}
wp, err := watch.Parse(options)
if err != nil {
return nil, err
}
w := &Watcher{
Address: consulAddr,
Wp: wp,
watchers: make(map[string]*watch.Plan),
RWMutex: new(sync.RWMutex),
}
wp.Handler = func(idx uint64, data interface{}) {
switch d := data.(type) {
// 这里只实现了对services的监控,其他监控的data类型判断参考:https://github.com/dmcsorley/avast/blob/master/consul.go
// services
case map[string][]string:
for i := range d {
// 如果该service已经加入到ConsulRegistry的services里监控了,就不再加入 或者i 为 "consul"的字符串
// 为什么会多一个consul,参考官方文档services监听的返回值:https://www.consul.io/docs/dynamic-app-config/watches#services
if _, ok := w.watchers[i]; ok || i == "consul" {
continue
}
w.registerServiceWatcher(i, callback)
} }
cataLog := client.Catalog()
// 从总的services变化中找到不再监控的service并停止 if cataLog == nil {
w.RWMutex.RLock() mylogrus.MyLog.Errorf("No catalog.")
watches := w.watchers continue
w.RWMutex.RUnlock() }
services, _, err := cataLog.Service(serviceName, "", nil)
// remove unknown services from watchers if err != nil {
for i, svc := range watches { mylogrus.MyLog.Errorf("%v", err)
if _, ok := d[i]; !ok { continue
svc.Stop() }
delete(watches, i) var addr []string
} for _, v := range services {
addr = append(addr, fmt.Sprintf("%s:%d", v.ServiceAddress, v.ServicePort))
} }
default: mylogrus.MyLog.Infof("%s check addr%v", serviceName, addr)
mylogrus.MyLog.Errorf("不能判断监控的数据类型: %v", &d) cb(addr)
} }
} }
return w, nil
}
func RegisterWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) error {
return nil
w, err := newWatcher(watchType, opts, consulAddr, callback)
if err != nil {
mylogrus.MyLog.Error(err)
return err
}
defer w.Wp.Stop()
if err = w.Wp.Run(consulAddr); err != nil {
mylogrus.MyLog.Errorf("RegisterWatcher err: %v", err)
return err
}
return nil
} }
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/consul"
"git.hilo.cn/hilo-common/resource/mysql" "git.hilo.cn/hilo-common/resource/mysql"
"git.hilo.cn/hilo-common/utils" "git.hilo.cn/hilo-common/utils"
"github.com/hashicorp/consul/api"
"math/rand" "math/rand"
) )
...@@ -23,28 +22,11 @@ var activityServerHost = []string{defaultActivityServerAddr} ...@@ -23,28 +22,11 @@ var activityServerHost = []string{defaultActivityServerAddr}
func init() { func init() {
go func() { go func() {
address := api.DefaultConfig().Address // 用consul api的default config consul.RegisterWatcher(defaultActivityConsulName, func(addr []string) {
if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { if len(addr) > 0 {
if statusAddrs, ok := serviceStatus[defaultActivityConsulName]; ok { activityServerHost = addr
healthAddrs, _ := statusAddrs[api.HealthPassing]
l := len(healthAddrs)
if l > 0 {
mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultActivityConsulName, healthAddrs)
activityServerHost = healthAddrs
} else {
mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultActivityConsulName, defaultActivityServerAddr)
activityServerHost = []string{defaultActivityServerAddr} // 有其他问题都用默认的
}
for status := range statusAddrs {
if status == api.HealthPassing {
continue
}
mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultActivityConsulName, status, statusAddrs[status])
}
} }
}); err != nil { })
mylogrus.MyLog.Errorf("启动 consul 的watch监控失败")
}
}() }()
} }
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/mylogrus"
"git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/consul"
"git.hilo.cn/hilo-common/resource/mysql" "git.hilo.cn/hilo-common/resource/mysql"
"github.com/hashicorp/consul/api"
"math/rand" "math/rand"
) )
...@@ -21,28 +20,11 @@ var financeServerHost = []string{defaultFinanceServerAddr} ...@@ -21,28 +20,11 @@ var financeServerHost = []string{defaultFinanceServerAddr}
func init() { func init() {
go func() { go func() {
address := api.DefaultConfig().Address // 用consul api的default config consul.RegisterWatcher(defaultFinanceConsulName, func(addr []string) {
if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { if len(addr) > 0 {
if statusAddrs, ok := serviceStatus[defaultFinanceConsulName]; ok { financeServerHost = addr
healthAddrs, _ := statusAddrs[api.HealthPassing]
l := len(healthAddrs)
if l > 0 {
mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultFinanceConsulName, healthAddrs)
financeServerHost = healthAddrs
} else {
mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultFinanceConsulName, defaultFinanceServerAddr)
financeServerHost = []string{defaultFinanceServerAddr} // 有其他问题都用默认的
}
for status := range statusAddrs {
if status == api.HealthPassing {
continue
}
mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultFinanceConsulName, status, statusAddrs[status])
}
} }
}); err != nil { })
mylogrus.MyLog.Errorf("启动 consul 的watch监控失败")
}
}() }()
} }
......
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/mylogrus"
"git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/consul"
"git.hilo.cn/hilo-common/resource/mysql" "git.hilo.cn/hilo-common/resource/mysql"
"github.com/hashicorp/consul/api"
"math/rand" "math/rand"
) )
...@@ -22,28 +21,11 @@ var groupServerHost = []string{defaultGroupServerAddr} ...@@ -22,28 +21,11 @@ var groupServerHost = []string{defaultGroupServerAddr}
func init() { func init() {
go func() { go func() {
address := api.DefaultConfig().Address // 用consul api的default config consul.RegisterWatcher(defaultGroupConsulName, func(addr []string) {
if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { if len(addr) > 0 {
if statusAddrs, ok := serviceStatus[defaultGroupConsulName]; ok { groupServerHost = addr
healthAddrs, _ := statusAddrs[api.HealthPassing]
l := len(healthAddrs)
if l > 0 {
mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultGroupConsulName, healthAddrs)
groupServerHost = healthAddrs
} else {
mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultGroupConsulName, defaultGroupServerScheme)
groupServerHost = []string{defaultGroupServerAddr} // 有其他问题都用默认的
}
for status := range statusAddrs {
if status == api.HealthPassing {
continue
}
mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultGroupConsulName, status, statusAddrs[status])
}
} }
}); err != nil { })
mylogrus.MyLog.Errorf("启动 consul 的watch监控失败")
}
}() }()
} }
......
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"git.hilo.cn/hilo-common/mylogrus" "git.hilo.cn/hilo-common/mylogrus"
"git.hilo.cn/hilo-common/resource/consul" "git.hilo.cn/hilo-common/resource/consul"
"git.hilo.cn/hilo-common/resource/mysql" "git.hilo.cn/hilo-common/resource/mysql"
"github.com/hashicorp/consul/api"
"math/rand" "math/rand"
) )
...@@ -22,28 +21,11 @@ var UserServerHost = []string{defaultUserServerAddr} ...@@ -22,28 +21,11 @@ var UserServerHost = []string{defaultUserServerAddr}
func init() { func init() {
go func() { go func() {
address := api.DefaultConfig().Address // 用consul api的default config consul.RegisterWatcher(defaultUserConsulName, func(addr []string) {
if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { if len(addr) > 0 {
if statusAddrs, ok := serviceStatus[defaultUserConsulName]; ok { UserServerHost = addr
healthAddrs, _ := statusAddrs[api.HealthPassing]
l := len(healthAddrs)
if l > 0 {
mylogrus.MyLog.Infof("consul service update state:%v-%v", defaultUserConsulName, healthAddrs)
UserServerHost = healthAddrs
} else {
mylogrus.MyLog.Warnf("consul service update local state:%v-%v", defaultUserConsulName, defaultUserServerAddr)
UserServerHost = []string{defaultUserServerAddr} // 有其他问题都用默认的
}
for status := range statusAddrs {
if status == api.HealthPassing {
continue
}
mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", defaultUserConsulName, status, statusAddrs[status])
}
} }
}); err != nil { })
mylogrus.MyLog.Errorf("启动 consul 的watch监控失败")
}
}() }()
} }
......
...@@ -109,28 +109,11 @@ func init() { ...@@ -109,28 +109,11 @@ func init() {
mylogrus.MyLog.Infof("connect userCenterAddr:%v", userCenterAddr) mylogrus.MyLog.Infof("connect userCenterAddr:%v", userCenterAddr)
resolver.Register(bd) resolver.Register(bd)
go func() { go func() {
address := consulapi.DefaultConfig().Address // 用consul api的default config consul.RegisterWatcher(userCenterConsulName, func(addr []string) {
if err := consul.RegisterWatcher("services", nil, address, func(serviceStatus map[string]map[string][]string) { if len(addr) > 0 {
if statusAddrs, ok := serviceStatus[userCenterConsulName]; ok { bd.UpdateState(addr) // 更新新的注册名
healthAddrs, _ := statusAddrs[consulapi.HealthPassing]
l := len(healthAddrs)
if l > 0 {
mylogrus.MyLog.Infof("consul service update state:%v-%v", userCenterConsulName, healthAddrs)
bd.UpdateState(healthAddrs) // 更新新的注册名
} else {
mylogrus.MyLog.Warnf("consul service update local state:%v-%v", userCenterConsulName, defaultUserCenterAddr)
bd.UpdateState([]string{defaultUserCenterAddr}) // 都没有健康的,使用默认本地回环的
}
for status := range statusAddrs {
if status == consulapi.HealthPassing {
continue
}
mylogrus.MyLog.Warnf("consul service wrong state:%v-%v-%v", userCenterConsulName, status, statusAddrs[status])
}
} }
}); err != nil { })
mylogrus.MyLog.Errorf("启动 consul 的watch监控失败")
}
}() }()
//userCenterAddr := services[0].Address + ":" + strconv.Itoa(services[0].ServicePort) //userCenterAddr := services[0].Address + ":" + strconv.Itoa(services[0].ServicePort)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment