第10章 微服务的容错处理与负载均衡

思考并回答以下问题:

随着微服务的规模逐渐增长,各个微服务之间可能会存在错综复杂的调用关系;除此之外,整个微服务系统也可能对外部系统发起远程调用。鉴于网络的不可靠性和系统的运行意外,在微服务中发起的远程调用可能会得到失败的结果,比如网络连接缓慢或超时、提供服务方逻辑错误或者已经过载不可用等;同时为了应对更大的请求压力,各个微服务一般是多实例部署,如果各实例之间的负载不合理,就无法发挥服务器横向扩展的优势,提高系统的吞吐量。

在请求失败后,健壮性服务一般会采用重试机制来发起请求,对于大多数的暂时性故障,比如网络短时停顿等,重试机制都能够得到良好的调用结果。但是对于一些长时间不可用的微服务持续性远程调用,重试机制可能会使得情况变得更加不堪,对此我们就需要服务熔断的机制,为微服务之间的调用提供强大的容错能力,保护服务调用方的服务稳定性。而对于多实例部署的微服务体系,我们需要使用合理的负载均衡策略,将请求合理地分配到各个服务实例中,保证集群中大多数服务器的负载保持在高效稳定的状态,提高系统的处理能力。

在本章中,我们首先介绍服务熔断和负载均衡的相关原理,然后实现自定义的负载均衡器和借助Hystrix实现服务熔断器,并将它们应用到use-string-service服务对string-service服务的远程调用中,最后对Hystrix进行详解和实践在服务网关中添加服务熔断与负载均衡功能。

服务熔断

熔断的概念最先来自于电路工程中,在我们的家庭电路中,在电表和电路的火线上会接有一根保险丝为电路安全护航。保险丝一般由熔点较低的金属制成,当电路上的电流过大时,它就会因为过热被熔断,从而达到保护电表和电路的作用。

在微服务架构中,服务之间的调用一般分为服务调用方和服务提供方。当下游服务因为过载或者故障不能用时,我们需要及时在上游的服务调用方暂时“熔断”调用方和提供方之间的调用链,避免服务雪崩现象的出现,从而保证服务调用方与系统整体的稳定性和可用性。

分布式系统中的服务雪崩

在分布式系统中,由于业务上的划分,一次完整的请求可能需要不同服务协作完成,在微服务架构中就是多个服务实例协作完成。请求会在这些服务实例中传递,服务之间的调用会产生新的请求,它们共同组成一条服务调用链,关系如下(图10-1)时序图所示。

图10-1 微服务调用链

客户端发起了一次请求Request1,网关在接受到请求后将它转发给service-A,由于这次请求涉及到了service-B中的数据,所以service-A又向service-B发起了一次请求Request1-1来获取对应的数据,在处理结束后将结果返回给网关,由网关将结果返回给客户端。上图10-1中Request1和Request1-1共同组成了这次调用的调用链。

服务雪崩是指当调用链的某个环节(特别是服务提供方的服务)不可用时,导致了上游环节不可用,并最终将这种影响像雪崩一样扩大到整个系统中,导致了整个系统不可用的情况。

服务雪崩的发生流程如下图10-2所示。

图10-2 服务雪崩的发生流程

服务雪崩一般有3个阶段:

(1)第一阶段是服务提供者不能用。

在初始阶段,一切运行良好,网关、service-A和service-B响应着客户端的各种请求。在某一个时间节点,服务提供者service-B由于网络故障或者请求过载而不可用,无法及时响应各类请求。

(2)第二阶段是服务调用者不可用。

在服务提供者不可用之后,客户端可能会因为错误提示或者长时间的阻塞而不断发送相同的请求到网关中,网关再次将请求转发给service-A进行处理,service-A根据业务流程也会向service-B发起数据请求;同时上一阶段中service-A对service-B超时或者失败的请求可能会因为service-A中重试机制再次请求service-B。这些请求都无法从service-B中获取到有效的返回,最坏的结果是都被阻塞,无法及时响应。service-A也会因为发起了过多对service-B的请求而产生的等待线程耗尽了线程池中的资源,无法及时响应其他请求,导致了自身的不可用。

(3)最后阶段是整个系统的不可用。

service-A中等待请求同样阻塞了转发请求的网关。网关也因为大量等待请求将会产生大量的阻塞线程,使得网关没有足够的资源处理其他请求,导致了整个系统无法对外提供服务。

服务熔断保障系统可用性

为了避免服务雪崩现象的出现,我们需要及时“壮士断腕”,在必要的时候暂时切断对异常服务提供者的调用,保证部分服务的可用以及整体系统的稳定性。服务熔断机制如图10-3所示。

图10-3 服务熔断机制

如图10-3,我们在service-A向service-B的请求中增加了一根“保险丝”,即断路器。它会统计一段时间内service-A对service-B请求响应结果,在超时或者失败次数过多的情况下,阻断service-A对service-B的请求,直接返回相关的异常处理结果,使得service-A中的请求线程能够及时返回,避免资源耗尽而不可用,从而保护了服务调用者,避免了服务级联失败。

断路器

断路器能够很好地保护服务调用方的稳定性,它能够避免服务调用者频繁执行可能失败的服务提供者,防止服务调用者浪费CPU使用周期和线程资源。

断路器设计模式借鉴了电路中的保险丝设计方案。断路器代理了服务调用方对提供方的请求,它监控了最近请求的失败和超时次数。在下游服务因为过载或者故障无法提供正常响应时,断路器中的请求失败率就会大大提升,在超过一定阀值之后,断路器会打开,切断服务调用者和服务提供者之间的联系,此时服务调用者会执行失败逻辑或者返回异常,避免无效的线程等待。同时断路器中还提供检测恢复机制,允许服务调用者尝试调用服务提供者以检测其是否恢复正常,若正常则关闭断路器,恢复正常调用。

