consul.go 2.96 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
package consul

import (
	"fmt"
	"git.hilo.cn/hilo-common/mylogrus"
	"git.hilo.cn/hilo-common/utils"
	"github.com/hashicorp/consul/api"
	"net/http"
	"os"
	"time"
)

hujiebin's avatar
hujiebin committed
13 14 15
// Identity under which this process registers with Consul. Both values start
// empty and are assigned by RegisterToConsul before any registration runs.
var (
	RegisterName = "" // service name; also the prefix of the service ID
	RegisterTag  = "" // the single tag attached to the registration
)

// 异步注册到consul
hujiebin's avatar
hujiebin committed
19 20
func RegisterToConsul(port int, registerName, registerTag string) {
	RegisterName, RegisterTag = registerName, registerTag
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
	go register(port, false)
	go selfCheck(port)
}

// consulCheck is the HTTP handler behind the health-check endpoint. It writes
// a fixed line of text to the response and never inspects the request; a
// failed write is deliberately ignored because there is nothing to recover.
func consulCheck(w http.ResponseWriter, _ *http.Request) {
	const reply = "consulCheck"
	_, _ = fmt.Fprintln(w, reply)
}

// register registers this service instance with Consul and, on the initial
// call, serves the HTTP health-check endpoint that the registration points at.
//
// port is the service port advertised to Consul; the health check listens on
// port+1000. retry is false for the initial registration and true for
// re-registrations triggered by selfCheck, which must not start a second
// health-check server.
//
// On the initial call the function blocks inside the health-check server, so
// it has to run in its own goroutine. The server is started even when the
// registration attempt itself failed: later re-registrations (retry=true)
// never start it, so without it their health check could never pass.
func register(port int, retry bool) {
	checkPort := port + 1000
	registerService(port, checkPort)
	if retry {
		return
	}
	serveConsulCheck(checkPort)
}

// registerService performs a single registration attempt against Consul.
// Failures are logged rather than returned.
func registerService(port, checkPort int) {
	// Uses the default client config; non-default setups need real parameters.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		mylogrus.MyLog.Errorf("RegisterToConsul Fail:%v", err)
		return
	}
	if client == nil {
		mylogrus.MyLog.Errorf("Fail to get consul client.")
		return
	}
	mylogrus.MyLog.Infof("RegisterToConsul:%v-%v", client, err)

	// The hostname makes the service ID unique per host. On failure the ID
	// degrades to "<name>-", so at least leave a trace in the log.
	hostName, err := os.Hostname()
	if err != nil {
		mylogrus.MyLog.Warnf("hostname not found, service id will miss the host part :%v ", err)
	}

	// NOTE(review): Fatalln presumably terminates the process (logrus
	// semantics) — confirm this is intended for self-heal retries as well.
	myIp, err := utils.GetClientIp()
	if err != nil {
		mylogrus.MyLog.Fatalln("local ip not found", err)
	}
	// The node name is never populated here; it is logged empty to keep the
	// existing log line format.
	myNodeName := ""
	mylogrus.MyLog.Infof("My ip is %s, nodeName: %s\n", myIp, myNodeName)

	registration := &api.AgentServiceRegistration{
		ID:      fmt.Sprintf("%s-%s", RegisterName, hostName),
		Name:    RegisterName,
		Port:    port,
		Tags:    []string{RegisterTag},
		Address: myIp,
		// NOTE(review): the check URL targets localhost, so presumably the
		// Consul agent runs on the same host as this process — confirm.
		Check: &api.AgentServiceCheck{
			HTTP:                           fmt.Sprintf("http://localhost:%d%s", checkPort, "/check"),
			Timeout:                        "3s",
			Interval:                       "5s",
			DeregisterCriticalServiceAfter: "30s", // drop the service 30s after the check starts failing
		},
	}
	if err = client.Agent().ServiceRegister(registration); err != nil {
		mylogrus.MyLog.Errorf("register server error :%v ", err)
	}
}

// serveConsulCheck installs the "/check" handler on the default mux and serves
// it on checkPort, blocking until the server stops.
func serveConsulCheck(checkPort int) {
	http.HandleFunc("/check", consulCheck)
	if err := http.ListenAndServe(fmt.Sprintf(":%d", checkPort), nil); err != nil {
		mylogrus.MyLog.Warnf("check server error :%v ", err)
	}
}

// selfCheck is the self-healing loop. Once a minute it verifies that this
// instance is still present in the Consul catalog and re-registers it when it
// is not. The first check happens one minute after start, not immediately.
//
// port is the service port passed on to register when re-registration is
// needed. The function never returns; run it in its own goroutine.
func selfCheck(port int) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for range ticker.C {
		selfCheckOnce(port)
	}
}

// selfCheckOnce performs one self-healing pass: it looks this instance up in
// the catalog and starts a re-registration when it is missing. Lookup failures
// are logged and the pass is skipped until the next tick.
func selfCheckOnce(port int) {
	// Uses the default client config; non-default setups need real parameters.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		mylogrus.MyLog.Errorf("RegisterToConsul Fail:%v", err)
		return
	}
	if client == nil {
		mylogrus.MyLog.Errorf("Fail to get consul client.")
		return
	}
	cataLog := client.Catalog()
	if cataLog == nil {
		mylogrus.MyLog.Errorf("No catalog.")
		return
	}
	services, _, err := cataLog.Service(RegisterName, "", nil)
	if err != nil {
		mylogrus.MyLog.Errorf("%v", err)
		return
	}

	// Rebuild the service ID exactly as register does, so that other live
	// instances of the same service name cannot mask this instance's absence.
	// A hostname error is ignored on purpose: register then uses an empty
	// host part as well, and the two IDs still agree.
	hostName, _ := os.Hostname()
	selfID := fmt.Sprintf("%s-%s", RegisterName, hostName)
	for _, svc := range services {
		if svc != nil && svc.ServiceID == selfID {
			mylogrus.MyLog.Infof("%s check success %v", RegisterName, svc)
			return
		}
	}

	mylogrus.MyLog.Errorf("%s not found.", RegisterName)
	go register(port, true) // re-register
}