watch.go 3.89 KB
Newer Older
hujiebin's avatar
hujiebin committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
package consul

import (
	"fmt"
	"git.hilo.cn/hilo-common/mylogrus"
	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/api/watch"
	"sync"
)

type ServiceCallback func(serviceStatus map[string]map[string][]string) // service->status->addrs

// 定义watcher
type Watcher struct {
	Address  string                 // consul agent 的地址:"127.0.0.1:8500"
	Wp       *watch.Plan            // 总的Services变化对应的Plan
	watchers map[string]*watch.Plan // 对已经进行监控的service作个记录
	RWMutex  *sync.RWMutex
}

// 将consul新增的service加入,并监控
func (w *Watcher) registerServiceWatcher(serviceName string, callback ServiceCallback) error {
	// watch endpoint 的请求参数,具体见官方文档:https://www.consul.io/docs/dynamic-app-config/watches#service
	wp, err := watch.Parse(map[string]interface{}{
		"type":    "service",
		"service": serviceName,
	})
	if err != nil {
		return err
	}

	// 定义service变化后所执行的程序(函数)handler
	wp.Handler = func(idx uint64, data interface{}) {
		switch d := data.(type) {
		case []*consulapi.ServiceEntry:
			var serviceStatus = make(map[string]map[string][]string)
			for _, i := range d {
				if data, ok := serviceStatus[i.Service.Service]; ok {
					data[i.Checks.AggregatedStatus()] = append(data[i.Checks.AggregatedStatus()], fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
				} else {
					serviceStatus[i.Service.Service] = make(map[string][]string)
					serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()] = append(serviceStatus[i.Service.Service][i.Checks.AggregatedStatus()],
						fmt.Sprintf("%s:%d", i.Service.Address, i.Service.Port))
				}
			}
			// 回传到外面
			callback(serviceStatus)
			mylogrus.MyLog.Infof("consul service status: %s", serviceStatus)
		}
	}
	// 启动监控
	go wp.Run(w.Address)
	// 对已启动监控的service作一个记录
	w.RWMutex.Lock()
	w.watchers[serviceName] = wp
	w.RWMutex.Unlock()

	return nil
}

func newWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) (*Watcher, error) {
	var options = map[string]interface{}{
		"type": watchType,
	}
	// 组装请求参数。(监控类型不同,其请求参数不同)
	for k, v := range opts {
		options[k] = v
	}

	wp, err := watch.Parse(options)
	if err != nil {
		return nil, err
	}

	w := &Watcher{
		Address:  consulAddr,
		Wp:       wp,
		watchers: make(map[string]*watch.Plan),
		RWMutex:  new(sync.RWMutex),
	}

	wp.Handler = func(idx uint64, data interface{}) {
		switch d := data.(type) {
		// 这里只实现了对services的监控,其他监控的data类型判断参考:https://github.com/dmcsorley/avast/blob/master/consul.go
		// services
		case map[string][]string:
			for i := range d {
				// 如果该service已经加入到ConsulRegistry的services里监控了,就不再加入 或者i 为 "consul"的字符串
				// 为什么会多一个consul,参考官方文档services监听的返回值:https://www.consul.io/docs/dynamic-app-config/watches#services
				if _, ok := w.watchers[i]; ok || i == "consul" {
					continue
				}
				w.registerServiceWatcher(i, callback)
			}

			// 从总的services变化中找到不再监控的service并停止
			w.RWMutex.RLock()
			watches := w.watchers
			w.RWMutex.RUnlock()

			// remove unknown services from watchers
			for i, svc := range watches {
				if _, ok := d[i]; !ok {
					svc.Stop()
					delete(watches, i)
				}
			}
		default:
			mylogrus.MyLog.Errorf("不能判断监控的数据类型: %v", &d)
		}
	}

	return w, nil
}

func RegisterWatcher(watchType string, opts map[string]string, consulAddr string, callback ServiceCallback) error {
	w, err := newWatcher(watchType, opts, consulAddr, callback)
	if err != nil {
		mylogrus.MyLog.Error(err)
		return err
	}
	defer w.Wp.Stop()
	if err = w.Wp.Run(consulAddr); err != nil {
		mylogrus.MyLog.Errorf("RegisterWatcher err: %v", err)
		return err
	}

	return nil
}