断路器中存在三种状态,分别是关闭、打开、半开,它们之间的状态转化如图10-4所示。

图10-4 断路器状态转化

  • 关闭状态:如果程序正常运行,那么断路器大多数时候都处于这个状态,此时服务调用者正常调用服务提供者。断路器会统计周期时间内的请求总次数和失败次数的比例。
  • 打开状态:如果最近失败频率超过预设的阀值之后,断路器就会进入打开的状态。服务调用者对服务提供者的调用将会立即失败,转而执行预设的失败逻辑或者返回异常。
  • 半开状态:断路器进入打开状态之后将启动一个超时定时器,在定时器到达时,它会进入到半开状态。此时断路器允许服务调用者尝试对服务提供者发起少量实际调用请求(检测恢复机制)。如果这些请求都成功执行,那么断路器就认为服务提供者已经恢复正常,进入关闭状态,失败计数器也同时复位。如果这些请求失败,断路器将返回到打开状态,并重新启动超时定数器,重复进行检测恢复。

关闭状态使用的失败计数器基于时间窗口计数,它会定期自动复位。只有在窗口时间内发生的请求总次数和请求失败次数达到一定的阀值,断路器才会被打开。半打开状态使用成功计数器记录调用操作的成功尝试次数,在指定数量的连续操作调用成功后,断路器恢复到关闭状态。如果任何调用失败,断路器会立即进入断开状态,成功计数器将在下次进入半开状态时重新清零。半开状态仅允许有限的请求发生真正的调用,这有助于防止刚恢复的服务提供者突然被请求淹没而再次宕机。

负载均衡

负载均衡能够将大量的请求,根据负载均衡算法,分发到多台服务器上进行处理,使得所有服务器负载都维持在高效稳定的状态,以提高系统的吞吐量,保证可用性。

负载均衡类型

负载均衡分为软件负载均衡和硬件负载均衡。软件负载均衡一般使用独立的负载均衡软件来实现请求的分发,它配置简单,使用成本低,能够满足基本的负载均衡要求,但是负载均衡软件的质量和所部署服务器的性能就有可能成为系统吞吐量的瓶颈;硬件负载均衡依赖于特殊的负载均衡设备,部署成本高,但相对于软件负载均衡,能够满足更多样化的需求。

基于DNS负载均衡和反向代理负载均衡是我们常见的软件负载均衡。在DNS服务器中,会为同一个名称配置多个不同的IP地址,不同的DNS请求会解析到不同的IP地址,从而达到不同请求访问不同服务器的目的;而反向代理负载均衡使用代理服务器,将请求按照一定的规则分发到下游的服务器集群进行处理,最常见的方式即服务网关。

负载均衡算法

负载均衡算法定义了如何将请求分散到服务实例的规则,优秀的负载均衡算法能够有效提高系统的吞吐量,使服务集群中各服务的负载处于高效稳定的状态。常见的负载均衡算法有以下几种:

(1)随机法

随机从服务集群中选取一台服务分配请求。随机法实现简单明了,保证了请求的分散性,但是无法顾及请求分配是否合理和服务器的负载能力。

(2)轮询法或者加权轮询法

将请求轮流分配给现有服务集群中的每一台服务,适用于服务集群中各服务负载能力相当且请求处理差异不大的情况下。加权轮询会根据各服务的权重,额外分配更多的请求,例如服务A权重1,服务器B权重2,服务器C权重3,则分配的过程为A-B-B-C-C-C-A-B-B-C-C-C······

(3)Hash法或者一致性Hash法

使用Hash算法将请求分散到各个服务中。一致性Hash则基于虚拟节点,在某一个服务节点宕机后将请求平摊到其他服务节点,避免请求的剧烈变动。

(4)最小连接数法

将请求分配到当前服务集群中处理请求最少的服务中。该算法需要负载均衡服务器和服务之间存在信息交互,负载均衡服务器需要了解集群中各个服务的负载情况。

实践案例:服务熔断和负载均衡使用

Hystrix是Netflix开源的一款优秀的服务间断路器,它能够在服务提供者出现故障时,隔离调用者和提供者,防止服务级联失败;同时提供失败回滚逻辑,使系统快速从异常中恢复。Hystrix完美地实现了断路器模式,同时还提供信号量和线程隔离的方式以保护服务调用者的线程资源,它对延迟和失败提供了强大的容错能力,为系统提供保护和控制。接下来我们将通过一个简单的实例项目来了解hystrix-go的使用方式。本例子的完整代码位于ch10-resiliency文件夹下。

这个实例中,我们使用两个简单的Web项目string-service和use-string-service来演示在服务调用中如何使用服务熔断和负载均衡。项目结构按照Go-kit的transport-endpoint-service的方式进行组织。其中string-service是我们基本的演示项目,use-string-service将通过HTTP的方式调用string-service提供的字符串操作服务,并在调用的过程中使用Hystrix进行访问保护和负载均衡器进行调用分发。项目的调用逻辑图如下图10-5所示。

图10-5 调用逻辑图

负载均衡器

在进行项目编写之前,我们首先在common/loadbalance包下定义一个负载均衡算法的接口方法,它接收一组服务实例列表,然后根据具体的负载均衡算法选择特定的被调用服务实例信息返回。源码位于common/loadbalance/loadbalance.go下,代码如下所示:

1
2
3
4
// 负载均衡器
type LoadBalance interface {
SelectService(service []*api.AgentService) (*api.AgentService, error)
}

