本文是个人在学习书籍《Go 语言高并发与微服务实战》第6章内容过程中,动手做的小实验,涉及 Consul 和 go-kit 相关知识。
Consul 简介
┌─────────────────────────────────────────────────────────────┐
│ Consul 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Consul │ │ Consul │ │ Consul │ │
│ │ Server │ │ Server │ │ Server │ │
│ │ (Leader)│ │(Follow) │ │(Follow) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ └───────────┴───────────┘ │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ │ │
│ ┌────▼────┐ ┌────▼────┐ │
│ │ Consul │ │ Consul │ │
│ │ Agent │ │ Agent │ │
│ │ (Node1) │ │ (Node2) │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ┌────▼────┐ ┌────▼────┐ │
│ │ Service │ │ Service │ │
│ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
安装 Consul
下载安装
# macOS
brew install consul
# Linux
curl -fsSL https://apt.releases.hashicorp.com/gpg | sudo apt-key add -
sudo apt-add-repository "deb [arch=amd64] https://apt.releases.hashicorp.com $(lsb_release -cs) main"
sudo apt-get update && sudo apt-get install consul
# Docker
docker run -d --name=consul consul agent -dev -client=0.0.0.0
验证安装
consul --version
# Consul v1.10.0
Consul 基本操作
单机启动
# 开发模式启动
consul agent -dev
# 访问 UI
# http://127.0.0.1:8500/ui
集群启动
# 机器 1:主节点
consul agent -server \n -bootstrap-expect=1 \n -data-dir=/tmp/consul \n -node=consul-server-1 \n -bind=10.0.0.52 \n -ui
# 机器 2:加入集群
consul agent \n -data-dir=/tmp/consul \n -node=consul-server-2 \n -bind=10.0.0.53
# 在机器 2 执行加入
consul join 10.0.0.52
注册服务
# 注册服务(通过 HTTP API)
curl -X PUT -d '{
"ID": "user-service-1",
"Name": "user-service",
"Tags": ["v1", "go"],
"Address": "127.0.0.1",
"Port": 8080,
"Check": {
"HTTP": "http://127.0.0.1:8080/health",
"Interval": "10s",
"Timeout": "5s"
}
}' http://127.0.0.1:8500/v1/agent/service/register
# 注销服务
curl -X PUT http://127.0.0.1:8500/v1/agent/service/deregister/user-service-1
服务发现
# 查找服务
curl http://127.0.0.1:8500/v1/catalog/service/user-service
# DNS 查询
# user-service.service.consul
# 健康检查
curl http://127.0.0.1:8500/v1/health/service/user-service?passing=true
Go 微服务注册发现
项目结构
consul-demo/
├── main.go # 主程序
├── user/ # 用户服务
│ └── user.go
├── discovery/ # 服务发现封装
│ └── consul.go
└── go.mod
安装依赖
go get github.com/go-kit/kit/sd/consul
go get github.com/hashicorp/consul
Consul 客户端封装
// discovery/consul.go
package discovery
import (
"fmt"
"time"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/consul"
"github.com/go-kit/kit/sd/lazy"
"github.com/go-kit/kit/endpoint"
"github.com/hashicorp/consul/api"
)
type Client struct {
client consul.Client
instancer *consul.Instancer
}
func NewConsulClient(consulAddr string) (*Client, error) {
// 创建 Consul 客户端
config := api.DefaultConfig()
config.Address = consulAddr
consulClient, err := api.NewClient(config)
if err != nil {
return nil, err
}
// 创建服务注册实例化工具
client, err := consul.NewClient(consulClient)
if err != nil {
return nil, err
}
return &Client{client: client}, nil
}
// 注册服务
func (c *Client) Register(serviceName, serviceID string, addr string, port int) error {
// 健康检查
check := api.AgentServiceCheck{
HTTP: fmt.Sprintf("http://%s:%d/health", addr, port),
Interval: "10s",
Timeout: "5s",
Notes: "Health check for " + serviceName,
}
// 服务注册信息
registration := api.AgentServiceRegistration{
ID: serviceID,
Name: serviceName,
Tags: []string{"v1"},
Address: addr,
Port: port,
EnableTagOverride: false,
Check: &check,
}
return c.client.Register(®istration)
}
// 注销服务
func (c *Client) Deregister(serviceName, serviceID string) error {
return c.client.Deregister(&api.AgentServiceRegistration{
ID: serviceID,
Name: serviceName,
})
}
// 创建服务实例化工具
func (c *Client) GetInstancer(serviceName string) sd.Instancer {
return consul.NewInstancer(c.client, nil, serviceName, []string{}, true)
}
// 工厂函数创建 endpoint
func instanceFactory(serviceURL string) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
// 调用远程服务
return nil, nil
}
}
// 创建端点订阅器
func (c *Client) GetEndpointer(serviceName string) sd.Endpointer {
instancer := c.GetInstancer(serviceName)
return lazy.NewEndpointer(instancer, instanceFactory, nil)
}
服务端实现
// user/server.go(演示:注册到 Consul + 提供 /health)
package main
import (
"context"
"encoding/json"
"fmt"
stdlog "log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
kitlog "github.com/go-kit/kit/log"
"github.com/hashicorp/consul/api"
)
const (
ServiceName = "user-service"
ServicePort = 8080
)
type UserService struct{}
func (s *UserService) GetUser(ctx context.Context, id int64) (*User, error) {
return &User{
ID: id,
Name: fmt.Sprintf("User-%d", id),
Age: 20 + int(id%30),
}, nil
}
type User struct {
ID int64 `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
}
func pickAdvertiseIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "127.0.0.1"
}
for _, a := range addrs {
ipnet, ok := a.(*net.IPNet)
if !ok || ipnet.IP.IsLoopback() {
continue
}
if v4 := ipnet.IP.To4(); v4 != nil {
return v4.String()
}
}
return "127.0.0.1"
}
func main() {
logger := kitlog.NewLogfmtLogger(os.Stderr)
logger = kitlog.With(logger, "ts", kitlog.DefaultTimestampUTC)
cfg := api.DefaultConfig()
cfg.Address = "127.0.0.1:8500"
client, err := api.NewClient(cfg)
if err != nil {
stdlog.Fatalf("consul client: %v", err)
}
addr := pickAdvertiseIP()
serviceID := fmt.Sprintf("%s-%d", ServiceName, ServicePort)
reg := &api.AgentServiceRegistration{
ID: serviceID,
Name: ServiceName,
Address: addr,
Port: ServicePort,
Check: &api.AgentServiceCheck{
HTTP: fmt.Sprintf("http://%s:%d/health", addr, ServicePort),
Interval: "10s",
},
}
if err := client.Agent().ServiceRegister(reg); err != nil {
stdlog.Fatalf("register: %v", err)
}
svc := &UserService{}
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
mux.HandleFunc("/user/", func(w http.ResponseWriter, r *http.Request) {
u, err := svc.GetUser(r.Context(), 1)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(u)
})
srv := &http.Server{
Addr: fmt.Sprintf(":%d", ServicePort),
Handler: mux,
}
go func() {
_ = logger.Log("msg", "listening", "port", ServicePort)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
stdlog.Fatal(err)
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = srv.Shutdown(ctx)
_ = client.Agent().ServiceDeregister(serviceID)
}
进程退出前注销服务,可避免 Consul 里留下僵尸实例;生产环境还需配置正确的 健康检查 URL、ACL Token 与 多实例 ID。