watch.go 3.9 KB
Newer Older
hujiebin's avatar
hujiebin committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
package consul

import (
	"fmt"
	"git.hilo.cn/hilo-common/mylogrus"
	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/api/watch"
	"sync"
)

type ServiceCallback func(serviceStatus map[string]map[string][]string) // service->status->addrs

// 定义watcher
type Watcher struct {
	Address  string                 // consul agent 的地址:"127.0.0.1:8500"
	Wp       *watch.Plan            // 总的Services变化对应的Plan
	watchers map[string]*watch.Plan // 对已经进行监控的service作个记录
	RWMutex  *sync.RWMutex
}

// 将consul新增的service加入,并监控
func (w *Watcher) registerServiceWatcher(serviceName string, callback ServiceCallback) error {
	// watch endpoint 的请求参数,具体见官方文档:https://www.consul.io/docs/dynamic-app-config/watches#service
	wp, err := watch.Parse(map[string]interface{}{
		"type":    "service",
		"service": serviceName,
	})
	if err != nil {
		return err
	}

	// 定义service变化后所执行的程序(函数)handler
	wp.Handler = func(idx uint64, data interface{}) {
		switch d := data.(type) {
		case []*consulapi.ServiceEntry:
			var serviceStatus = make(map[string]map[string][]string)
			for _, i := range d {
				if data, ok := serviceStatus[i.Service.Service]; ok {
					data[i.Checks.AggregatedStatus()] = append(data[i.Checks.AggregatedStatus()], fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
				} else {
					serviceStatus[i.Service.Service] = make(map[string][]string)
					serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()] = append(serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()],
						fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
				}
			}
			// 回传到外面
			callback(serviceStatus)
			mylogrus.MyLog.Infof("consul service status: %s", serviceStatus)
		}
	}
	// 启动监控
	go wp.Run(w.Address)
	// 对已启动监控的service作一个记录
	w.RWMutex.Lock()
	w.watchers[serviceName] = wp
	w.RWMutex.Unlock()

	return nil
}

func newWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) (*Watcher, error) {
	var options = map[string]interface{}{
		"type": watchType,
	}
	// 组装请求参数。(监控类型不同,其请求参数不同)
	for k, v := range opts {
		options[k] = v
	}

	wp, err := watch.Parse(options)
	if err != nil {
		return nil, err
	}

	w := &Watcher{
		Address:  consulAddr,
		Wp:       wp,
		watchers: make(map[string]*watch.Plan),
		RWMutex:  new(sync.RWMutex),
	}

	wp.Handler = func(idx uint64, data interface{}) {
		switch d := data.(type) {
		// 这里只实现了对services的监控,其他监控的data类型判断参考:https://github.com/dmcsorley/avast/blob/master/consul.go
		// services
		case map[string][]string:
			for i := range d {
				// 如果该service已经加入到ConsulRegistry的services里监控了,就不再加入 或者i 为 "consul"的字符串
				// 为什么会多一个consul,参考官方文档services监听的返回值:https://www.consul.io/docs/dynamic-app-config/watches#services
				if _, ok := w.watchers[i]; ok || i == "consul" {
					continue
				}
				w.registerServiceWatcher(i, callback)
			}

			// 从总的services变化中找到不再监控的service并停止
			w.RWMutex.RLock()
			watches := w.watchers
			w.RWMutex.RUnlock()

			// remove unknown services from watchers
			for i, svc := range watches {
				if _, ok := d[i]; !ok {
					svc.Stop()
					delete(watches, i)
				}
			}
		default:
			mylogrus.MyLog.Errorf("不能判断监控的数据类型: %v", &d)
		}
	}

	return w, nil
}

func RegisterWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) error {
hujiebin's avatar
hujiebin committed
117
	return nil
hujiebin's avatar
hujiebin committed
118 119 120 121 122 123 124 125 126 127 128 129 130
	w, err := newWatcher(watchType, opts, consulAddr, callback)
	if err != nil {
		mylogrus.MyLog.Error(err)
		return err
	}
	defer w.Wp.Stop()
	if err = w.Wp.Run(consulAddr); err != nil {
		mylogrus.MyLog.Errorf("RegisterWatcher err: %v", err)
		return err
	}

	return nil
}