为了接下来的代码演示,我们还实现了一个随机法的负载均衡器RandomLoadBalance,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var ErrNoInstances = errors.New("service instances are not existed")

type RandomLoadBalance struct {
}

// 随机负载均衡
func (loadBalance *RandomLoadBalance) SelectService(services []*api.AgentService) (*api.AgentService, error) {

if services == nil || len(services) == 0 {
return nil, ErrNoInstances
}

return services[rand.Intn(len(services))], nil
}

服务编写

string-service服务即我们的基本演示项目,代码位于ch10-resiliency/string-service中,我们在6.6小节中已经详细介绍过该项目的搭建。string-service服务将对外提供两个HTTP接口:/health接口用于进行健康检查;/op/{type}/{a}/{b}接口对外提供字符串操作。

use-string-service服务作为服务调用方,会通过HTTP的方式调用string-service服务提供的/op/{type}/{a}/{b}接口。use-string-service服务使用Go-kit的项目结构进行组织,详细代码请求参阅ch10-resiliency/use-string-service目录。接下来我们将按照service层,endpoint层,transport层的顺序依次讲解use-string-service服务的构建。

1,use-string-service的service层

在use-string-service的service层中,我们定义了以下两个接口方法,源码位于ch10-resiliency/use-string-service/service/service.go,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package service

import (
"encoding/json"
"errors"
"github.com/afex/hystrix-go/hystrix"
"github.com/hashicorp/consul/api"
"micro-go-book/ch10-resiliency/use-string-service/config"
"micro-go-book/common/discover"
"micro-go-book/common/loadbalance"
"net/http"
"net/url"
"strconv"
)

const (
StringServiceCommandName = "String.string"
StringService = "string"
)

var (
ErrHystrixFallbackExecute = errors.New("Hystrixfall back execute")
)

// Service Define a service interface
type Service interface {
// 远程调用 string-service 服务
UseStringService(operationType, a, b string) (string, error)

// 健康检查
HealthCheck() bool
}

// ArithmeticService implement Service interface
type UseStringService struct {
// 服务发现客户端
discoveryClient discover.DiscoveryClient // 如何拿到服务的地址?
loadbalance loadbalance.LoadBalance // 多实例使用哪一个?怎么决定?
}

func NewUseStringService(client discover.DiscoveryClient, lb loadbalance.LoadBalance) Service {

hystrix.ConfigureCommand(StringServiceCommandName, hystrix.CommandConfig{
// 设置触发最低请求阀值为 5,方便我们观察结果
RequestVolumeThreshold: 5,
})
return &UseStringService{
discoveryClient: client,
loadbalance: lb,
}
}

// StringResponse define response struct
type StringResponse struct {
Result string `json:"result"`
Error error `json:"error"`
}

func (s UseStringService) UseStringService(operationType, a, b string) (string, error) {

var operationResult string

err := hystrix.Do(StringServiceCommandName, func() error {
instances := s.discoveryClient.DiscoverServices(StringService, config.Logger)
// 随机选取一个服务实例进行计算
instanceList := make([]*api.AgentService, len(instances))
for i := 0; i < len(instances); i++ {
instanceList[i] = instances[i].(*api.AgentService)
}
// 使用负载均衡算法选取实例
selectInstance, err := s.loadbalance.SelectService(instanceList)
if err != nil {
config.Logger.Println(err.Error())
return err
}
config.Logger.Printf("current string-service ID is %s and address:port is %s:%s\n", selectInstance.ID, selectInstance.Address, strconv.Itoa(selectInstance.Port))
requestUrl := url.URL{
Scheme: "http",
Host: selectInstance.Address + ":" + strconv.Itoa(selectInstance.Port),
Path: "/op/" + operationType + "/" + a + "/" + b,
}

resp, err := http.Post(requestUrl.String(), "", nil)
if err != nil {
return err
}
result := &StringResponse{}

err = json.NewDecoder(resp.Body).Decode(result)
if err != nil {
return err
} else if result.Error != nil {
return result.Error
}

operationResult = result.Result
return nil

}, func(e error) error {
return ErrHystrixFallbackExecute
})
return operationResult, err
}

// HealthCheck implement Service method
// 用于检查服务的健康状态,这里仅仅返回true。
func (s UseStringService) HealthCheck() bool {
return true
}

上述代码中,在UseStringService方法中封装了对string-service服务的HTTP调用。同时为了提供服务熔断能力,在对string-service进行HTTP调用时,我们使用了Hystrix对调用过程进行包装。对此,需要引入hystrix-go的相关依赖https://github.com/afex/hystrix-go。可以看到我们将服务发现和HTTP调用过程通过hystrix.Do函数包装为一个Hystrix命令来执行,hystrix.Do是一种同步命令调用方式,我们的调用结果将会同步返回。 除此之外,Hystrix还提供异步调用方式。

对于每一种Hystrix命令,我们都需要为它们赋予不同的名称,标明它们是属于不同的远程调用,命令相同的Hystrix命令将会使用相同的断路器进行熔断保护,在上述代码中,我们将该Hystrix命令命名为String.string,该名称下的Hystrix命令都会使用相同的断路器进行熔断保护和数据统计。

hystrix.Do包装方法中,我们首先通过“string”服务名从Consul中获取其服务实例列表;接着我们使用负载均衡器从服务实例列表选取一个合适的服务实例进行调用;最后服务调用结束后返回调用结果,如果调用过程中发生异常,则返回异常。

