Spring WebClient实现微服务的负载均衡调用

Spring WebClient实现微服务的负载均衡调用

在微服务应用中,负载均衡来保证应用的可用性的常用手段。Spring-Cloud-LoadBalancer提供了服务之间实现负载均衡调用的能力。

除了使用 Open-Feign外,还可以使用RestTemplateWebClint进行服务调用。

WebClientSpring WebFlux项目下的HTTP工具,基于Reactor的非阻塞流式API,是RestTemplate的有效替代。

Spring Cloud 负载均衡 #

Spring Cloud提供了客户端负载均衡的抽象和实现。如果你的微服务引入了spring-cloud-starter-loadbalancer包,并且没有在配置文件中使用spring.cloud.loadbalancer.enabled = false禁用负载均衡,那么Spring Cloud负载均衡的必需项会在应用启动后自动配置。

默认情况下,Spring Cloud LoadBalancer会实现ReactiveLoadBalancer接口,这个接口基于轮询(Round-Robin)来随机访问微服务实例。而微服务示例由谁来提供呢?

很明显,自然是由 服务发现来提供。

那么,服务发现与负载均衡之间沟通的桥梁是什么?这就不得不提ServiceInstanceListSupplier这个接口了,这个接口使服务发现客户端通过service-id获取服务实例。

ServiceInstanceListSupplier接口提供了builder()方法来创建实例。

 1public interface ServiceInstanceListSupplier 
 2    extends Supplier<Flux<List<ServiceInstance>>> {
 3
 4	String getServiceId();
 5	default Flux<List<ServiceInstance>> get(Request request) {
 6		return get();
 7	}
 8	static ServiceInstanceListSupplierBuilder builder() {
 9		return new ServiceInstanceListSupplierBuilder();
10	}
11}

默认情况下,如果微服务引入了spring-boot-starter-webflux(当然啦,都使用WebClient了),Spring Cloud会自动配置ReactiveCompositeDiscoveryClient这个服务发现客户端:

1public class ReactiveCompositeDiscoveryClientAutoConfiguration {
2    @Bean
3    @Primary
4    public ReactiveCompositeDiscoveryClient reactiveCompositeDiscoveryClient(
5            List<ReactiveDiscoveryClient> discoveryClients) {
6        return new ReactiveCompositeDiscoveryClient(discoveryClients);
7    }
8}

如果使用Eureka作为服务发现实现,那么上述配置所使用的形参discoveryClients则通过

 1public class EurekaReactiveDiscoveryClientConfiguration {
 2
 3    @Bean
 4    @ConditionalOnMissingBean
 5    public EurekaReactiveDiscoveryClient eurekaReactiveDiscoveryClient
 6        (EurekaClient client, EurekaClientConfig clientConfig) 
 7    {
 8        return new EurekaReactiveDiscoveryClient(client, clientConfig);
 9    }
10    // 以下省略...
11}

同样地,配置依赖的EurekaClientEurekaClientConfig也是自动配置,这里不在展开讨论。

有了服务发现,还需要配置负载均衡,Spring默认情况下也进行了自动配置。

首先是通过 LoadBalancerAutoConfiguration配置负载均衡客户端工厂LoadBalancerClientFactory

 1// org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration
 2@ConditionalOnMissingBean
 3@Bean
 4public LoadBalancerClientFactory loadBalancerClientFactory
 5    (LoadBalancerClientsProperties properties) 
 6{    
 7    LoadBalancerClientFactory clientFactory 
 8        = new LoadBalancerClientFactory(properties);
 9    clientFactory.setConfigurations(
10        this.configurations.getIfAvailable(Collections::emptyList));
11    return clientFactory;
12}

而负载均衡的客户端则是在LoadBalancerClientConfiguration类中进行配置:

 1// org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration
 2@Bean
 3@ConditionalOnMissingBean
 4public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
 5        LoadBalancerClientFactory loadBalancerClientFactory) {
 6    String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
 7    return new RoundRobinLoadBalancer(
 8            loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
 9}
10
11@Bean
12@ConditionalOnBean(ReactiveDiscoveryClient.class)
13@ConditionalOnMissingBean
14@Conditional(DefaultConfigurationCondition.class)
15public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
16        ConfigurableApplicationContext context) {
17    return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
18}
19
20// 以下省略...

