在上篇文章spring cloud 服务发现之Eureka Client(二)—自动装配中,介绍了Eureka Client自动装配的过程,其中设计到几个比较重要的类,包括ServiceRegistry
, EurekaClient
, EurekaDiscoveryClient
类型。这些类型在Eureka Client
整个生命周期中充当这比较重要的角色,在今天这章节中,将主要介绍DiscoveryClient
对象,该对象主要包括了一下几个步骤:
- 注册当前实例
- 从eureka server中获取实例列表
- 心跳启动
类结构设计
在Spring Cloud中,使用了EurekaDiscoveryClient
作为DiscoveryClient
的实现,该类型的设计如下:
spring cloud在做封装的时候,是提取了单独一个DicoveryClient
类,作为通用的DicoveryClient
客户端来处理。这里在看源码的时候,会产生一个误区,需要多加注意。通过上面类图可以看出,spring cloud在原有的netflix的DicoveryClient
基础之上,封装了CloudEurekaClient
客户端,用于屏蔽对DicoveryClient
使用的细节。
实例化CloudEurekaClient
对EurekaClient
的使用,首先需要对EurekaClient
的实例化,实例化通过CloudEurekaClient()
构造函数进行,源码如下:
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) { super(applicationInfoManager, config, args); this.applicationInfoManager = applicationInfoManager; this.publisher = publisher; this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport"); ReflectionUtils.makeAccessible(this.eurekaTransportField); }
该构造函数声明了当前类型的依赖关系,需要创建:
ApplicationInfoManager
对象EurekaClientConfig
对象,在spring中实际使用的EurekaClientConfigBean
对象AbstractDiscoveryOptionalArgs
对象,该对象的最终使用的是MutableDiscoveryClientOptionalArgs
对象ApplicationEventPublisher
对象,该对象由AbstractApplicationContext
对象创建,并加入到容器
DiscoveryClient实例化
通过对CloudEurekaClient
实例化过程中,会实例化父类DicoveryClient
对象,具体源码如下:
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) { this(applicationInfoManager, config, args, ResolverUtils::randomize); } public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) { this(applicationInfoManager, config, args, new Provider<BackupRegistry>() { private volatile BackupRegistry backupRegistryInstance; @Override public synchronized BackupRegistry get() { if (backupRegistryInstance == null) { String backupRegistryClassName = config.getBackupRegistryImpl(); if (null != backupRegistryClassName) { try { backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance(); logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass()); } catch (InstantiationException e) { logger.error("Error instantiating BackupRegistry.", e); } catch (IllegalAccessException e) { logger.error("Error instantiating BackupRegistry.", e); } catch (ClassNotFoundException e) { logger.error("Error instantiating BackupRegistry.", e); } } if (backupRegistryInstance == null) { logger.warn("Using default backup registry implementation which does not do anything."); backupRegistryInstance = new NotImplementedRegistryImpl(); } } return backupRegistryInstance; } }, randomizer); } DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // 判断args参数是否为空, 在spring cloud环境中,使用的是MutableDiscoveryClientOptionalArgs对象 if (args != null) { // 健康检查处理类 this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; // 健康检查回调 this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; // event监听事件列表 this.eventListeners.addAll(args.getEventListeners()); // 实例注册前置处理器 this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; // 获取当前实例InstanceInfo信息 InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.endpointRandomizer = endpointRandomizer; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); // 本地region applications缓存 localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); // 获取当前需要fetch的regions列表, 通过eureka.client.featch-remote-regions-registry remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); // 多个fetch regsion采用,分割,在这里将会通过split的方式获取一个数组 remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); // 判断当前EurekaClient是否需要从eureka server 获取注册的实例列表, 通过eureka.client.fetch-registry配置,默认值为true if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } // 判断当前EurekaClient需要将当前实例注册到Eureka Server, 通过eureka.client.register-with-eureka 配置,默认值为true if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); // 该条件用于判断是否需要将当前实例注册到eureka server服务,并且是否需要从eureka server上获取实例列表 // 当我们在启动Eureka Server的时候, 这两项我们都设置为false if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; // 实例regsion checker对象 instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient // DiscoveryManager是一个单例,用于保存当前EurekaClient对象,以及与Client相关的Config信息 DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh // 当Client 中配置设置为 eureka.client.register-with-eureka 或者 eureka.client.fetch-registry 为true时,就会初始化定时任务 // scheduler为线程池, 主要为DicoveryClient相关, 并且在线程池中的所有线程都是为守护线程,当主线程stop后,对应的任务前程也将会退出 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 初始化DicoveryClient Heartbeat线程池, 线程线程为1 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 缓存刷新线程池初始化,用于处理本地实例刷新操作。 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); // 该方法主要初始化EurekaHttpClients对象 scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; // 判断当前eureka是否通过dns的方式从EurekaServer同步实例列表, 通过eureka.client.use-dns-for-featching-service-urls配置,默认值为false if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } // instance regsion checker对象,同步实例的regsion信息 instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // 判断是否需要从eureka server同步实例信息, 如果需要,则从eureka server同步,并保存applications信息到本地 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } // 当前有两个判断条件, 第一个为当前实例是否需要注册实例信息到eureka server // 第二个为在初始化过程中是否需要注册, 通过 eureka.client.should-enforce-registration-at-init配置,默认为false if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { // 注册实例信息 if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch // 启动定时任务, 包括了刷新缓存,心跳信息等 initScheduledTasks(); try { // 暴露监控信息 Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient // 保存全局EurekaClient对象 DiscoveryManager.getInstance().setDiscoveryClient(this); // 保存全局的eureka client config对象 DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
scheduleServerEndpointTask()
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport, AbstractDiscoveryClientOptionalArgs args) { // 通过args获取filter列表 Collection<?> additionalFilters = args == null ? Collections.emptyList() : args.additionalFilters; // 获取eurekaJerseyClient对象 EurekaJerseyClient providedJerseyClient = args == null ? null : args.eurekaJerseyClient; TransportClientFactories argsTransportClientFactories = null; if (args != null && args.getTransportClientFactories() != null) { argsTransportClientFactories = args.getTransportClientFactories(); } // Ignore the raw types warnings since the client filter interface changed between jersey 1/2 @SuppressWarnings("rawtypes") TransportClientFactories transportClientFactories = argsTransportClientFactories == null ? new Jersey1TransportClientFactories() : argsTransportClientFactories; // 是否使用ssl context Optional<SSLContext> sslContext = args == null ? Optional.empty() : args.getSSLContext(); // host name 校验器 Optional<HostnameVerifier> hostnameVerifier = args == null ? Optional.empty() : args.getHostnameVerifier(); // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity // transport client 工厂对象获取 eurekaTransport.transportClientFactory = providedJerseyClient == null ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier) : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient); // application source 对象创建 ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() { @Override public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) { long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit); long delay = getLastSuccessfulRegistryFetchTimePeriod(); if (delay > thresholdInMs) { logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}", thresholdInMs, delay); return null; } else { return localRegionApps.get(); } } }; // 通过EurekaHttpClients创建bootstrapResolver对象 eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver( clientConfig, transportConfig, eurekaTransport.transportClientFactory, applicationInfoManager.getInfo(), applicationsSource, endpointRandomizer ); // 如果需要将当前实例注册到eureka server, 执行该段逻辑 if (clientConfig.shouldRegisterWithEureka()) { EurekaHttpClientFactory newRegistrationClientFactory = null; EurekaHttpClient newRegistrationClient = null; try { // 创建regsitration client factory对象 newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, transportConfig ); // 获取registration Eureka http client对象 newRegistrationClient = newRegistrationClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } // 将创建的对象保存到eureka transport中 eurekaTransport.registrationClientFactory = newRegistrationClientFactory; eurekaTransport.registrationClient = newRegistrationClient; } // new method (resolve from primary servers for read) // Configure new transport layer (candidate for injecting in the future) // 判断是否需要从eureka 获取注册实例列表 if (clientConfig.shouldFetchRegistry()) { EurekaHttpClientFactory newQueryClientFactory = null; EurekaHttpClient newQueryClient = null; try { // 创建query http client工厂类 newQueryClientFactory = EurekaHttpClients.queryClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, clientConfig, transportConfig, applicationInfoManager.getInfo(), applicationsSource, endpointRandomizer ); // 创建query http client对象 newQueryClient = newQueryClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } // 将数据保存到transport中 eurekaTransport.queryClientFactory = newQueryClientFactory; eurekaTransport.queryClient = newQueryClient; } }
这个方法主要是填充EurekaTransport
对象,通过EurekaClientFacotry
对象创建EurekaHttpClient
对象,在EurekaClient
中,将注册和创建的客户端分开保存,便于后面使用.
fetchRegistry()
该方法主要从eureka server中fetch全量的注册信息, 具体源码如下:
private boolean fetchRegistry(boolean forceFullRegistryFetch) { // 开启计时器 Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications // 获取所有的applications, 第一次启动时,该值为空 Applications applications = getApplications(); // 该处有几个判断条件: // 1. 判断是否禁用了delta,根据注释, 开启将会降低客户端fetch数据的频率,默认值为false. 可以根据eureka.client.disable-delta配置 // 2. 判断是否根据VIP地址刷新, 默认值为NULL // 3. 是否强制执行全量同步, 在应用第一次启动时,为false // 4. applications对象是否为null, 在DiscoveryClient初始化的时候,就已经默认包含了Applications对象 // 5. applications中已经注册的applicaions的数量是否为0, 在初次启动的时候,该值为0, 因此该条件为true if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); // 获取并存储全量的注册信息 getAndStoreFullRegistry(); } else { // 获取并更新delta列表 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 发送缓存更新的Event事件,该事件主要是在netflix内部进行处理, 通过EurekaEventListener处理,而该listener的配置,则是放在args中进行配置的 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 尝试更新当前instance在EurekaServer上的状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }
getAndStoreFullRegistry()
该方法主要是从eureka server上同步全量的注册信息, 具体代码如下:
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; // 判断当前EurekaClient是否通过VIP方式获取远程列表, 如果不是,则通过getApplications()方法获取远程注册应用列表 // 如果采用VIP的方式, 则通过getVIP()方法获取远程应用列表 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); // 如果通过http访问返回的结果为200, 则获取applications对象信息 if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); // 通过cas的方式更新当前fetch次数,如果更新成功,则将获取到的applications存入本地缓存中 } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
onCachedRefreshed()
当从Eureka Server中同步完成Applicaions列表之后, 则发送CacheRefreshedEvent
事件, 让EurekaEventListener
事件能够有机会执行, 具体代码如下:
protected void onCacheRefreshed() { fireEvent(new CacheRefreshedEvent()); } /** * Send the given event on the EventBus if one is available * * @param event the event to send on the eventBus */ protected void fireEvent(final EurekaEvent event) { for (EurekaEventListener listener : eventListeners) { try { listener.onEvent(event); } catch (Exception e) { logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage()); } } }
这里需要注意的是,这里的事件并不是spring cloud内部的事件通知机制,而是由netflix提供的事件机制,因此,这里不能混淆。
updateInstanceRemoteStatus()
更新当前实例的远程实例状态信息, 具体代码如下:
private synchronized void updateInstanceRemoteStatus() { // Determine this instance's status for this app and set to UNKNOWN if not found InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null; if (instanceInfo.getAppName() != null) { // 从本地缓存中获取当前appName对应的远程实例信息 Application app = getApplication(instanceInfo.getAppName()); if (app != null) { // 判断当前实例是否包含了当前启动的instance信息 InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId()); if (remoteInstanceInfo != null) { // 如果包含, 则将当前当前远程实例状态与远程实例的状态保持一致 currentRemoteInstanceStatus = remoteInstanceInfo.getStatus(); } } } // 如果远程实例不存在,则将当前远实例状态重置为UNKOWN if (currentRemoteInstanceStatus == null) { currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; } // Notify if status changed // 因为实例在启动过程中,状态为UNKOWN,如果在启动过程中,实例的状态发生了改变, 则需要发送通知,告知状态变更 if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) { // 内部发送StatusChangeEvent状态通知消息 onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus); // 更新当前内部的远程实例状态 lastRemoteInstanceStatus = currentRemoteInstanceStatus; } }
register()
该方法作为Client端启动中可选择的一环,实际上是将当前实例注册到Eureka Server中,这是该方法的调用默认为关闭状态, 该方法在实例化中被调用需要满足两个条件:
eureka.client.register-with-eureka
配置值为true
, 该配置默认开启eureka.client.should-enforce-registration-at-init
配置值为true
, 该配置默认为false
其实在spring-cloud中,默认是不建议在实例化过程中注册实例,我觉得最主要的原因为在实例化过程中,容器并没有完全准备好,还无法对外提供服务。此时注册上去,可能会导致依赖服务调用失败的情况发生。
具体代码如下:
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { // 通过EurekaHttpClient对象,将instance信息注册到EurekaServer服务中 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
initScheduledTasks()
在之前的流程中,介绍到了初始化了三个重要的线程池,用于执行定时任务,该方法则主要是用于初始化所有的定时任务,具体执行代码如下:
private void initScheduledTasks() { // 当开启fetch之后, 则需要创建定时fetch的定时任务, 通过 eureka.client.fetch.registry 来开启 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer // 该配置主要配置了缓存的更新间隔时间, 通过eureka.client.regsitry-fetch-interval-seconds来配置,默认值为30秒 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); // 该配置用于设置在fetch操作发生失败后, 最大的重试时间, 该值通过eureka.client.cache-refresh-executor-exponential-back-off-bound配置,默认时间为10秒 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); // 创建TimedSupervisorTask任务,并提交到线程池调用,默认每隔30秒执行一次 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } // 判断是否开启注册instance, 通过eureka.client.register-with-eureka, 默认值为true if (clientConfig.shouldRegisterWithEureka()) { // 从instance中获取续期时间间隔, 默认续期时间为 30秒 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 获取当续期失败后,重试的最大时间间隔,默认值为10秒, 可以通过eureka.client.heartbeat-executor-exponential-back-off-bound配置 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer // 开启心跳任务,默认每隔30秒执行一次 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator // 该类主要为创建单个线程,同步当前实例的状态到eureka server服务端 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 创建 EventListener对象,用于处理instance状态变换时间信息 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { // 当前listener用于处理instance状态变更的时间通知, 前面主要是做状态的输出, 最主要的还是调用replicator的onDemandUpdate方法 if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; // 该配置用于判断是否开启本地状态的更新,通过ApplicationInfoManager触发register/update操作 // 该配置默认处于开启状态,即默认值为true; 可以通过eureka.client.on-demand-update-status-change配置更改 if (clientConfig.shouldOnDemandUpdateStatusChange()) { // 当开始配置后,statusChangeListener对象将会被注册到ApplicationInfoManager中, 用于处理StatusChangeEvent 时间 applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 开启regsiter与update同步任务, 保证eureka 服务端的任务为最新状态 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
InstanceInfoReplicator
在初始化过程中,可以看到,注册instance信息以及更新信息都是通过InstanceInfoReplicator
完成的,构造代码如下:
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) { this.discoveryClient = discoveryClient; this.instanceInfo = instanceInfo; // 创建线程池, 核心线程为1 this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d") .setDaemon(true) .build()); // 保存任务Future this.scheduledPeriodicRef = new AtomicReference<Future>(); // 当前任务状态 this.started = new AtomicBoolean(false); // 限流的实现 this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); // 同步间隔时间, 默认为30秒 this.replicationIntervalSeconds = replicationIntervalSeconds; this.burstSize = burstSize; // 计算每分钟能够同步多少次, 默认burstSize = 2, 则每分钟能够同步4次 this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute); }
在DiscoveryClient
同步的过程中, 该任务的任务则是通过start()
方法启动,对应源码为:
// 开始方法会传入一个参数, 用于判断延迟多久执行当前任务, 该延迟时间,我任务应当和容器的启动时间保持一致, 避免容器处于不可用状态 // 该值通过eurek.client.initial-instance-info-replication-interval-seconds配置,默认值为40秒 public void start(int initialDelayMs) { // 开始方法只会被执行一次, 通过cas方法将当前任务状态设置为started状态 if (started.compareAndSet(false, true)) { // 将当前instance状态设置为dirty状态, 比记录dirty时间值 instanceInfo.setIsDirty(); // for initial register // 调用执行任务, 延迟40秒执行 Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); // 绑定当前执行任务 scheduledPeriodicRef.set(next); } }
通过以上方法可以得知, 当前类型也作为任务本身执行,因此查看run
方法, 具体源码如下:
public void run() { try { // 刷新当前instance info的信息, 该方法主要判断instance info的信息是否发生变化, 如果发生变化, 则instance info将会标记为dirty状态 discoveryClient.refreshInstanceInfo(); // 判断当前的instance是否被标记为dirty状态, 如果为dirty, 则返回dirty的时间 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // 当instance被标记为dirty之后, 则重新将当前的instance注册到eureka server服务中 discoveryClient.register(); // 取消instance的dirty状态 instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { // 重新执行当前任务,默认每隔30秒钟,执行,并绑定当前执行任务future对象 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
当我们在上面代码分析中得知, 当instance
的状态发生变更的时候,会发送一个StatusChangeEvent
的事件信息, 当在处理事件时,实际上是调用了onDemanUpdate()
方法,具体代码如下:
public boolean onDemandUpdate() { // 该主要是为了限流,防止过多的请求 if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 判断线程池是否处于shutdown状态 if (!scheduler.isShutdown()) { // 提交一个任务执行 scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); // 获取当前绑定的任务Future对象 Future latestPeriodic = scheduledPeriodicRef.get(); // 如果对应的future没有执行完成时,取消当前任务. 注意当前任务不会打断线程执行 if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } // 重新执行run方法,开启新的任务 InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to stopped scheduler"); return false; } } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } }
EurekaAutoServiceRegistration
在启动过程中比较重要的一个类就是EurekaAutoServiceRegistration
类,该类主要维护了InstanceInfo
在启动过程中的状态变更, 以及能够及时将状态变更同步到EurekaServer
, 类型依赖关系如下:
在DiscoveryClient
启动过程中,伴随着InstanceInfo
状态的变化,状态变更流程如下:
EurekaAutoServiceRegistration
完成的就是STARTING -> UP
这一步的状态流转, 通过以上类图可以看出, EurekaAutoServiceRegistration
实现了SmartLifecyle
类,因此在Spring容器化启动过程中,start()
方法将会被回调, 因此我们查看该类的start()
方法源码:
@Override public void start() { // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 // 在初始化过程中,该端口默认值为0, 如果不为0, 则将非安全端口或者安全端口进行绑定 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below // 当前registration状态没有处于running状态,并且nonSecurePort端口>0时, 条件才满足 if (!this.running.get() && this.registration.getNonSecurePort() > 0) { // 调用ServiceRegistry register方法,闯入当前的EurekaRegistration对象 this.serviceRegistry.register(this.registration); // 发送InstanceRegisteredEvent时间 this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); // 将当前的状态标记为running状态 this.running.set(true); } }
ServiceRegistry
当前ServiceRegistry
类主要负责InstanceInfo
的状态变更,以及eureka http client对象的初始化,并注册HealthCheckerHandler
对象. 具体源码如下:
public void register(EurekaRegistration reg) { // 该方法用于初始化EurekaHttpClient对象 maybeInitializeClient(reg); if (log.isInfoEnabled()) { log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } // 更新ApplicationInfoManager中instance状态为初始状态 // 这个地方主要依赖了CloudEurekaInstanceConfig对象,在初始化状态InitialStatus状态为UP状态 reg.getApplicationInfoManager() .setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); // 注册HealthCheckHandler对象,处理与HealthCheck有关的信息 reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg .getEurekaClient().registerHealthCheck(healthCheckHandler)); }
在register方法中,会直接更新ApplicationInfoManager
中的InstanceStatus
, 更新instance status之后,会触发StatusChangeEvent
时间,这里就与DiscoveryClient
中初始化的InstanceInfoReplicator
相对应。能够及时处理instance 状态变更。
至此,整个Eureka Client实例化流程整体梳理完毕,后面将给出比较完整的流程图.
总结
在Eurek Client初始化过程中,会初始化相关很多相关信息,spring cloud在这基础上做了很多的扩展,包括了服务状态、状态变更、心跳机制、实例同步任务等相关。流程还是比较长,需要比较长的时间进行梳理。以下给出了主要Discovery Client
相关的主要流程图,希望可以帮到大家:
文章评论