hystrix.Do还可以在最后对异常进行处理,对此需要定义一个失败回滚函数,可以使用它在服务调用失败时进行异常处理和回滚操作,如果不定义就直接返回异常。在上面的例子中,我们定义了一个简单的失败回滚函数,返回了特定的异常信息。注意,如果该名称的Hystrix断路器已经打开,那么hystrix.Do直接执行失败回滚函数,跳过远程调用过程,进行服务熔断操作

2,use-string-service的endpoint层

在endpoint层中,我们需要创建UseStringEndpoint将UseStringService方法提供出去,源码位于ch10-resiliency/use-string-service/endpoint/endpoints.go,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package endpoint

import (
"context"
"github.com/go-kit/kit/endpoint"
"micro-go-book/ch10-resiliency/use-string-service/service"
)

// CalculateEndpoint define endpoint
type UseStringEndpoints struct {
UseStringEndpoint endpoint.Endpoint
HealthCheckEndpoint endpoint.Endpoint
}


// StringRequest define request struct
type UseStringRequest struct {
RequestType string `json:"request_type"`
A string `json:"a"`
B string `json:"b"`
}

// StringResponse define response struct
type UseStringResponse struct {
Result string `json:"result"`
Error string `json:"error"`
}

// MakeStringEndpoint make endpoint
func MakeUseStringEndpoint(svc service.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(UseStringRequest)

var (
res, a, b, opErrorString string
opError error
)

a = req.A
b = req.B

res, opError = svc.UseStringService(req.RequestType, a, b)

if opError != nil{
opErrorString = opError.Error()
}

return UseStringResponse{Result: res, Error: opErrorString}, nil
}
}

// HealthRequest 健康检查请求结构
type HealthRequest struct{}

// HealthResponse 健康检查响应结构
type HealthResponse struct {
Status bool `json:"status"`
}

// MakeHealthCheckEndpoint 创建健康检查Endpoint
func MakeHealthCheckEndpoint(svc service.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return HealthResponse{status}, nil
}
}

在上述代码中,我们使用MakeUseStringEndpoint方法构建了UseStringEndpoint,将UseStringService.UseStringService方法暴露了出去,以供transport层调用。

3,use-string-service的transport层

在transport层中,我们需要将UseStringEndpoint部署在use-string-service服务的/op/{type}/{a}/{b}路径下,这样子我们在调用use-string-service服务的/op/{type}/{a}/{b}接口时会把请求转发给string-service服务进行处理,以验证负载均衡和服务熔断的效果。源码位于ch10-resiliency/use-string-service/transport/http.go下,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package transport

import (
"context"
"encoding/json"
"errors"
"github.com/afex/hystrix-go/hystrix"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/gorilla/mux"
"micro-go-book/ch10-resiliency/use-string-service/endpoint"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
)

var (
ErrorBadRequest = errors.New("invalid request parameter")
)

// MakeHttpHandler make http handler use mux
func MakeHttpHandler(ctx context.Context, endpoints endpoint.UseStringEndpoints, logger log.Logger) http.Handler {
r := mux.NewRouter()


options := []kithttp.ServerOption{
kithttp.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
kithttp.ServerErrorEncoder(encodeError),
}

r.Methods("POST").Path("/op/{type}/{a}/{b}").Handler(kithttp.NewServer(
endpoints.UseStringEndpoint,
decodeStringRequest,
encodeStringResponse,
options...,
))

r.Path("/metrics").Handler(promhttp.Handler())

// create health check handler
r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
endpoints.HealthCheckEndpoint,
decodeHealthCheckRequest,
encodeStringResponse,
options...,
))

// 添加 hytrix 监控数据
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
r.Handle("/hystrix/stream", hystrixStreamHandler)

return r
}

// decodeStringRequest decode request params to struct
func decodeStringRequest(_ context.Context, r *http.Request) (interface{}, error) {
vars := mux.Vars(r)
requestType, ok := vars["type"]
if !ok {
return nil, ErrorBadRequest
}

pa, ok := vars["a"]
if !ok {
return nil, ErrorBadRequest
}

pb, ok := vars["b"]
if !ok {
return nil, ErrorBadRequest
}

return endpoint.UseStringRequest{
RequestType: requestType,
A: pa,
B: pb,
}, nil
}

// encodeStringResponse encode response to return
func encodeStringResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
w.Header().Set("Content-Type", "application/json;charset=utf-8")
return json.NewEncoder(w).Encode(response)
}

// decodeHealthCheckRequest decode request
func decodeHealthCheckRequest(ctx context.Context, r *http.Request) (interface{}, error) {
return endpoint.HealthRequest{}, nil
}

func encodeError(_ context.Context, err error, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
switch err {
default:
w.WriteHeader(http.StatusInternalServerError)
}
json.NewEncoder(w).Encode(map[string]interface{}{
"error": err.Error(),
})
}

在上述代码中,我们将UseStringEndpoint部署在use-string-service服务的/op/{type}/{a}/{b}路径下。接着我们使用decodeStringRequest方法将HTTP请求参数转化为endpoint.UseStringRequest传递给UseStringEndpoint,并使用encodeStringResponse将UseStringEndpoint返回的结果转化为JSON数据返回给调用客户端。

use-string-service的main函数将完成服务注册并依次构建service层、endpoint层、transport层,然后将transport的HTTP服务部署在配置的端口下,具体实现参考文件ch10-resiliency/use-string-service/main.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"context"
"flag"
"fmt"
"github.com/go-kit/kit/circuitbreaker"
uuid "github.com/satori/go.uuid"
"micro-go-book/ch10-resiliency/use-string-service/config"
"micro-go-book/ch10-resiliency/use-string-service/endpoint"
"micro-go-book/ch10-resiliency/use-string-service/service"
"micro-go-book/ch10-resiliency/use-string-service/transport"
"micro-go-book/common/discover"
"micro-go-book/common/loadbalance"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
)