默认情况下,Spring-Cloud使用RoundRobinLoadBalancer进行服务轮询。同时,还配置了ServiceInstanceListSupplier,使用了基础的发现客户端和缓存。

 1// org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplierBuilder
 2public ServiceInstanceListSupplierBuilder withDiscoveryClient() {
 3    if (baseCreator != null && LOG.isWarnEnabled()) {
 4        LOG.warn("Overriding a previously set baseCreator with a ReactiveDiscoveryClient baseCreator.");
 5    }
 6    this.baseCreator = context -> {
 7        ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class);
 8
 9        return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
10    };
11    return this;
12}
13
14public ServiceInstanceListSupplierBuilder withCaching() {
15    if (cachingCreator != null && LOG.isWarnEnabled()) {
16        LOG.warn(
17                "Overriding a previously set cachingCreator with a CachingServiceInstanceListSupplier-based cachingCreator.");
18    }
19    this.cachingCreator = (context, delegate) -> {
20        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
21                .getBeanProvider(LoadBalancerCacheManager.class);
22        if (cacheManagerProvider.getIfAvailable() != null) {
23            return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable());
24        }
25        if (LOG.isWarnEnabled()) {
26            LOG.warn("LoadBalancerCacheManager not available, returning delegate without caching.");
27        }
28        return delegate;
29    };
30    return this;
31}

WebClient作为负载均衡客户端 #

使用WebClient作为负载均衡客户端非常简单,只需要使用@LoadBalanced注解即可:

 1@Configuration
 2public class MyConfiguration {
 3
 4    @Bean
 5    @LoadBalanced
 6    public WebClient.Builder loadBalancedWebClientBuilder() {
 7        return WebClient.builder();
 8    }
 9}
10
11public class MyClass {
12    @Autowired
13    private WebClient.Builder webClientBuilder;
14
15    public Mono<String> doOtherStuff() {
16        return webClientBuilder.build().get().uri("http://stores/stores")
17                        .retrieve().bodyToMono(String.class);
18    }
19}

需要注意的是,在使用负载均衡后,WebClient发起调用的uri必需使用注册中心的服务名,而不能使用host:port的组合了。

上文已经提到,默认情况下,引入spring-cloud-starter-loadbalancer包后,Spring Cloud Load Balancer已经开始工作,ReactiveLoadBalancer已经自动配置并且在工作了。

通常,在微服务调用时,还需要处理请求之间头的传递问题。这需要在请求中加入一个过滤器,用来处理这个问题。

WebClient请求中加入过滤器添加请求头信息也很容易,只需要实现ExchangeFilterFunction接口:

 1public class UserContextWebClientFilter implements ExchangeFilterFunction {
 2    @Override
 3    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
 4        ClientRequest buildRequest = ClientRequest.from(request).headers(h -> {
 5            h.add(UserContext.CORRELATION_ID, UserContextHolder.getContext().getCorrelationId());
 6            h.add(UserContext.AUTH_TOKEN, UserContextHolder.getContext().getAuthToken());
 7        }).build();
 8        return next.exchange(buildRequest);
 9    }
10}

然后在配置WebClient时,添加过滤器即可:

 1@Configuration
 2public class WebClientLoadBalancerConfiguration {
 3
 4    @Bean
 5    @LoadBalanced
 6    public WebClient.Builder loadBalancedWebClient() {
 7        return WebClient.builder()
 8            .filters(
 9                f -> {
10                    f.add(new UserContextWebClientFilter());
11                });
12    }
13}

自定义负载均衡配置 #

上文提到,默认情况下,Spring-Cloud会自动配置RoundRobinLoadBalancerServiceInstanceListSupplier,如果想自定义负载均衡配置,可以手动配置这两个类。

使用RamdomLoadBalancer而非RoundRobinLoadBalancer #

 1public class CustomLoadBalancerConfiguration {
 2
 3    @Bean
 4    ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment,
 5            LoadBalancerClientFactory loadBalancerClientFactory) {
 6        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
 7        return new RandomLoadBalancer(loadBalancerClientFactory
 8                .getLazyProvider(name, ServiceInstanceListSupplier.class),
 9                name);
10    }
11}

LoadBalancerClientFactory前文提过,是通过LoadBalancerAutoConfiguration自动配置的。

自定义ServiceInstanceListSupplier #

除了自定义RamdomLoadBalancer外,还可以手动配置ServiceInstanceListSupplier

 1public class CustomLoadBalancerConfiguration {
 2
 3    @Bean
 4    public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
 5            ConfigurableApplicationContext context) {
 6        return ServiceInstanceListSupplier.builder()
 7                    .withDiscoveryClient()
 8                    .withHealthChecks()     // 使用健康检查
 9                    .build(context);
10        }
11    }

使用自定义配置 #

自定义配置类不能使用@Configuration注解。

https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/#custom-loadbalancer-configuration

