在spring cloud负载均衡之ribbon—demo实现 我们介绍了Ribbon
的使用方式,其中给出了完整的demo使用方式,这篇文章将介绍RestTemplate
类将介绍与Ribbon
配合工作流程。如果对ribbon的工作原理感兴趣,可以参考spring clound负载均衡之Ribbon(三)- 工作原理 , 这篇文章介绍了Ribbon中重要的类型,以及类型的作用。
使用方式
在正式开始介绍RestTemplate
方式工作方式之前,我们回顾下如何通过RestTemplate
进行远程请求, 具体使用方式如下:
@GetMapping("/info") public String info() { return restTemplate.getForObject("http://SPRING-EUREKA-CLIENT-B/info", String.class); }
在这个简单使用中,我们主要使用了getForObject()
方法,请求远程服务,其中有两个参数需要留意:
- 请求URL: 请求URL中包含了对远程请求的服务名称
- 远程服务返回结果,将封装成对一个的结果对象。本demo中则直接使用String字符串。
工作原理
本篇文章将以getForObject()
作为主要入口,RestTemplate
如何与Ribbon
配合并很好的工作的。我们看下RestTemplate
的类图结构:
通过类图我们可以得知,RestTemplate
与Ribbon
的链接渠道是通过RibbonClientHttpRequestFactory
进行链接,通过返回的ClientHttpRequest
对象将Ribbon
组件进行组装。下面我们以getForObject()
方法为例,跟踪下具体的源码,查看请求是如何组装与最终执行的。
RestTemplate#getForObject()
该方法作为入口,接收请求信息,直接看源码:
@Override @Nullable public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException { // 创建RequstCallback对象,该对象主要对Request进行回调处理,默认为AcceptHeaderRequestCallback RequestCallback requestCallback = acceptHeaderRequestCallback(responseType); // 这里创建对response message的返回值进行处理, 通过对返回类型,以及MessageConverters进行绑定 HttpMessageConverterExtractor<T> responseExtractor = new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger); // 调用execute方法 return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables); } public <T> RequestCallback acceptHeaderRequestCallback(Class<T> responseType) { return new AcceptHeaderRequestCallback(responseType); }
RestTemplate#execute()
@Override @Nullable public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException { // 这里是将url中的占位符信息,替换为variables中的具体的值, 组成完整的URI信息 URI expanded = getUriTemplateHandler().expand(url, uriVariables); // 执行请求 return doExecute(expanded, method, requestCallback, responseExtractor); }
RestTemplate#doExecute()
这里为正式执行http请求的方法,具体查看源码:
@Nullable protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException { // 参数校验 Assert.notNull(url, "URI is required"); Assert.notNull(method, "HttpMethod is required"); ClientHttpResponse response = null; try { // 创建ClientHttpRequest独享 ClientHttpRequest request = createRequest(url, method); // 当RequestCallback不为空时,调用请求回调类, 这里默认为AcceptHeaderRequestCallback if (requestCallback != null) { requestCallback.doWithRequest(request); } // 执行request请求, 并获取response响应对象 response = request.execute(); // 处理response响应数据 handleResponse(url, method, response); // 将请求结果转换为指定的responseType, 并返回 return (responseExtractor != null ? responseExtractor.extractData(response) : null); } catch (IOException ex) { String resource = url.toString(); String query = url.getRawQuery(); resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource); throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + ex.getMessage(), ex); } finally { if (response != null) { response.close(); } } }
在doExecute()
方法中,逻辑是很明确的,主要为创建Request
-> request 回调
-> 发送请求
-> 处理响应结果
-> 类型转换这么一个过程
。
HttpAccessor#createRequest()
在上面类图中可以得知,其实创建Request
对象是由父类来完成的,因此我们看下创建Request
方法的源码:
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException { // 这里创建Request对象是通过HttpClientRequestFactory类来实现的 ClientHttpRequest request = getRequestFactory().createRequest(url, method); if (logger.isDebugEnabled()) { logger.debug("HTTP " + method.name() + " " + url); } return request; }
InterceptingHttpAccessor#getRequestFactory
在获取RequestFactory
的时候,其实InterceptingHttpAccessor
类中对HttpClientRequestFactory
类型做了扩展,增加了ClientHttpRequestInterceptor
的扩展,具体逻辑如下:
@Override public ClientHttpRequestFactory getRequestFactory() { // 获取已经注册的interceptors的类型 List<ClientHttpRequestInterceptor> interceptors = getInterceptors(); // 如果interceptors不为空,则改变HttpAccessor中的现有逻辑 if (!CollectionUtils.isEmpty(interceptors)) { ClientHttpRequestFactory factory = this.interceptingRequestFactory; if (factory == null) { // 创建InterceptingClientHttpRequestFactory对象 factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors); this.interceptingRequestFactory = factory; } return factory; } else { return super.getRequestFactory(); } } }
该方法中可能会该变现有的逻辑,主要通过判断是否注册了interceptor
,因此当有interceptor
存在时,request请求必须先经过interceptor.intercept()
方法之后,才会最终走到RibbonClientHttpRequestFactory
类中,创建新的request
对象发送请求。这里不做展开,感兴趣的可以自己查看。
RibbonClientHttpRequestFactory#createRequest()
最终HttpRequest
的创建就在RibbonClientHttpRequestFactory
中完成,我们看下源码:
public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException { // 获取uri中的host,因为我们发送请求的时候,使用的为serviceId, 因此这里直接就是serviceId信息 String serviceId = originalUri.getHost(); if (serviceId == null) { throw new IOException( "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]"); } // 从SpringClientFactory中获取IClientConfig bean, 默认为DefaultClientConfigImpl中实现 IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId); // 获取RestClient对象, 默认为OverrideRestClient对象 RestClient client = this.clientFactory.getClient(serviceId, RestClient.class); HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name()); return new RibbonHttpRequest(originalUri, verb, client, clientConfig); }
RibbonHttpRequest#execute()
执行http请求, 查看下具体源码信息:
@Override protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException { try { // 向http中加入headers信息 addHeaders(headers); // 当outputstream已经存在时,则关闭,并读取request中的请求信息 if (outputStream != null) { outputStream.close(); builder.entity(outputStream.toByteArray()); } // 获取HttpRequest对象,对象中包含了URI与Verb信息 HttpRequest request = builder.build(); // 执行请求,与LoadBalancer一起执行 HttpResponse response = client.executeWithLoadBalancer(request, config); return new RibbonHttpResponse(response); } catch (Exception e) { throw new IOException(e); } }
AbstractLoadBalancerAwareClient#executeWithLoadBalancer()
在通过RestClient
调用时,最终都会通过executeWithLoadBalancer()
方法进行预处理,具体源码如下:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { // 获取LoadBalancerCommand对象 LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { // 提交并执行command return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { // 根据server重新组装URI信息 URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { // 执行http请求 return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } } // 构建LoadBalancerCommand protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) { // 设置retry的操作设置 RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config); // 创建LoadBalancerCommand LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder() .withLoadBalancerContext(this) // 设置LoadBalancerContext .withRetryHandler(handler) // 设置重试 .withLoadBalancerURI(request.getUri()); // 设置uri // 自定义操作 customizeLoadBalancerCommandBuilder(request, config, builder); // 构建Command return builder.build(); }
ZoneAwareLoadBalancer#chooseServer()
因为在Ribbon
中还是采用的响应是编程,因此,构建Observable
对象时,这里就不贴源码了,最终在Request
执行时,则需要从ILoadBalancer
中选择一个合适的Server
,并返回,具体源码如下:
@Override public Server chooseServer(Object key) { // 判断是否启用ZoneAwareLoadBalancer, 或则当前可用的zone只有一个时,直接从当前zone选择server if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { logger.debug("Zone aware logic disabled or there is only one zone"); return super.chooseServer(key); } Server server = null; try { // 获取stats状态信息 LoadBalancerStats lbStats = getLoadBalancerStats(); // 创建缓存数据快照 Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); if (triggeringLoad == null) { triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d); } if (triggeringBlackoutPercentage == null) { triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d); } // 从快照中获取可用的快照列表 Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); // 可用快照存在时 if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { // 随机选择一个可用的zone String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { // 获取区域的LoadBalancer对象 BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone); // 从当前区域中选择Server server = zoneLoadBalancer.chooseServer(key); } } } catch (Exception e) { logger.error("Error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { // 当选择Server失败时,重新选择Server logger.debug("Zone avoidance logic is not invoked."); return super.chooseServer(key); } }
BaseLoadBalancer#chooseServer()
public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { // 当IRule存在时,从rule中选取Server信息,当前默认为: ZoneAvoidanceRule return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } }
PredicateBaseRule#choose()
public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); // 1. 从LoadBalancer中获取所有的Server列表 // 2. 通过Predicate选择Server Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } }
AbstractServerPredicate#chooseRoundRobinAfterFiltering()
在该方法中,其实就是一个RoundRobin
的算法,轮询的请求后端Server
. 具体源码如下:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { // 获取可以使用的Server列表 List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } // 根据RoundRobin算法计算对应的index, 并获取Server return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); } private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextIndex.get(); int next = (current + 1) % modulo; if (nextIndex.compareAndSet(current, next) && current < modulo) return current; } }
RibbonClientConfiguration.OverrideRestClient#reconstructURIWithServer()
该方法内部主要是为了将请求地址更新到安全的链接,更新安全链接的前提是IClientConfig
将请求链接标记为安全链接时使用,具体源码如下:
@Override public URI reconstructURIWithServer(Server server, URI original) { // 判断是否为安全的链接,如果为安全链接在,则更新schema URI uri = updateToSecureConnectionIfNeeded(original, this.config, this.serverIntrospector, server); // 重新组装URI return super.reconstructURIWithServer(server, uri); }
LoadBalancerContext#reconstructURIWithServer()
public URI reconstructURIWithServer(Server server, URI original) { String host = server.getHost(); int port = server.getPort(); String scheme = server.getScheme(); if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) { return original; } if (scheme == null) { scheme = original.getScheme(); } if (scheme == null) { scheme = deriveSchemeAndPortFromPartialUri(original).first(); } try { StringBuilder sb = new StringBuilder(); sb.append(scheme).append("://"); if (!Strings.isNullOrEmpty(original.getRawUserInfo())) { sb.append(original.getRawUserInfo()).append("@"); } sb.append(host); if (port >= 0) { sb.append(":").append(port); } sb.append(original.getRawPath()); if (!Strings.isNullOrEmpty(original.getRawQuery())) { sb.append("?").append(original.getRawQuery()); } if (!Strings.isNullOrEmpty(original.getRawFragment())) { sb.append("#").append(original.getRawFragment()); } URI newURI = new URI(sb.toString()); return newURI; } catch (URISyntaxException e) { throw new RuntimeException(e); } }
该方法其实很简单,主要是为了根据Server
的host
以及端口组装成新的URI并返回。
RestClient#execute()
最终Request中还是依赖于RestClient对象来实现,具体源码如下:
private HttpResponse execute(HttpRequest.Verb verb, URI uri, Map<String, Collection<String>> headers, Map<String, Collection<String>> params, IClientConfig overriddenClientConfig, Object requestEntity) throws Exception { HttpClientResponse thisResponse = null; boolean bbFollowRedirects = bFollowRedirects; // 判断是否接收redirects请求 if (overriddenClientConfig != null // set whether we should auto follow redirects && overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects)!=null){ // use default directive from overall config Boolean followRedirects = Boolean.valueOf(""+overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects, bFollowRedirects)); bbFollowRedirects = followRedirects.booleanValue(); } restClient.setFollowRedirects(bbFollowRedirects); if (logger.isDebugEnabled()) { logger.debug("RestClient sending new Request(" + verb + ": ) " + uri); } // 获取WebResource资源 WebResource xResource = restClient.resource(uri.toString()); // 请求参数 if (params != null) { for (Map.Entry<String, Collection<String>> entry: params.entrySet()) { String name = entry.getKey(); for (String value: entry.getValue()) { xResource = xResource.queryParam(name, value); } } } ClientResponse jerseyResponse; Builder b = xResource.getRequestBuilder(); if (headers != null) { for (Map.Entry<String, Collection<String>> entry: headers.entrySet()) { String name = entry.getKey(); for (String value: entry.getValue()) { b = b.header(name, value); } } } Object entity = requestEntity; switch (verb) { case GET: // 发送请求 jerseyResponse = b.get(ClientResponse.class); break; case POST: jerseyResponse = b.post(ClientResponse.class, entity); break; case PUT: jerseyResponse = b.put(ClientResponse.class, entity); break; case DELETE: jerseyResponse = b.delete(ClientResponse.class); break; case HEAD: jerseyResponse = b.head(); break; case OPTIONS: jerseyResponse = b.options(ClientResponse.class); break; default: throw new ClientException( ClientException.ErrorType.GENERAL, "You have to one of the REST verbs such as GET, POST etc."); } // 将结果封装成为HttpClientResponse thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig); if (thisResponse.getStatus() == 503){ thisResponse.close(); throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED); } return thisResponse; }
关于RestTemplate
相关的源码就到这里,后面江湖补充RestTemplate
请求的序列图,以便于更好的理解。
文章评论