func main() {

var (
servicePort = flag.Int("service.port", 10086, "service port")
serviceHost = flag.String("service.host", "127.0.0.1", "service host")
consulPort = flag.Int("consul.port", 8500, "consul port")
consulHost = flag.String("consul.host", "127.0.0.1", "consul host")
serviceName = flag.String("service.name", "use-string", "service name")
)

flag.Parse()

ctx := context.Background()
errChan := make(chan error)
var discoveryClient discover.DiscoveryClient
discoveryClient, err := discover.NewKitDiscoverClient(*consulHost, *consulPort)

if err != nil {
config.Logger.Println("Get Consul Client failed")
os.Exit(-1)

}
var svc service.Service
svc = service.NewUseStringService(discoveryClient, &loadbalance.RandomLoadBalance{})
useStringEndpoint := endpoint.MakeUseStringEndpoint(svc)
useStringEndpoint = circuitbreaker.Hystrix(service.StringServiceCommandName)(useStringEndpoint)

//创建健康检查的Endpoint
healthEndpoint := endpoint.MakeHealthCheckEndpoint(svc)

//把算术运算Endpoint和健康检查Endpoint封装至StringEndpoints
endpts := endpoint.UseStringEndpoints{
UseStringEndpoint: useStringEndpoint,
HealthCheckEndpoint: healthEndpoint,
}

//创建http.Handler
r := transport.MakeHttpHandler(ctx, endpts, config.KitLogger)

instanceId := *serviceName + "-" + uuid.NewV4().String()

//http server
go func() {

config.Logger.Println("Http Server start at port:" + strconv.Itoa(*servicePort))
//启动前执行注册
if !discoveryClient.Register(*serviceName, instanceId, "/health", *serviceHost, *servicePort, nil, config.Logger) {
config.Logger.Printf("use-string-service for service %s failed.", serviceName)
// 注册失败,服务启动失败
os.Exit(-1)
}
handler := r
errChan <- http.ListenAndServe(":"+strconv.Itoa(*servicePort), handler)
}()

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()

error := <-errChan
//服务退出取消注册
discoveryClient.DeRegister(instanceId, config.Logger)
config.Logger.Println(error)
}

接下来我们来验证两个组件的执行效果。

4,负载均衡和服务熔断效果验证

项目编写完成之后,我们需要检验它们的效果。我们首先启动Consul,启动命令如下:

1
consul agent -dev

接着在ch10-resiliency/string-service和ch10-resiliency/use-string-service目录分别启动string-service和use-string-service服务,启动命令如下:
1
go run main.go

为了保证string-service存在多实例可调用,我们需要在启动string-service后再启动一个监听其他端口的新服务实例。我们将新的服务实例部署在10089端口下,在string-service目录下新开一个命令行,使用以下命令启动:
1
go run main.go -service.port 10089

然后访问Consul的主页http://localhost:8500/,如图10-6所示。

图10-6 Consul主页

我们能够发现string-service和use-string-service服务都已经注册上去了,接着我们访问use-string-service服务的/{op}/{type}/{a}/{b}接口,通过use-string-service服务发起对string-service服务的远程调用。curl请求命令如下:

1
curl -X POST http://localhost:10086/op/concat/11/12

即可以得到正确的响应如下:
1
2
3
4
{
"result": 1112,
"error": ""
}

并且在use-string-service服务的命令行下查看到以下日志:
1
2
2024/06/13 16:56:46 current string-service ID is string-13f0bba1-5815-4d6b-8cad-892ad125471e 
and address:port is 127.0.0.1:10085

从日志中我们可以发现本次请求转发到端口10085的string-service服务实例中进行处理。多次发起curl请求命令,我们会从日志中发现请求被随机分发到端口10085和10089的string-service服务实例中处理,说明我们的随机负载均衡器发挥了随机分发请求的作用。多次请求后的日志可能结果如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
2024/06/13 16:59:10 current string-service ID is string-13f0bba1-5815-4d6b-8cad-892ad125471e 
and address:port is 127.0.0.1:10089
2024/06/13 16:59:11 current string-service ID is string-afc426e0-7993-43d3-90eb-c85ed506e4fd
and address:port is 127.0.0.1:10085
2024/06/13 16:59:12 current string-service ID is string-13f0bba1-5815-4d6b-8cad-892ad125471e
and address:port is 127.0.0.1:10089
2024/06/13 16:59:13 current string-service ID is string-afc426e0-7993-43d3-90eb-c85ed506e4fd
and address:port is 127.0.0.1:10085
2024/06/13 16:59:14 current string-service ID is string-afc426e0-7993-43d3-90eb-c85ed506e4fd
and address:port is 127.0.0.1:10085
2024/06/13 16:59:15 current string-service ID is string-afc426e0-7993-43d3-90eb-c85ed506e4fd
and address:port is 127.0.0.1:10085

接着我们关闭所有string-service服务,继续使用curl请求结果,将会得到以下响应:
1
2
3
4
{
"result": "",
"error": "fallback failed with 'Hystrixfall back execute'. run error was 'service instances are not existed'"
}

这个返回结果中包含了异常信息,同时日志中也会输出service instances are not existed,这说明了hystrix.Do中被包装的代码已经执行了。由于在创建UseStringService时我们设定了以String.string命名的断路器生效触发请求阀值为5次,连续使用curl接口请求失败5次之后继续访问将发现不再有先前的日志输出,返回的响应也变为以下异常:
1
2
3
4
{
"result": "",
"error": "fallback failed with 'Hystrixfall back execute'. run error was 'hystrix: circuit open'"
}

