Spring WebClient实现微服务的负载均衡调用
在微服务应用中,负载均衡来保证应用的可用性的常用手段。Spring-Cloud-LoadBalancer
提供了服务之间实现负载均衡调用的能力。
除了使用
Open-Feign外,还可以使用RestTemplate
和WebClint
进行服务调用。
WebClient
是
Spring WebFlux项目下的HTTP工具,基于Reactor的非阻塞流式API,是RestTemplate
的有效替代。
Spring Cloud 负载均衡 #
Spring Cloud提供了客户端负载均衡的抽象和实现。如果你的微服务引入了spring-cloud-starter-loadbalancer
包,并且没有在配置文件中使用spring.cloud.loadbalancer.enabled = false
禁用负载均衡,那么Spring Cloud负载均衡的必需项会在应用启动后自动配置。
默认情况下,Spring Cloud LoadBalancer会实现ReactiveLoadBalancer
接口,这个接口基于轮询(Round-Robin)来随机访问微服务实例。而微服务示例由谁来提供呢?
很明显,自然是由 服务发现来提供。
那么,服务发现与负载均衡之间沟通的桥梁是什么?这就不得不提ServiceInstanceListSupplier
这个接口了,这个接口使服务发现客户端通过service-id
获取服务实例。
ServiceInstanceListSupplier
接口提供了builder()
方法来创建实例。
1public interface ServiceInstanceListSupplier
2 extends Supplier<Flux<List<ServiceInstance>>> {
3
4 String getServiceId();
5 default Flux<List<ServiceInstance>> get(Request request) {
6 return get();
7 }
8 static ServiceInstanceListSupplierBuilder builder() {
9 return new ServiceInstanceListSupplierBuilder();
10 }
11}
默认情况下,如果微服务引入了spring-boot-starter-webflux
(当然啦,都使用WebClient
了),Spring Cloud会自动配置ReactiveCompositeDiscoveryClient
这个服务发现客户端:
1public class ReactiveCompositeDiscoveryClientAutoConfiguration {
2 @Bean
3 @Primary
4 public ReactiveCompositeDiscoveryClient reactiveCompositeDiscoveryClient(
5 List<ReactiveDiscoveryClient> discoveryClients) {
6 return new ReactiveCompositeDiscoveryClient(discoveryClients);
7 }
8}
如果使用Eureka作为服务发现实现,那么上述配置所使用的形参discoveryClients
则通过
1public class EurekaReactiveDiscoveryClientConfiguration {
2
3 @Bean
4 @ConditionalOnMissingBean
5 public EurekaReactiveDiscoveryClient eurekaReactiveDiscoveryClient
6 (EurekaClient client, EurekaClientConfig clientConfig)
7 {
8 return new EurekaReactiveDiscoveryClient(client, clientConfig);
9 }
10 // 以下省略...
11}
同样地,配置依赖的EurekaClient
和EurekaClientConfig
也是自动配置,这里不在展开讨论。
有了服务发现,还需要配置负载均衡,Spring默认情况下也进行了自动配置。
首先是通过 LoadBalancerAutoConfiguration
配置负载均衡客户端工厂LoadBalancerClientFactory
:
1// org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration
2@ConditionalOnMissingBean
3@Bean
4public LoadBalancerClientFactory loadBalancerClientFactory
5 (LoadBalancerClientsProperties properties)
6{
7 LoadBalancerClientFactory clientFactory
8 = new LoadBalancerClientFactory(properties);
9 clientFactory.setConfigurations(
10 this.configurations.getIfAvailable(Collections::emptyList));
11 return clientFactory;
12}
而负载均衡的客户端则是在LoadBalancerClientConfiguration
类中进行配置:
1// org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration
2@Bean
3@ConditionalOnMissingBean
4public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
5 LoadBalancerClientFactory loadBalancerClientFactory) {
6 String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
7 return new RoundRobinLoadBalancer(
8 loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
9}
10
11@Bean
12@ConditionalOnBean(ReactiveDiscoveryClient.class)
13@ConditionalOnMissingBean
14@Conditional(DefaultConfigurationCondition.class)
15public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
16 ConfigurableApplicationContext context) {
17 return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
18}
19
20// 以下省略...
默认情况下,Spring-Cloud使用RoundRobinLoadBalancer
进行服务轮询。同时,还配置了ServiceInstanceListSupplier
,使用了基础的发现客户端和缓存。
1// org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplierBuilder
2public ServiceInstanceListSupplierBuilder withDiscoveryClient() {
3 if (baseCreator != null && LOG.isWarnEnabled()) {
4 LOG.warn("Overriding a previously set baseCreator with a ReactiveDiscoveryClient baseCreator.");
5 }
6 this.baseCreator = context -> {
7 ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class);
8
9 return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
10 };
11 return this;
12}
13
14public ServiceInstanceListSupplierBuilder withCaching() {
15 if (cachingCreator != null && LOG.isWarnEnabled()) {
16 LOG.warn(
17 "Overriding a previously set cachingCreator with a CachingServiceInstanceListSupplier-based cachingCreator.");
18 }
19 this.cachingCreator = (context, delegate) -> {
20 ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
21 .getBeanProvider(LoadBalancerCacheManager.class);
22 if (cacheManagerProvider.getIfAvailable() != null) {
23 return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable());
24 }
25 if (LOG.isWarnEnabled()) {
26 LOG.warn("LoadBalancerCacheManager not available, returning delegate without caching.");
27 }
28 return delegate;
29 };
30 return this;
31}
WebClient作为负载均衡客户端 #
使用WebClient
作为负载均衡客户端非常简单,只需要使用@LoadBalanced
注解即可:
1@Configuration
2public class MyConfiguration {
3
4 @Bean
5 @LoadBalanced
6 public WebClient.Builder loadBalancedWebClientBuilder() {
7 return WebClient.builder();
8 }
9}
10
11public class MyClass {
12 @Autowired
13 private WebClient.Builder webClientBuilder;
14
15 public Mono<String> doOtherStuff() {
16 return webClientBuilder.build().get().uri("http://stores/stores")
17 .retrieve().bodyToMono(String.class);
18 }
19}
需要注意的是,在使用负载均衡后,WebClient
发起调用的uri
必需使用注册中心的服务名,而不能使用host:port
的组合了。
上文已经提到,默认情况下,引入spring-cloud-starter-loadbalancer
包后,Spring Cloud Load Balancer已经开始工作,ReactiveLoadBalancer
已经自动配置并且在工作了。
通常,在微服务调用时,还需要处理请求之间头的传递问题。这需要在请求中加入一个过滤器,用来处理这个问题。
在WebClient
请求中加入过滤器添加请求头信息也很容易,只需要实现ExchangeFilterFunction
接口:
1public class UserContextWebClientFilter implements ExchangeFilterFunction {
2 @Override
3 public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
4 ClientRequest buildRequest = ClientRequest.from(request).headers(h -> {
5 h.add(UserContext.CORRELATION_ID, UserContextHolder.getContext().getCorrelationId());
6 h.add(UserContext.AUTH_TOKEN, UserContextHolder.getContext().getAuthToken());
7 }).build();
8 return next.exchange(buildRequest);
9 }
10}
然后在配置WebClient
时,添加过滤器即可:
1@Configuration
2public class WebClientLoadBalancerConfiguration {
3
4 @Bean
5 @LoadBalanced
6 public WebClient.Builder loadBalancedWebClient() {
7 return WebClient.builder()
8 .filters(
9 f -> {
10 f.add(new UserContextWebClientFilter());
11 });
12 }
13}
自定义负载均衡配置 #
上文提到,默认情况下,Spring-Cloud会自动配置RoundRobinLoadBalancer
和ServiceInstanceListSupplier
,如果想自定义负载均衡配置,可以手动配置这两个类。
使用RamdomLoadBalancer
而非RoundRobinLoadBalancer
#
1public class CustomLoadBalancerConfiguration {
2
3 @Bean
4 ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment,
5 LoadBalancerClientFactory loadBalancerClientFactory) {
6 String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
7 return new RandomLoadBalancer(loadBalancerClientFactory
8 .getLazyProvider(name, ServiceInstanceListSupplier.class),
9 name);
10 }
11}
LoadBalancerClientFactory
前文提过,是通过LoadBalancerAutoConfiguration
自动配置的。
自定义ServiceInstanceListSupplier #
除了自定义RamdomLoadBalancer
外,还可以手动配置ServiceInstanceListSupplier
:
1public class CustomLoadBalancerConfiguration {
2
3 @Bean
4 public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
5 ConfigurableApplicationContext context) {
6 return ServiceInstanceListSupplier.builder()
7 .withDiscoveryClient()
8 .withHealthChecks() // 使用健康检查
9 .build(context);
10 }
11 }
使用自定义配置 #
自定义配置类不能使用
@Configuration
注解。
要使用手动配置注解,可以使用@LoadBalancerClient
注解。
1@Configuration
2@LoadBalancerClient(value = "licensing-service", configuration = CustomLoadBalancerConfig.class)
3public class WebClientLoadBalancerConfiguration {
4
5 @Bean
6 @LoadBalanced
7 public WebClient.Builder loadBalancedWebClient() {
8 return WebClient.builder()
9 .filters(
10 filters -> {
11 filters.add(new UserContextWebClientFilter());
12 });
13 }
14}
No servers available for service *** #
如上,在使用自定义配置的情况下,若在自定义配置中使用健康检查的情况下,可能会遇到负载均衡无法找到服务的问题:
1ServiceInstanceListSupplier.builder()
2 .withDiscoveryClient()
3 .withHealthChecks() // 使用健康检查
4 .build(context);
1ERROR 88657 --- [nio-8888-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.web.reactive.function.client.WebClientResponseException$ServiceUnavailable: 503 Service Unavailable from UNKNOWN ] with root cause
2WARN 88657 --- [ parallel-5] o.s.c.l.core.RoundRobinLoadBalancer : No servers available for service: 192.168.1.70
3WARN 88657 --- [ parallel-5] eactorLoadBalancerExchangeFilterFunction : LoadBalancer does not contain an instance for the service 192.168.1.70
4WARN 88657 --- [ parallel-9] o.s.c.l.core.RoundRobinLoadBalancer : No servers available for service: 192.168.1.70
5WARN 88657 --- [ parallel-9] eactorLoadBalancerExchangeFilterFunction : LoadBalancer does not contain an instance for the service 192.168.1.70
6WARN 88657 --- [ parallel-12] o.s.c.l.core.RoundRobinLoadBalancer : No servers available for service: 192.168.1.70
7WARN 88657 --- [ parallel-12] eactorLoadBalancerExchangeFilterFunction : LoadBalancer does not contain an instance for the service 192.168.1.70
原因在于,使用同一个WebClient.Builder
实例同时用来处理请求和发送健康健康检查,因此,HealthCheckServiceInstanceListSupplier
用来健康检查的的请求是由负载均衡客户端发出的。实际上,健康请求应该是非负载均衡的(因健康检查使用的是host,而不是服务id,从上面的警告日志可看出)。因此,可以初拥独立的非负载均衡的客户端用来健康检查。像这样:
1ServiceInstanceListSupplier.builder()
2 .withDiscoveryClient()
3 .withHealthChecks(WebClient.builder().build()) // 使用非负载均衡的健康检查
4 .build(context);
另外,还可以通过配置自定义健康检查机制:
1spring:
2 cloud:
3 loadbalancer:
4 health-check:
5 initial-delay: 1s # 初始时间间隔: 默认0
6 interval: 30s # 健康检查时间间隔:默认25s
7# 或者
8
9spring:
10 cloud:
11 loadbalancer:
12 clients:
13 your-service-name: # 或者指定服务名,每个服务名使用不同的配置,未配置的服务名使用默认配置
14 health-check:
15 initial-delay: 5s
16 interval: 30s
关于健康检查机制,在使用服务注册中心的时候,并不需要。因为注册中心会检测微服务的健康状况。
如果是使用简单的服务发现(
SimpleDiscoveryClient
),健康检查是有帮助的。
负载均衡的缓存和统计数据 #
除了健康检查机制外,还可以配置负载均衡的缓存和负载均衡的统计数据:
1spring:
2 cloud:
3 loadbalancer:
4 cache: # 使用caffine缓存
5 caffeine:
6 spec: initialCapacity=10, maximumSize=50 # initialCapacity默认256
7 ttl: 30s # 默认35s
8 stats: # 负载均衡statistics
9 micrometer:
10 enabled: true
默认情况下,负载均衡使用内存作为服务缓存。如果项目引入了caffeine
,Spring-Cloud Load Balancer会使用caffeine
作为服务缓存。
当然,可以通过设置spring.cloud.loadbalancer.cache.enabled
为false
禁用缓存。
此外,还可以通过设置spring.cloud.loadbalancer.stats.micrometer.enabled
为true
来查看负载均衡的调用情况。它通过actuator
注册了几个metrics
:
loadbalancer.requests.active
: 任意微服务实例当前活动的请求;loadbalancer.requests.success
: 负载均衡成功的请求数;loadbalancer.requests.failed
: 负载均衡因异常失败的请求数;loadbalancer.requests.discard
: 负载均衡丢弃的请求数;
以上,是本文关于Spring-Cloud LoadBalancer 的简单使用过程中遇到的几个小问题。
还有一些可自定义的配置还未实践,包括Zone-Based LoadBalancer,Sticky Session LoadBalancer,Hints等等。
References #
- Spring Cloud Load Balancer
- Spring WebClient as a Load Balancer Client
- Spring WebFlux WebClient as a Load Balancer Client
- Load Balancer: No servers available for service ***
- Configuring spring-cloud loadbalancer without autoconfiguration?
- Load-balancer-does-not-contain-an-instance-for-the-service
- Caffeine specs