package main import ( "context" "fmt" "github.com/asim/go-micro/plugins/client/grpc/v4" "github.com/asim/go-micro/plugins/registry/consul/v4" "github.com/gin-gonic/gin" "github.com/panjf2000/ants/v2" "github.com/spf13/cobra" "go-micro.dev/v4" "go-micro.dev/v4/client" "go-micro.dev/v4/registry" "go-micro.dev/v4/selector" "net/http" "os" "sync" "time" ) var ( port string // http监听端口 consulAddr []string // consul注册中心地址 microRegistry registry.Registry // 注册中心实例 microClient client.Client // 客户端实例 pool *ants.Pool // 协程池, 并发查询GetService用 ) // http响应对象 type response struct { Message string `json:"message"` Result interface{} `json:"result"` } // 超时控制 type timeoutWrapper struct { client.Client } func NewTimeoutWrapper(c client.Client) client.Client { return &timeoutWrapper{c} } func (t *timeoutWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { // 2秒超时 timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2) defer cancel() return t.Client.Call(timeoutCtx, req, rsp, opts...) } func main() { pool, _ = ants.NewPool(16) // 协程池, 并发查询GetService用 defer pool.Release() cmd := &cobra.Command{ Use: "go run gateway.go或go build -o .", Short: "go-micro开发调试网关 转发http->grpc, 例如: post请求 /service.demo/Demo.GetDemo", Run: func(cmd *cobra.Command, args []string) { start() }, } // 解析命令行参数 cmd.Flags().StringVarP(&port, "port", "p", "8100", "监听端口") cmd.Flags().StringSliceVarP(&consulAddr, "consul", "c", []string{}, "consul注册中心地址,多个则逗号隔开") cmd.MarkFlagRequired("consul") err := cmd.Execute() if err != nil { os.Exit(0) } } func start() { microRegistry = consul.NewRegistry(registry.Addrs(consulAddr...)) // 轮询访问 newSelector := selector.NewSelector( selector.Registry(microRegistry), selector.SetStrategy(selector.RoundRobin), ) service := micro.NewService( micro.Client(grpc.NewClient()), micro.Selector(newSelector), micro.WrapClient(NewTimeoutWrapper), ) microClient = service.Client() // gin http服务 r := gin.Default() r.GET("/list", listService) r.POST("/:service/:endpoint", callService) r.Run(fmt.Sprintf(":%s", port)) } // 列出注册的服务 func listService(ctx *gin.Context) { srvs, err := microRegistry.ListServices() if err != nil { ctx.JSON(http.StatusOK, &response{Message: err.Error()}) return } // 组装响应信息 type serviceInfo struct { Name string `json:"name"` Endpoints []string `json:"endpoints"` Nodes []*registry.Node `json:"nodes"` } resultChan := make(chan *serviceInfo, 100) wg := &sync.WaitGroup{} var services = make(map[string][]*serviceInfo) for _, srv := range srvs { wg.Add(1) tempName := srv.Name pool.Submit(func() { defer wg.Done() srvDetail, _ := microRegistry.GetService(tempName) for _, v := range srvDetail { s := &serviceInfo{ Name: tempName, Nodes: v.Nodes, } e := make([]string, 0) for _, ee := range v.Endpoints { e = append(e, ee.Name) } s.Endpoints = e resultChan <- s } }) } go func() { wg.Wait() close(resultChan) }() for v := range resultChan { services[v.Name] = append(services[v.Name], v) } ctx.JSON(http.StatusOK, &response{Message: "ok", Result: services}) } // 调用服务 func callService(ctx *gin.Context) { var requestData map[string]interface{} err := ctx.ShouldBindJSON(&requestData) if err != nil { ctx.JSON(http.StatusOK, &response{Message: fmt.Sprintf("获取body参数失败: %s", err)}) return } //req := cli.NewRequest("service.demo", "Demo.GetDemo", requestData, mClient.WithContentType("application/json")) req := microClient.NewRequest(ctx.Param("service"), ctx.Param("endpoint"), requestData, client.WithContentType("application/json")) var res map[string]interface{} err = microClient.Call(context.Background(), req, &res) if err != nil { ctx.JSON(http.StatusOK, &response{ Message: "grpc响应异常", Result: err, }) return } ctx.JSON(http.StatusOK, &response{ Message: "ok", Result: res, }) }