这说明此时断路器已经打开,直接执行了失败回滚函数返回异常结果。如果5秒之后我们重新使用curl访问接口,将会发现请求重新执行了hystrix.Do中的远程调用代码,这是因为断路器打开之后的超时时间已经结束(默认为5秒钟),断路器进入了半开状态,允许程序重新执行远程调用,试探下游服务是否恢复可用状态,因为此时string-service服务处于一直不可用的状态,所以请求失败后,断路器又回到了打开状态。

使用Go-kit Hystrix中间件

Go-kit作为微服务工具集,围绕Endpoint提供了包括断路器、限流器、日志等多种中间件,它们都是以装饰者模式对原有的Endpoint进行行为包装,增加特定的组件行为。

Go-kit中提供了服务熔断Hystrix的中间件,对此我们可以在endpoint层直接使用,无需在service中自行封装。接下来我们将在use-string-service的endpoint层中直接使用Hystrix中间件修饰UseStringEndpoint。

【实例10-1】使用Go-kit Hystrix中间件修饰Endpoint

首先将service层中UseStringService方法的Hystrix相关代码移除,修改代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (s UseStringService) UseStringService (operationType, a, b string) (string, error) {

var operationResult string
var err error

instances := s.discoveryClient.DiscoverServices(StringService, config.Logger)
instanceList := make([]*api.AgentService, len(instances))
for i := 0; i < len(instances); i++ {
instanceList[i] = instances[i].(*api.AgentService)
}
// 使用负载均衡算法选取实例
selectInstance, err := s.loadbalance.SelectService(instanceList);
if err == nil {
config.Logger.Printf("current string-service ID is %s and address:port is %s:%s\n", selectInstance.ID, selectInstance.Address, strconv.Itoa(selectInstance.Port))
requestUrl := url.URL{
Scheme: "http",
Host: selectInstance.Address + ":" + strconv.Itoa(selectInstance.Port),
Path: "/op/" + operationType + "/" + a + "/" + b,
}

resp, err := http.Post(requestUrl.String(), "", nil)
if err == nil {
result := &StringResponse{}
err = json.NewDecoder(resp.Body).Decode(result)
if err == nil && result.Error == nil{
operationResult = result.Result
}

}
}
return operationResult, err
}

构建在endpoint层的Hystrix需要以Endpoint中返回的error来统计调用失败次数,因此需要修改endpoint层的MakeUseStringEndpoint创建函数,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func MakeUseStringEndpoint(svc service.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(UseStringRequest)

var (
res, a, b string
opError error
)

a = req.A
b = req.B

res, opError = svc.UseStringService(req.RequestType, a, b)

return UseStringResponse{Result: res}, opError
}
}

在上述代码中,我们不再将业务逻辑的错误封装到response中返回,而是直接通过Endpoint的err返回给transport层。最后我们修改main函数,在构建UseStringEndpoint时添加Go-kit Hystrix中间件,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
import (
...
// 引入中间件依赖
"github.com/go-kit/kit/circuitbreaker"
...
)
...
var svc service.Service
svc = service.NewUseStringService(discoveryClient, &loadbalance.RandomLoadBalance{} )
useStringEndpoint := endpoint.MakeUseStringEndpoint(svc)
useStringEndpoint = circuitbreaker.Hystrix(service.StringServiceCommandName)(useStringEndpoint)

如此即可以实现与我们自定义Hystrix命令相同的功能。查看circuitbreaker.Hystrix的实现逻辑,可以发现Go-kit也是使用hystrix.Do方法对Endpoint的调用方法进行包装。但是需要注意的是,如果使用Go-kit提供的Hystrix中间件,将无法定义相关的失败回滚函数,不利于远程调用失败后的恢复处理工作。

Hystrix详解

在上一小节中,我们通过一个实例演示了在如何在服务调用中使用Hystrix,通过Hystrix服务熔断能力为服务之间的安全调用保驾护航,在这一小节中我们将对Hystrix的详细使用和基本原理进行介绍。

Hystrix基本使用

hystrix-go中总共提供了两种方式包装远程调用的方式,一种是在10.3小节中使用的Hystrix同步执行包装方式,另外一种是异步执行包装模式。无论是哪种方式,都需要为被包装的执行函数赋予对应的Hystrix命名,命名相同的Hystrix命令的执行过程会使用相同的断路器进行统计和控制。

1,同步执行

同步Hystrix的使用方式如下:

1
2
3
4
5
6
7
err := hystrix.Do("test command", func() error {
// 远程调用&或者其他需要保护的方法
return nil
}, func (err error) error {
// 失败回滚方法
return nil
})

除了定义Hystrix命令的命名和具体的被包装函数外,我们还可选择定义失败回滚方法,这个方法在被包装的远程调用函数返回异常或者断路器被打开时执行,我们可以在失败回滚方法中定义一些本地处理流程、重试或者回滚操作,以保证调用流程的正常进行。

2,异步执行

异步Hystrix的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
resultChan := make(chan interface{}, 1)

errChan := hystrix.Go("test command", func() error {
// 远程调用&或者其他需要保护的方法
resultChan <- "success"
return nil
}, func (e error) error {
// 失败回滚方法
return nil
})

select {
case err := <- errChan:
// 执行失败
case result := <-resultChan
// 执行成功
case <-time.After(2 * time.Second): // 超时设置
fmt.Println("Time out")
return
}

