Commit 89c2baf1 authored by hujiebin's avatar hujiebin

feat:调整consul入参

parent cc7dca1e
......@@ -10,13 +10,14 @@ import (
"time"
)
const (
RegisterName = "hiloFinance"
RegisterTag = "金融中心"
var (
RegisterName = ""
RegisterTag = ""
)
// 异步注册到consul
func RegisterToConsul(port int) {
func RegisterToConsul(port int, registerName, registerTag string) {
RegisterName, RegisterTag = registerName, registerTag
go register(port, false)
go selfCheck(port)
}
......
package consul
import (
"fmt"
"git.hilo.cn/hilo-common/mylogrus"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"sync"
)
type ServiceCallback func(serviceStatus map[string]map[string][]string) // service->status->addrs
// 定义watcher
type Watcher struct {
Address string // consul agent 的地址:"127.0.0.1:8500"
Wp *watch.Plan // 总的Services变化对应的Plan
watchers map[string]*watch.Plan // 对已经进行监控的service作个记录
RWMutex *sync.RWMutex
}
// 将consul新增的service加入,并监控
func (w *Watcher) registerServiceWatcher(serviceName string, callback ServiceCallback) error {
// watch endpoint 的请求参数,具体见官方文档:https://www.consul.io/docs/dynamic-app-config/watches#service
wp, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": serviceName,
})
if err != nil {
return err
}
// 定义service变化后所执行的程序(函数)handler
wp.Handler = func(idx uint64, data interface{}) {
switch d := data.(type) {
case []*consulapi.ServiceEntry:
var serviceStatus = make(map[string]map[string][]string)
for _, i := range d {
if data, ok := serviceStatus[i.Service.Service]; ok {
data[i.Checks.AggregatedStatus()] = append(data[i.Checks.AggregatedStatus()], fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
} else {
serviceStatus[i.Service.Service] = make(map[string][]string)
serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()] = append(serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()],
fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
}
}
// 回传到外面
callback(serviceStatus)
mylogrus.MyLog.Infof("consul service status: %s", serviceStatus)
}
}
// 启动监控
go wp.Run(w.Address)
// 对已启动监控的service作一个记录
w.RWMutex.Lock()
w.watchers[serviceName] = wp
w.RWMutex.Unlock()
return nil
}
func newWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) (*Watcher, error) {
var options = map[string]interface{}{
"type": watchType,
}
// 组装请求参数。(监控类型不同,其请求参数不同)
for k, v := range opts {
options[k] = v
}
wp, err := watch.Parse(options)
if err != nil {
return nil, err
}
w := &Watcher{
Address: consulAddr,
Wp: wp,
watchers: make(map[string]*watch.Plan),
RWMutex: new(sync.RWMutex),
}
wp.Handler = func(idx uint64, data interface{}) {
switch d := data.(type) {
// 这里只实现了对services的监控,其他监控的data类型判断参考:https://github.com/dmcsorley/avast/blob/master/consul.go
// services
case map[string][]string:
for i := range d {
// 如果该service已经加入到ConsulRegistry的services里监控了,就不再加入 或者i 为 "consul"的字符串
// 为什么会多一个consul,参考官方文档services监听的返回值:https://www.consul.io/docs/dynamic-app-config/watches#services
if _, ok := w.watchers[i]; ok || i == "consul" {
continue
}
w.registerServiceWatcher(i, callback)
}
// 从总的services变化中找到不再监控的service并停止
w.RWMutex.RLock()
watches := w.watchers
w.RWMutex.RUnlock()
// remove unknown services from watchers
for i, svc := range watches {
if _, ok := d[i]; !ok {
svc.Stop()
delete(watches, i)
}
}
default:
mylogrus.MyLog.Errorf("不能判断监控的数据类型: %v", &d)
}
}
return w, nil
}
func RegisterWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) error {
w, err := newWatcher(watchType, opts, consulAddr, callback)
if err != nil {
mylogrus.MyLog.Error(err)
return err
}
defer w.Wp.Stop()
if err = w.Wp.Run(consulAddr); err != nil {
mylogrus.MyLog.Errorf("RegisterWatcher err: %v", err)
return err
}
return nil
}
package utils
import (
"git.hilo.cn/hilo-common/mylogrus"
"runtime/debug"
)
func CheckGoPanic() {
if r := recover(); r != nil {
//打印错误堆栈信息
mylogrus.MyLog.Errorf("ACTION PANIC: %v, stack: %v", r, string(debug.Stack()))
}
}
package utils
import (
"git.hilo.cn/hilo-common/resource/config"
"strings"
)
const DefaultAvatarMan = "hilo/manager/ea48b62d54a24a709de3c38702c89995.png"
// 补全url,区分处理oss和aws两种情况
func MakeFullUrl(url string) string {
if strings.HasPrefix(url, config.GetConfigOss().OSS_CDN) || strings.HasPrefix(url, config.GetConfigAws().AWS_CDN) {
return url
} else if strings.HasPrefix(url, "nextvideo/") {
return config.GetConfigOss().OSS_CDN + url
} else if strings.HasPrefix(url, config.GetConfigAws().AWS_DIR) {
return config.GetConfigAws().AWS_CDN + url
} else {
return url
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment