本文是个人在学习书籍《Go 语言高并发与微服务实战》第10章内容过程中,动手做的小实验,涉及 Hystrix 相关知识,仅供参考。
服务熔断机制
三种状态
┌─────────────────────────────────────────────────────────────┐
│ 熔断器状态机 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ 失败率超过阈值 ┌──────────┐ │
│ │ CLOSED │ ───────────────────────▶│ OPEN │ │
│ │ 关闭 │ │ 打开 │ │
│ └──────────┘ └────┬─────┘ │
│ ▲ │ │
│ │ │ 超时后 │
│ │ 检测成功 │ 半开 │
│ │ ▼ │
│ │ ┌──────────┐ │
│ └──────────────────────────────│HALF-OPEN │ │
│ 恢复关闭 │ 半开 │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
| 状态 | 说明 |
|---|
| Closed(关闭) | 正常状态,请求正常通过;失败达到阈值时切换到 Open |
| Open(打开) | 熔断状态,所有请求直接失败,快速返回 |
| Half-Open(半开) | 尝试恢复状态,放行部分请求;成功则关闭,失败则重新打开 |
Hystrix 配置参数
| 参数 | 默认值 | 说明 |
|---|
sleepWindow | 5000ms | 熔断打开后多久尝试半开 |
requestVolumeThreshold | 20 | 熔断器打开的最小请求数 |
errorPercentThreshold | 50% | 错误百分比阈值 |
maxConcurrentRequests | 10 | 最大并发请求数 |
负载均衡算法
常见算法
| 算法 | 说明 | 适用场景 |
|---|
| 随机 | 随机选择服务实例 | 请求较少、无状态服务 |
| 轮询(Round Robin) | 依次选择每个实例 | 请求均匀、实例性能相近 |
| 加权轮询 | 按权重比例分配请求 | 实例性能不均 |
| Hash | 根据请求特征 Hash | 会话保持、缓存友好 |
| 一致性 Hash | 环状 Hash 分布 | 分布式缓存 |
| 最小连接数 | 选择连接数最少的 | 长连接服务 |
一致性 Hash 原理
┌─────────────┐
│ 节点 A │◀── Hash(A)
┌───────────│ 192.168.1.1│
│ └─────────────┘
│ ▲
│ │ 顺时针找
Hash环 │ 最近的节点
│ │
▼ │
┌───────────────┐ │
│ 节点 C │───────────┘
│ 192.168.1.3 │◀── Hash(C)
└───────────────┘
▲
│
│
┌───────────────┐
│ 节点 B │
│ 192.168.1.2 │◀── Hash(B)
└───────────────┘
Hystrix Go 使用
安装
go get github.com/afex/hystrix-go/hystrix
基本用法
package main
import (
"fmt"
"time"
"github.com/afex/hystrix-go/hystrix"
)
func main() {
// 配置 Command
hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
Timeout: 1000, // 超时时间 ms
MaxConcurrentRequests: 10, // 最大并发数
SleepWindow: 5000, // 熔断后尝试恢复的时间
RequestVolumeThreshold: 20, // 熔断器打开的最小请求数
ErrorPercentThreshold: 50, // 错误百分比阈值
})
// 使用熔断器执行
err := hystrix.Do("my_command", func() error {
// 这里是实际要执行的业务逻辑
return callService()
}, func(err error) error {
// 这里是降级处理
fmt.Println("服务调用失败,执行降级逻辑")
return fallback()
})
if err != nil {
fmt.Println("请求失败:", err)
}
}
func callService() error {
// 模拟服务调用
time.Sleep(100 * time.Millisecond)
return nil
}
func fallback() error {
// 降级逻辑
fmt.Println("执行降级返回")
return nil
}
异步调用
// 返回 chan
resultChan := make(chan string, 1)
errorChan := hystrix.Go("my_command", func() error {
// 异步执行
go func() {
result, _ := callServiceAsync()
resultChan <- result
}()
return nil
}, nil)
// 等待结果
select {
case result := <-resultChan:
fmt.Println("结果:", result)
case <-time.After(time.Second * 5):
fmt.Println("超时")
}
监控
import "github.com/afex/hystrix-go/hystrix/metric_collector"
func init() {
// 开启监控
hystrix.RegisterReporter("console", &HystrixConsoleReporter{})
}
完整示例:微服务调用
服务端(string-service)
// service/main.go
package main
import (
"flag"
"fmt"
"log"
"net/http"
"time"
)
var port = flag.Int("port", 10086, "service port")
func main() {
flag.Parse()
http.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) {
msg := r.URL.Query().Get("msg")
fmt.Printf("收到请求: %s\n", msg)
time.Sleep(time.Millisecond * 100)
w.Write([]byte("响应: " + msg))
})
log.Printf("服务启动,端口: %d", *port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
}
客户端(带熔断)
// client/main.go
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
"github.com/afex/hystrix-go/hystrix"
)
var (
servicePort = flag.Int("port", 10086, "service port")
serviceHost = flag.String("host", "localhost", "service host")
)
func main() {
flag.Parse()
// 配置熔断器
hystrix.ConfigureCommand("string_service", hystrix.CommandConfig{
Timeout: 1000,
MaxConcurrentRequests: 10,
SleepWindow: 5000,
RequestVolumeThreshold: 5,
ErrorPercentThreshold: 40,
})
// 模拟多次请求
for i := 0; i < 20; i++ {
err := hystrix.Do("string_service", func() error {
return callService(fmt.Sprintf("请求 %d", i))
}, func(err error) error {
fmt.Printf("降级处理: %v\n", err)
return nil
})
if err != nil {
fmt.Printf("第 %d 次请求失败: %v\n", i, err)
}
time.Sleep(200 * time.Millisecond)
}
}
func callService(msg string) error {
url := fmt.Sprintf("http://%s:%d/echo?msg=%s",
*serviceHost, *servicePort, strings.TrimSpace(msg))
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
log.Printf("响应: %s", string(body))
return nil
}
运行示例
# 1. 启动 Consul(服务发现)
consul agent -dev
# 2. 启动多个服务实例
go run service/main.go -port 10086
go run service/main.go -port 10089
# 3. 启动客户端
go run client/main.go
# 4. 人为制造失败(例如停掉下游服务)观察熔断打开与降级日志,再恢复服务确认 Half-Open 恢复
熔断与负载均衡往往一起出现:负载均衡决定请求打到哪台实例,熔断在实例不健康时快速失败并触发降级,避免线程与连接池被拖垮。结合服务发现(Consul / K8s Endpoints)可在实例上下线时自动更新负载均衡目标。