如上代码所述,通过hystrix.Go异步执行的远程调用将会在与当前goroutine不同的goroutine中执行,执行的异常是通过channel的方式返回给调用goroutine。对此,如果我们想要获取到远程调用的返回结果,需要定义一个返回处理结果的resultChan,在远程调用结束后将结果放入resultChan,调用goroutine就可以通过resultChan获取到调用结果。

其实无论是同步调用还是异步调用,都是在一个新的goroutine中异步执行调用逻辑,只不过hystrix.Do使用channel为我们将异步过程处理为同步调用。

运行流程

除了进行服务熔断,Hystix在执行过程中还为不同命名的远程调用提供goroutine隔离的能力。goroutine 隔离使得不同的远程调用方法在固定数量的goroutine 下执行,控制了每种远程调用的并发数量,从而进行流量控制;在某个Hystrix命令调用出现大量超时阻塞时也仅仅会影响到与自己命名相同的Hystrix命令,并不会影响其他Hystix命令以及系统其他请求的执行。在hystrix命令配置的goroutine执行数量被占满时,该Hystrix命令的执行将会直接进入到失败回滚逻辑中,进行服务降级,保护服务调用者的资源稳定。

hystrix-go的整体调用流程如图10-7所示。

图10-7 Hystrix-go整体调用流程

我们来详细分析一下这个调用流程:

(1)每一个被Hystrix包装的远程调用逻辑都会封装为一个Hystrix命令,其内包含用户预置远程调用逻辑和失败回滚逻辑,根据Hystrix命名唯一确认一个Hystrix命令。

(2)根据Hystrix命令的命名获取到对应的断路器,判断断路器是否打开。如果断路器打开,将直接执行失败回滚逻辑,不执行真正的远程调用逻辑,此时服务调用已经被熔断了。如果断路器关闭或者处于半开状态,将向执行池请求执行通行证。

(3)Hystrix中每一种命令都限制了并发数量,当Hystrix命令的并发数量超过了执行池中设定的最大执行数量时,额外的请求就会被直接拒绝,进入到失败回滚逻辑中,避免服务过载。如果执行池中的最大执行数量未满,那么请求才会进入到执行远程调用的逻辑中。

(4)在执行远程调用时,执行出现异常或者下游服务执行超时,那么Hystrix命令将会向Metrics控制器上传执行结果,并进入到失败回滚逻辑中。

(5)Metrics控制器使用滑动窗口的方式统计一段时间的调用次数、失败次数、超时次数和被拒绝次数(执行池已满时请求被拒绝),如果该段时间内的错误频率(执行不成功的总次数占请求总次数)超过了断路器错误率阀值,那么断路器将会打开。在重试超时定时器到达之前的请求都会直接进入失败回滚逻辑,拒绝执行真正的远程调用。

常用参数配置

对于每一种命名的Hystrix命令,我们可以在命令执行之前对命令进行自定义配置,能够进行配置的参数主要有:

1
2
3
4
5
6
7
8
// CommandConfig is used to tune circuit settings at runtime
type CommandConfig struct {
Timeout int `json:"timeout"`
MaxConcurrentRequests int `json:"max_concurrent_requests"`
RequestVolumeThreshold int `json:"request_volume_threshold"`
SleepWindow int `json:"sleep_window"`
ErrorPercentThreshold int `json:"error_percent_threshold"`
}

Hystrix命令配置信息封装在CommandConfig结构体中,这些配置信息的具体说明如下表10-1所示。

表10-1

在10.3小节的例子中,为了减少断路器生效的最小请求阀值,我们在NewUseStringService方法中将Hystrix的RequestVolumeThreshold设置为5。自定义Hystrix命令的配置如下所示:

1
2
3
hystrix.ConfigureCommand("test_command", hystrix.CommandConfig{
//设置参数
})

具体工作实践中,可以根据我们的需要,对表10-1中的5个配置参数进行修改,使Hystrix的保护功能更好地与当前系统相结合。在hystrix.settings.go文件中有Hystrix命令的默认参数设置,如果不需要调整Hystrix执行配置,可以直接使用默认设置执行。

Hystrix监控面板

Hystrix中提供以HTTP的方式获取当前服务的Hystrix命令调用状态信息的能力,结合对应的Hystrix可视化面板,可以让开发人员对下游依赖服务运行状态有清晰地认知,有利于定位和排查微服务间的异常调用问题。

获取Hystrix命令调用信息

对于每一种Hystrix命令,我们都可以在运行时获取到其对应的断路器对象CircuitBreaker,通过CircuitBreaker.IsOpen可以获取当前命令的断路器是否打开,调用能否正常进行,看下面这个小例子:

1
2
3
4
circuit,L:= hystrixiGetcircuit ("test command")
fmt:Printlni("command test command's .cifcuit
open
strcony.FormatBool(circuit:IsOpen()))

在上述实例代码中,我们使用hystrix.GetCircuit获取到了test_command命令的断路器对象,并通过它判断断路器是否打开。除此之外,我们还可以使用hystrixStreamHandler看到当前服务实例下所有Hystrix命令的调用状态。hystrixStreamHandler会把Metrics控制器收集的所有状态信息按每秒1次的频率向所有连接的HTTP客户端推送,以供开发人员对系统状态进行及时把控和调整。

接下来我们将为use-string-service服务开启hystrixStreamHandler,修改transport.MakeHttpHandler方法如下所示:

1


可以看到,在MakeHttpHandler方法的末尾,我们为HTTP添加了一个Hystrix信息的推送接口。

在ch10-resiliency/use-string-service目录下启动use-string-service服务,启动命令如下:

1
go run main.go

