diff --git a/resource/consul/consul.go b/resource/consul/consul.go index a2a720c5a31aef1dbbc634815e57fb3120f0ac83..29409b1d5b65577877bfc313b082d3a30a9add9c 100644 --- a/resource/consul/consul.go +++ b/resource/consul/consul.go @@ -10,13 +10,14 @@ import ( "time" ) -const ( - RegisterName = "hiloFinance" - RegisterTag = "金融中心" +var ( + RegisterName = "" + RegisterTag = "" ) // 异步注册到consul -func RegisterToConsul(port int) { +func RegisterToConsul(port int, registerName, registerTag string) { + RegisterName, RegisterTag = registerName, registerTag go register(port, false) go selfCheck(port) } diff --git a/resource/consul/watch.go b/resource/consul/watch.go new file mode 100644 index 0000000000000000000000000000000000000000..070b092cce9c57567102aebaa10f1b6d12be3a44 --- /dev/null +++ b/resource/consul/watch.go @@ -0,0 +1,129 @@ +package consul + +import ( + "fmt" + "git.hilo.cn/hilo-common/mylogrus" + consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + "sync" +) + +type ServiceCallback func(serviceStatus map[string]map[string][]string) // service->status->addrs + +// 定义watcher +type Watcher struct { + Address string // consul agent 的地址:"127.0.0.1:8500" + Wp *watch.Plan // 总的Services变化对应的Plan + watchers map[string]*watch.Plan // 对已经进行监控的service作个记录 + RWMutex *sync.RWMutex +} + +// 将consul新增的service加入,并监控 +func (w *Watcher) registerServiceWatcher(serviceName string, callback ServiceCallback) error { + // watch endpoint 的请求参数,具体见官方文档:https://www.consul.io/docs/dynamic-app-config/watches#service + wp, err := watch.Parse(map[string]interface{}{ + "type": "service", + "service": serviceName, + }) + if err != nil { + return err + } + + // 定义service变化后所执行的程序(函数)handler + wp.Handler = func(idx uint64, data interface{}) { + switch d := data.(type) { + case []*consulapi.ServiceEntry: + var serviceStatus = make(map[string]map[string][]string) + for _, i := range d { + if data, ok := serviceStatus[i.Service.Service]; ok { + data[i.Checks.AggregatedStatus()] = append(data[i.Checks.AggregatedStatus()], fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port)) + } else { + serviceStatus[i.Service.Service] = make(map[string][]string) + serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()] = append(serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()], + fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port)) + } + } + // 回传到外面 + callback(serviceStatus) + mylogrus.MyLog.Infof("consul service status: %s", serviceStatus) + } + } + // 启动监控 + go wp.Run(w.Address) + // 对已启动监控的service作一个记录 + w.RWMutex.Lock() + w.watchers[serviceName] = wp + w.RWMutex.Unlock() + + return nil +} + +func newWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) (*Watcher, error) { + var options = map[string]interface{}{ + "type": watchType, + } + // 组装请求参数。(监控类型不同,其请求参数不同) + for k, v := range opts { + options[k] = v + } + + wp, err := watch.Parse(options) + if err != nil { + return nil, err + } + + w := &Watcher{ + Address: consulAddr, + Wp: wp, + watchers: make(map[string]*watch.Plan), + RWMutex: new(sync.RWMutex), + } + + wp.Handler = func(idx uint64, data interface{}) { + switch d := data.(type) { + // 这里只实现了对services的监控,其他监控的data类型判断参考:https://github.com/dmcsorley/avast/blob/master/consul.go + // services + case map[string][]string: + for i := range d { + // 如果该service已经加入到ConsulRegistry的services里监控了,就不再加入 或者i 为 "consul"的字符串 + // 为什么会多一个consul,参考官方文档services监听的返回值:https://www.consul.io/docs/dynamic-app-config/watches#services + if _, ok := w.watchers[i]; ok || i == "consul" { + continue + } + w.registerServiceWatcher(i, callback) + } + + // 从总的services变化中找到不再监控的service并停止 + w.RWMutex.RLock() + watches := w.watchers + w.RWMutex.RUnlock() + + // remove unknown services from watchers + for i, svc := range watches { + if _, ok := d[i]; !ok { + svc.Stop() + delete(watches, i) + } + } + default: + mylogrus.MyLog.Errorf("不能判断监控的数据类型: %v", &d) + } + } + + return w, nil +} + +func RegisterWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) error { + w, err := newWatcher(watchType, opts, consulAddr, callback) + if err != nil { + mylogrus.MyLog.Error(err) + return err + } + defer w.Wp.Stop() + if err = w.Wp.Run(consulAddr); err != nil { + mylogrus.MyLog.Errorf("RegisterWatcher err: %v", err) + return err + } + + return nil +} diff --git a/utils/panic.go b/utils/panic.go new file mode 100644 index 0000000000000000000000000000000000000000..25dc82ad56003a0652e000869b7ca3dd08d399bf --- /dev/null +++ b/utils/panic.go @@ -0,0 +1,13 @@ +package utils + +import ( + "git.hilo.cn/hilo-common/mylogrus" + "runtime/debug" +) + +func CheckGoPanic() { + if r := recover(); r != nil { + //打印错误堆栈信息 + mylogrus.MyLog.Errorf("ACTION PANIC: %v, stack: %v", r, string(debug.Stack())) + } +} diff --git a/utils/url.go b/utils/url.go new file mode 100644 index 0000000000000000000000000000000000000000..2f14d60abca5365f0a68ed2d0400b315e2336264 --- /dev/null +++ b/utils/url.go @@ -0,0 +1,21 @@ +package utils + +import ( + "git.hilo.cn/hilo-common/resource/config" + "strings" +) + +const DefaultAvatarMan = "hilo/manager/ea48b62d54a24a709de3c38702c89995.png" + +// 补全url,区分处理oss和aws两种情况 +func MakeFullUrl(url string) string { + if strings.HasPrefix(url, config.GetConfigOss().OSS_CDN) || strings.HasPrefix(url, config.GetConfigAws().AWS_CDN) { + return url + } else if strings.HasPrefix(url, "nextvideo/") { + return config.GetConfigOss().OSS_CDN + url + } else if strings.HasPrefix(url, config.GetConfigAws().AWS_DIR) { + return config.GetConfigAws().AWS_CDN + url + } else { + return url + } +}