An Analysis of Spring Cloud Load Balancing

2022-06-14 13:28:47

1. The load-balancing auto-configuration class

This is the auto-configuration class for Spring Cloud load balancing. Note that it is org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration from the spring-cloud-commons module, not org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration from the spring-cloud-loadbalancer module. The two classes share the same simple name.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

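This auto-configuration depends on RestTemplate and LoadBalancerClient: it only takes effect when RestTemplate is on the classpath and a LoadBalancerClient bean exists. That makes sense, since the load balancing configured here is a capability provided for RestTemplate.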

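The class injects a list of RestTemplate instances, and the field is annotated with @LoadBalanced. This annotation is only a marker: it is itself annotated with @Qualifier, so it works as a filter that restricts the injected list to RestTemplate beans that also carry @LoadBalanced. There is no other logic that processes @LoadBalanced.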

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
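To make the "marker annotation as a filter" idea concrete, here is a minimal plain-Java sketch. It does not use Spring and is not how Spring resolves qualifiers internally; @Marker and @Balanced are hypothetical stand-ins for @Qualifier and @LoadBalanced, and the filtering is done with plain reflection.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class MarkerAnnotationSketch {

    // Hypothetical stand-in for Spring's @Qualifier meta-annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.ANNOTATION_TYPE)
    public @interface Marker {
    }

    // Hypothetical stand-in for @LoadBalanced: no attributes, it only marks.
    @Marker
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface Balanced {
    }

    public static class Clients {
        @Balanced
        String internalClient = "internal";

        String externalClient = "external";
    }

    // Collect the names of fields carrying an annotation that is itself
    // meta-annotated with @Marker.
    public static List<String> markedFields(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            for (Annotation annotation : field.getAnnotations()) {
                if (annotation.annotationType().isAnnotationPresent(Marker.class)) {
                    names.add(field.getName());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(markedFields(Clients.class)); // prints [internalClient]
    }
}
```

Only the field that carries the marker is selected, which mirrors how the injected list contains only the RestTemplate beans marked with @LoadBalanced.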

The code that follows creates the LoadBalancerRequestFactory, loadBalancerInterceptor and restTemplateCustomizer beans.

@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
        final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
    return () -> restTemplateCustomizers.ifAvailable(customizers -> {
        for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
            for (RestTemplateCustomizer customizer : customizers) {
                customizer.customize(restTemplate);
            }
        }
    });
}

@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
    return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {

    @Bean
    public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
            LoadBalancerRequestFactory requestFactory) {
        return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
        return restTemplate -> {
            List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
            list.add(loadBalancerInterceptor);
            restTemplate.setInterceptors(list);
        };
    }

}

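The code above creates the LoadBalancerInterceptor, and restTemplateCustomizer appends it to each RestTemplate's interceptor list. LoadBalancerInterceptor intercepts outgoing HTTP calls. In the previous post, which analyzed RestTemplate, we saw that when interceptors are present the request is routed through the interceptors' methods.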
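The wiring can be modeled without Spring in a few lines. The sketch below is only an illustration: ToyTemplate is a hypothetical stand-in for RestTemplate that stores interceptor names as strings, and Consumer<ToyTemplate> plays the role of RestTemplateCustomizer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class CustomizerSketch {

    // Hypothetical minimal stand-in for RestTemplate: it only holds interceptor names.
    public static class ToyTemplate {
        private List<String> interceptors = new ArrayList<>();

        public List<String> getInterceptors() {
            return interceptors;
        }

        public void setInterceptors(List<String> interceptors) {
            this.interceptors = interceptors;
        }
    }

    // Same shape as restTemplateCustomizer: copy the list, append, set it back.
    public static Consumer<ToyTemplate> addInterceptor(String interceptor) {
        return template -> {
            List<String> list = new ArrayList<>(template.getInterceptors());
            list.add(interceptor);
            template.setInterceptors(list);
        };
    }

    // Same shape as the initializer: apply every customizer to every collected template.
    public static void customizeAll(List<ToyTemplate> templates, List<Consumer<ToyTemplate>> customizers) {
        for (ToyTemplate template : templates) {
            for (Consumer<ToyTemplate> customizer : customizers) {
                customizer.accept(template);
            }
        }
    }

    public static void main(String[] args) {
        ToyTemplate template = new ToyTemplate();
        template.setInterceptors(new ArrayList<>(Arrays.asList("logging")));
        Consumer<ToyTemplate> customizer = addInterceptor("loadBalancer");
        customizeAll(Arrays.asList(template), Arrays.asList(customizer));
        System.out.println(template.getInterceptors()); // prints [logging, loadBalancer]
    }
}
```

Copying the list before appending keeps any interceptors that were already registered and places the new one last.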

2. The LoadBalancerClient interface

public interface LoadBalancerClient extends ServiceInstanceChooser {

    // Execute the request using a service instance
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
            throws IOException;

    // Build the real URI from the chosen instance and the original URI
    URI reconstructURI(ServiceInstance instance, URI original);

}

public interface ServiceInstanceChooser {

    // Choose a service instance for the given serviceId
    ServiceInstance choose(String serviceId);

}
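To show what choose and reconstructURI are responsible for, here is a small plain-Java sketch. It is not a Spring implementation: ToyLoadBalancerClient and Instance are hypothetical names, the round-robin strategy is an assumption picked for illustration, and the serviceId argument is ignored because the toy client knows a single service only.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ToyLoadBalancerClient {

    // Hypothetical minimal service instance: just a host and a port.
    public static final class Instance {
        public final String host;
        public final int port;

        public Instance(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    private final List<Instance> instances;
    private final AtomicInteger position = new AtomicInteger();

    public ToyLoadBalancerClient(List<Instance> instances) {
        this.instances = instances;
    }

    // Round-robin selection (assumed strategy). serviceId is ignored in this toy model.
    public Instance choose(String serviceId) {
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    // Swap the logical service name for the instance's host and port,
    // keeping scheme, path, query and fragment of the original URI.
    public static URI reconstructURI(Instance instance, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), instance.host, instance.port,
                    original.getPath(), original.getQuery(), original.getFragment());
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For example, with instances 10.0.0.1:8080 and 10.0.0.2:8080, successive choose calls alternate between the two, and reconstructURI turns http://user-service/api/users?id=1 into http://10.0.0.1:8080/api/users?id=1 for the first instance.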

3. LoadBalancerInterceptor

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }

}

LoadBalancerInterceptor holds a LoadBalancerClient and a LoadBalancerRequestFactory. Both fields are set when the interceptor is constructed.

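When LoadBalancerInterceptor intercepts an HTTP request, its intercept method runs. It takes the host of the request URI as the serviceName and calls loadBalancer.execute, which selects a service instance for that name and sends the request to it.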
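The host-as-service-name step is easy to try in isolation with java.net.URI. The following sketch is illustrative only: ServiceNameSketch and serviceName are hypothetical names, and a plain IllegalStateException stands in for Assert.state. It also shows one case where the check fires: java.net.URI does not accept an underscore in a hostname, so getHost() returns null for such a URI.

```java
import java.net.URI;

public class ServiceNameSketch {

    // Same idea as the interceptor: the URI host is treated as the logical service name.
    public static String serviceName(URI originalUri) {
        String name = originalUri.getHost();
        if (name == null) {
            throw new IllegalStateException("Request URI does not contain a valid hostname: " + originalUri);
        }
        return name;
    }

    public static void main(String[] args) {
        // prints user-service
        System.out.println(serviceName(URI.create("http://user-service/api/users")));
        // '_' is not valid in a java.net.URI hostname, so the host is null here: prints null
        System.out.println(URI.create("http://user_service/api/users").getHost());
    }
}
```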

4. LoadBalancerRequestFactory

public class LoadBalancerRequestFactory {

    private LoadBalancerClient loadBalancer;

    private List<LoadBalancerRequestTransformer> transformers;

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
            List<LoadBalancerRequestTransformer> transformers) {
        this.loadBalancer = loadBalancer;
        this.transformers = transformers;
    }

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
            if (this.transformers != null) {
                for (LoadBalancerRequestTransformer transformer : this.transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);
        };
    }

}

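LoadBalancerRequestFactory creates the LoadBalancerRequest that actually performs the call. Nothing is sent when createRequest is invoked. The returned lambda runs later, once a service instance has been chosen: it wraps the original request in a ServiceRequestWrapper bound to that instance, applies any configured transformers, and then continues the execution.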
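The deferred-execution pattern can be sketched in plain Java. This is a simplified model under stated assumptions, not the Spring classes: DeferredRequest is a hypothetical analogue of LoadBalancerRequest, requests are plain strings, and the "GET " prefix merely stands in for execution.execute.

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class DeferredRequestSketch {

    // Hypothetical analogue of LoadBalancerRequest: work that runs once an instance is known.
    public interface DeferredRequest<T> {
        T apply(String instanceAddress);
    }

    // Analogue of createRequest: nothing is sent here, only a lambda is returned.
    public static DeferredRequest<String> createRequest(String path, List<UnaryOperator<String>> transformers) {
        return instanceAddress -> {
            // Bind the request to the chosen instance (the role of ServiceRequestWrapper).
            String request = "http://" + instanceAddress + path;
            if (transformers != null) {
                for (UnaryOperator<String> transformer : transformers) {
                    request = transformer.apply(request);
                }
            }
            // Stands in for execution.execute(serviceRequest, body).
            return "GET " + request;
        };
    }
}
```

A caller would first build the request, for example createRequest("/api/users", transformers), and only later call apply("10.0.0.1:8080") once the load balancer has picked an instance. That ordering is what lets the interceptor create the request before the target instance is known.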

5. Continuing from the previous post on RestTemplate: the execution flow

// org.springframework.http.client.InterceptingClientHttpRequest#executeInternal
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
    // Create an interceptor execution
    InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
    // Run it
    return requestExecution.execute(this, bufferedOutput);
}

// org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    // Invoke the next interceptor if any remain
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        // No interceptors left: create the real request and send it
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}
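
The iterator-driven chain above can be reduced to a short plain-Java sketch. It is a simplified model, not Spring code: Execution, Interceptor and ChainExecution are hypothetical names, requests are plain strings, and the "sent:" prefix stands in for the real HTTP call made when no interceptors remain.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class InterceptorChainSketch {

    public interface Execution {
        String execute(String request);
    }

    public interface Interceptor {
        String intercept(String request, Execution execution);
    }

    // Analogue of InterceptingRequestExecution: each execute call consumes one
    // interceptor; when none remain, the "real" request is performed.
    public static class ChainExecution implements Execution {
        private final Iterator<Interceptor> iterator;

        public ChainExecution(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                return iterator.next().intercept(request, this);
            }
            return "sent:" + request;
        }
    }

    public static String run(String request, List<Interceptor> interceptors) {
        return new ChainExecution(interceptors).execute(request);
    }

    public static void main(String[] args) {
        Interceptor logging = (req, exec) -> exec.execute(req + "[logged]");
        // Plays the role of LoadBalancerInterceptor: swap the service name for an address.
        Interceptor balancing = (req, exec) -> exec.execute(req.replace("user-service", "10.0.0.1:8080"));
        // prints sent:http://10.0.0.1:8080/api[logged]
        System.out.println(run("http://user-service/api", Arrays.asList(logging, balancing)));
    }
}
```

Because every interceptor receives the execution itself and decides when to call execute, an interceptor such as the load-balancing one can rewrite the request before handing it to the rest of the chain.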

  • Author: 喜欢小苹果的码农
  • Original link: https://blog.csdn.net/xuwenjingrenca/article/details/125000284
    Updated: 2022-06-14 13:28:47