我们先调用一次curl,以保证Metrics控制器中已经收集到数据,命令如下:
1
curl -X POST http://1ocalhost:10086/op/Concat/11/12

接着访问http://localhost:10086/hystrix/stream接口,可以看到Metrics控制器中的Hystrix命令调用信息被持续通过流推送到浏览器中,如下所示:
1


返回的信息主要包含近段时间内各种Hystrix命令的调用状态、结果以及它们对应的执行池和断路器的状态,通过它们就可以了解当前服务依赖的下游服务的状态,对异常的服务及时进行恢复处理。

使用HystrixDashboard可视化面板

hystrixStreamHandler控制器中返回的数据过于杂乱,无法快速发现有用数据和定位问题,对此,我们可以结合Hystrix Dashboard对上述信息进行直观的查看。由于hystrix-go没有提供对应的可视化界面,我们采用开源的mlabouardy/hystrix-dashboard进行可视化查看。接下来我们演示docker版本的mlabouardy/hystrix-dashboard的使用,使用以下命令启动该可视化工具的docker镜像:

1
$ docker run --name hystrix-dashboard -d -p 10087:8080 mlabouardy/hystrix-dashboard:latest

接着我们访问Hystrix Dashoard的主界面,地址为http://localhost:10087/hystrix,如图10-8所示。

图10-8 Hystrix Dashboard主界面

在地址栏输入hystrixStreamHandler数据流的地址http://10.93.244.130:10086/hystrix/stream,因为Hystrix Dashboard运行在docker容器上,并且我们的容器是以桥接的网络模式启动,所以要把hystrixStreamHandler的host替换为本机地址,比如笔者的机器局域网地址为10.93.244.130。填入对应的信息后如下图10-9所示。

图10-9 输入hystrixStreamHandler地址

单击Monitor Stream进入到use-string-service服务的Hystrix Dashboard中,如图10-10所示。

图10-10 Hystrix Dashboard监控页面

从图10-10中可以看到,String.stringHystrix命令的执行失败率是100%,断路器已经打开,防止请求进入到真正的远程调用逻辑中。

实践案例:在网关中添加Hystrix熔断和负载均衡

在这个实践案例中,我们主要来改造第9章中手动实现的API网关,为API反向代理的微服务调用添加Hystrix的熔断保护和资源隔离功能以及负载均衡能力,以保护API网关的稳定运行。

在第9章手动实现的API网关中,我们主要通过ReverseProxy实现了反向代理的功能,所以我们本实例的工作量主要集中在使用Hystrix包装整个反向代理逻辑和添加负载均衡器方面。

首先定义HystrixHandler用于实现http.Handler接口,表明它可用于处理HTTP请求,结构体定义和构造函数如下代码所示:

1


在 HystrixHandler结构体中我们定义了hystrixs用于记录当前注册的Hystrix命令,discoveryCiient变量用于服务发现,loadbalance变量用于负载均衡。

接下来我们实现ServeHTTP接口,其主要逻辑是将反向代理的逻辑使用hystrix.Do包装起来,代码如下所示:

1


在上述代码中,主要进行了以下工作:

(1)根据请求路径中提供的服务名从hystrixHandler中查找该服务名的Hystrix命名是否已经配置过了。如果没有,对hystrix命令进行初始化配置。

(2)在hystrix.Do的包装中执行3、4、5步骤。

(3)根据请求路径中提供的服务名从discoveryClient中获取服务实例列表。

(4)使用负载均衡器从服务实例列表中选取一个服务实例地址用于构造ReverseProxy;并定义errorHandler,用于从ReverseProxy获取执行失败后抛出的异常。

(5)执行ReverseProxy.ServeHTTP进行代理转发,如果代理转发过程中出错将会反馈给 Hystrix。

(6)如果hystrix.Do中执行的代理转发逻辑出错,向客户端返回服务端500的错误。

最后我们将这个使用Hystrix包装过的反向代理注册到Web服务器中,修改gateway.go代码如下:

1


这里,我们仅需要将原来生成反向代理的方法修改为我们定义的HystrixHandler即可。通过这样的方式,HystrixHandler会在下游服务器不可用时,切断网关对该服务器请求的转发,保护网关的线程资源,避免服务雪崩的发生。同时负载均衡器也能将请求分发服务集群的各个服务中,提高服务集群的处理能力。

小结

本章主要介绍了微服务架构中的服务熔断组件和负载均衡组件。我们首先了解了服务熔断的必要性,理解了下游服务的崩溃可能会引发服务雪崩而导致整个分布式系统的崩溃,同时也明白了在微服务体系中负载均衡组件发挥的强大横向扩展能力;接着介绍了断路器在服务调用中对服务调用者提供的强大保护能力,并对负载均衡的类别和几类负载均衡算法进行了了解;最后我们使用主流的hystrix-go 服务熔断组件结合自定义的负载均衡器,在微服务调用和网关转发中进行实践,实现了断路器对上游服务提供的保护作用以及负载均衡器请求分发的作用。

在微服务架构中,原本的单体应用按照业务被划分为数量众多的微服务。由于业务依赖的关系,服务之间经常会发生远程调用。我们从服务注册与发现中心中获取的服务实例列表,需要借助负载均衡组件选取合适的服务实例才能发起远程调用。负载均衡组件能够有效地将请求均衡地分发到服务集群的各个服务实例中,提高服务集群的负载和吞吐量。服务熔断组件能够在下游服务出现异常时,及时切断服务调用者对服务提供者的请求,达到保护上游服务稳定性的目的。负载均衡组件和服务熔断组件是微服务架构的不可或缺的基础组件,它们为微服务之间的远程调用提供了有效的支持和保障。

0%