要使用手动配置注解,可以使用@LoadBalancerClient注解。

 1@Configuration
 2@LoadBalancerClient(value = "licensing-service", configuration = CustomLoadBalancerConfig.class)
 3public class WebClientLoadBalancerConfiguration {
 4
 5    @Bean
 6    @LoadBalanced
 7    public WebClient.Builder loadBalancedWebClient() {
 8        return WebClient.builder()
 9            .filters(
10                filters -> {
11                    filters.add(new UserContextWebClientFilter());
12                });
13    }
14}

No servers available for service *** #

如上,在使用自定义配置的情况下,若在自定义配置中使用健康检查的情况下,可能会遇到负载均衡无法找到服务的问题:

1ServiceInstanceListSupplier.builder()
2                    .withDiscoveryClient()
3                    .withHealthChecks()     // 使用健康检查
4                    .build(context);
1ERROR 88657 --- [nio-8888-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.web.reactive.function.client.WebClientResponseException$ServiceUnavailable: 503 Service Unavailable from UNKNOWN ] with root cause
2WARN 88657 --- [     parallel-5] o.s.c.l.core.RoundRobinLoadBalancer      : No servers available for service: 192.168.1.70
3WARN 88657 --- [     parallel-5] eactorLoadBalancerExchangeFilterFunction : LoadBalancer does not contain an instance for the service 192.168.1.70
4WARN 88657 --- [     parallel-9] o.s.c.l.core.RoundRobinLoadBalancer      : No servers available for service: 192.168.1.70
5WARN 88657 --- [     parallel-9] eactorLoadBalancerExchangeFilterFunction : LoadBalancer does not contain an instance for the service 192.168.1.70
6WARN 88657 --- [    parallel-12] o.s.c.l.core.RoundRobinLoadBalancer      : No servers available for service: 192.168.1.70
7WARN 88657 --- [    parallel-12] eactorLoadBalancerExchangeFilterFunction : LoadBalancer does not contain an instance for the service 192.168.1.70

原因在于,使用同一个WebClient.Builder实例同时用来处理请求和发送健康健康检查,因此,HealthCheckServiceInstanceListSupplier用来健康检查的的请求是由负载均衡客户端发出的。实际上,健康请求应该是非负载均衡的(因健康检查使用的是host,而不是服务id,从上面的警告日志可看出)。因此,可以初拥独立的非负载均衡的客户端用来健康检查。像这样:

1ServiceInstanceListSupplier.builder()
2                    .withDiscoveryClient()
3                    .withHealthChecks(WebClient.builder().build())     // 使用非负载均衡的健康检查
4                    .build(context);

另外,还可以通过配置自定义健康检查机制:

 1spring: 
 2  cloud:
 3    loadbalancer:
 4      health-check: 
 5        initial-delay: 1s   # 初始时间间隔: 默认0
 6        interval: 30s  # 健康检查时间间隔:默认25s
 7# 或者
 8
 9spring:
10    cloud:
11        loadbalancer:
12            clients:
13                your-service-name: # 或者指定服务名,每个服务名使用不同的配置,未配置的服务名使用默认配置
14                    health-check:
15                        initial-delay: 5s
16                        interval: 30s

关于健康检查机制,在使用服务注册中心的时候,并不需要。因为注册中心会检测微服务的健康状况。

如果是使用简单的服务发现(SimpleDiscoveryClient),健康检查是有帮助的。

负载均衡的缓存和统计数据 #

除了健康检查机制外,还可以配置负载均衡的缓存和负载均衡的统计数据:

 1spring: 
 2  cloud:
 3    loadbalancer:
 4      cache: # 使用caffine缓存
 5        caffeine:
 6          spec: initialCapacity=10, maximumSize=50 # initialCapacity默认256
 7        ttl: 30s # 默认35s
 8      stats:  # 负载均衡statistics
 9        micrometer: 
10          enabled: true

默认情况下,负载均衡使用内存作为服务缓存。如果项目引入了caffeine,Spring-Cloud Load Balancer会使用caffeine作为服务缓存。

当然,可以通过设置spring.cloud.loadbalancer.cache.enabledfalse禁用缓存。

此外,还可以通过设置spring.cloud.loadbalancer.stats.micrometer.enabledtrue来查看负载均衡的调用情况。它通过actuator注册了几个metrics

  • loadbalancer.requests.active: 任意微服务实例当前活动的请求;

  • loadbalancer.requests.success: 负载均衡成功的请求数;

  • loadbalancer.requests.failed: 负载均衡因异常失败的请求数;

  • loadbalancer.requests.discard: 负载均衡丢弃的请求数;

以上,是本文关于Spring-Cloud LoadBalancer 的简单使用过程中遇到的几个小问题。

还有一些可自定义的配置还未实践,包括Zone-Based LoadBalancer,Sticky Session LoadBalancer,Hints等等。

References #