spring cloud 服务发现之Eureka Client(三)—DiscoveryClient

7 9 月, 2021 168点热度 0人点赞 0条评论

在上篇文章spring cloud 服务发现之Eureka Client(二)—自动装配中,介绍了Eureka Client自动装配的过程,其中设计到几个比较重要的类,包括ServiceRegistry, EurekaClient, EurekaDiscoveryClient类型。这些类型在Eureka Client整个生命周期中充当这比较重要的角色,在今天这章节中,将主要介绍DiscoveryClient对象,该对象主要包括了一下几个步骤:

  • 注册当前实例
  • 从eureka server中获取实例列表
  • 心跳启动

类结构设计

在Spring Cloud中,使用了EurekaDiscoveryClient作为DiscoveryClient的实现,该类型的设计如下:

spring cloud eureka

spring cloud在做封装的时候,是提取了单独一个DicoveryClient类,作为通用的DicoveryClient客户端来处理。这里在看源码的时候,会产生一个误区,需要多加注意。通过上面类图可以看出,spring cloud在原有的netflix的DicoveryClient基础之上,封装了CloudEurekaClient客户端,用于屏蔽对DicoveryClient使用的细节。

实例化CloudEurekaClient

EurekaClient的使用,首先需要对EurekaClient的实例化,实例化通过CloudEurekaClient()构造函数进行,源码如下:

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
            EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
            ApplicationEventPublisher publisher) {
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }

该构造函数声明了当前类型的依赖关系,需要创建:

  • ApplicationInfoManager对象
  • EurekaClientConfig对象,在spring中实际使用的EurekaClientConfigBean对象
  • AbstractDiscoveryOptionalArgs对象,该对象的最终使用的是MutableDiscoveryClientOptionalArgs对象
  • ApplicationEventPublisher对象,该对象由AbstractApplicationContext对象创建,并加入到容器

DiscoveryClient实例化

通过对CloudEurekaClient实例化过程中,会实例化父类DicoveryClient对象,具体源码如下:

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
        this(applicationInfoManager, config, args, ResolverUtils::randomize);
    }

    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
            private volatile BackupRegistry backupRegistryInstance;

            @Override
            public synchronized BackupRegistry get() {
                if (backupRegistryInstance == null) {
                    String backupRegistryClassName = config.getBackupRegistryImpl();
                    if (null != backupRegistryClassName) {
                        try {
                            backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                            logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                        } catch (InstantiationException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (IllegalAccessException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (ClassNotFoundException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        }
                    }

                    if (backupRegistryInstance == null) {
                        logger.warn("Using default backup registry implementation which does not do anything.");
                        backupRegistryInstance = new NotImplementedRegistryImpl();
                    }
                }

                return backupRegistryInstance;
            }
        }, randomizer);
    }
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {

        // 判断args参数是否为空, 在spring cloud环境中,使用的是MutableDiscoveryClientOptionalArgs对象
        if (args != null) {
            // 健康检查处理类
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            // 健康检查回调
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            // event监听事件列表
            this.eventListeners.addAll(args.getEventListeners());
            // 实例注册前置处理器
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        
        // 获取当前实例InstanceInfo信息
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        // 本地region applications缓存
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        // 获取当前需要fetch的regions列表, 通过eureka.client.featch-remote-regions-registry
        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        // 多个fetch regsion采用,分割,在这里将会通过split的方式获取一个数组
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

        // 判断当前EurekaClient是否需要从eureka server 获取注册的实例列表, 通过eureka.client.fetch-registry配置,默认值为true
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        // 判断当前EurekaClient需要将当前实例注册到Eureka Server, 通过eureka.client.register-with-eureka 配置,默认值为true
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        // 该条件用于判断是否需要将当前实例注册到eureka server服务,并且是否需要从eureka server上获取实例列表
        // 当我们在启动Eureka Server的时候, 这两项我们都设置为false
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;

            // 实例regsion checker对象
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            // DiscoveryManager是一个单例,用于保存当前EurekaClient对象,以及与Client相关的Config信息
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            // 当Client 中配置设置为 eureka.client.register-with-eureka 或者 eureka.client.fetch-registry 为true时,就会初始化定时任务
            // scheduler为线程池, 主要为DicoveryClient相关, 并且在线程池中的所有线程都是为守护线程,当主线程stop后,对应的任务前程也将会退出
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            // 初始化DicoveryClient Heartbeat线程池, 线程线程为1
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            // 缓存刷新线程池初始化,用于处理本地实例刷新操作。
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();

            // 该方法主要初始化EurekaHttpClients对象
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            // 判断当前eureka是否通过dns的方式从EurekaServer同步实例列表, 通过eureka.client.use-dns-for-featching-service-urls配置,默认值为false
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            
            // instance regsion checker对象,同步实例的regsion信息
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        // 判断是否需要从eureka server同步实例信息, 如果需要,则从eureka server同步,并保存applications信息到本地
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        // 当前有两个判断条件, 第一个为当前实例是否需要注册实例信息到eureka server
        // 第二个为在初始化过程中是否需要注册, 通过 eureka.client.should-enforce-registration-at-init配置,默认为false
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                // 注册实例信息
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        // 启动定时任务, 包括了刷新缓存,心跳信息等
        initScheduledTasks();

        try {
            // 暴露监控信息
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        // 保存全局EurekaClient对象
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        // 保存全局的eureka client config对象
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

scheduleServerEndpointTask()

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                            AbstractDiscoveryClientOptionalArgs args) {

        
        // 通过args获取filter列表    
        Collection<?> additionalFilters = args == null
                ? Collections.emptyList()
                : args.additionalFilters;

        // 获取eurekaJerseyClient对象
        EurekaJerseyClient providedJerseyClient = args == null
                ? null
                : args.eurekaJerseyClient;
        
        TransportClientFactories argsTransportClientFactories = null;
        if (args != null && args.getTransportClientFactories() != null) {
            argsTransportClientFactories = args.getTransportClientFactories();
        }
        
        // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
        @SuppressWarnings("rawtypes")
        TransportClientFactories transportClientFactories = argsTransportClientFactories == null
                ? new Jersey1TransportClientFactories()
                : argsTransportClientFactories;
                
        // 是否使用ssl context
        Optional<SSLContext> sslContext = args == null
                ? Optional.empty()
                : args.getSSLContext();
        
        // host name 校验器
        Optional<HostnameVerifier> hostnameVerifier = args == null
                ? Optional.empty()
                : args.getHostnameVerifier();

        // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
        // transport client 工厂对象获取
        eurekaTransport.transportClientFactory = providedJerseyClient == null
                ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier)
                : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);

        // application source 对象创建
        ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
            @Override
            public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
                long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
                long delay = getLastSuccessfulRegistryFetchTimePeriod();
                if (delay > thresholdInMs) {
                    logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                            thresholdInMs, delay);
                    return null;
                } else {
                    return localRegionApps.get();
                }
            }
        };

        // 通过EurekaHttpClients创建bootstrapResolver对象
        eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
                clientConfig,
                transportConfig,
                eurekaTransport.transportClientFactory,
                applicationInfoManager.getInfo(),
                applicationsSource,
                endpointRandomizer
        );

        // 如果需要将当前实例注册到eureka server, 执行该段逻辑
        if (clientConfig.shouldRegisterWithEureka()) {
            EurekaHttpClientFactory newRegistrationClientFactory = null;
            EurekaHttpClient newRegistrationClient = null;
            try {
                // 创建regsitration client factory对象
                newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        transportConfig
                );
                // 获取registration Eureka http client对象
                newRegistrationClient = newRegistrationClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }

            // 将创建的对象保存到eureka transport中
            eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
            eurekaTransport.registrationClient = newRegistrationClient;
        }

        // new method (resolve from primary servers for read)
        // Configure new transport layer (candidate for injecting in the future)
        // 判断是否需要从eureka 获取注册实例列表
        if (clientConfig.shouldFetchRegistry()) {
            EurekaHttpClientFactory newQueryClientFactory = null;
            EurekaHttpClient newQueryClient = null;
            try {
                // 创建query http client工厂类
                newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        clientConfig,
                        transportConfig,
                        applicationInfoManager.getInfo(),
                        applicationsSource,
                        endpointRandomizer
                );
                // 创建query http client对象
                newQueryClient = newQueryClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
       

            // 将数据保存到transport中
            eurekaTransport.queryClientFactory = newQueryClientFactory;
            eurekaTransport.queryClient = newQueryClient;
        }
    }

这个方法主要是填充EurekaTransport对象,通过EurekaClientFacotry对象创建EurekaHttpClient对象,在EurekaClient中,将注册和创建的客户端分开保存,便于后面使用.

fetchRegistry()

该方法主要从eureka server中fetch全量的注册信息, 具体源码如下:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {

        // 开启计时器
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            // 获取所有的applications, 第一次启动时,该值为空
            Applications applications = getApplications();

            // 该处有几个判断条件:
            // 1. 判断是否禁用了delta,根据注释, 开启将会降低客户端fetch数据的频率,默认值为false. 可以根据eureka.client.disable-delta配置
            // 2. 判断是否根据VIP地址刷新, 默认值为NULL
            // 3. 是否强制执行全量同步, 在应用第一次启动时,为false
            // 4. applications对象是否为null, 在DiscoveryClient初始化的时候,就已经默认包含了Applications对象
            // 5. applications中已经注册的applicaions的数量是否为0, 在初次启动的时候,该值为0, 因此该条件为true
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                
                // 获取并存储全量的注册信息
                getAndStoreFullRegistry();
            } else {
                // 获取并更新delta列表
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        // 发送缓存更新的Event事件,该事件主要是在netflix内部进行处理, 通过EurekaEventListener处理,而该listener的配置,则是放在args中进行配置的
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        // 尝试更新当前instance在EurekaServer上的状态
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

getAndStoreFullRegistry()

该方法主要是从eureka server上同步全量的注册信息, 具体代码如下:

private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        // 判断当前EurekaClient是否通过VIP方式获取远程列表, 如果不是,则通过getApplications()方法获取远程注册应用列表
        // 如果采用VIP的方式, 则通过getVIP()方法获取远程应用列表
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        
        // 如果通过http访问返回的结果为200, 则获取applications对象信息
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");

          // 通过cas的方式更新当前fetch次数,如果更新成功,则将获取到的applications存入本地缓存中
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

onCachedRefreshed()

当从Eureka Server中同步完成Applicaions列表之后, 则发送CacheRefreshedEvent事件, 让EurekaEventListener事件能够有机会执行, 具体代码如下:

    protected void onCacheRefreshed() {
        fireEvent(new CacheRefreshedEvent());
    }

    /**
     * Send the given event on the EventBus if one is available
     *
     * @param event the event to send on the eventBus
     */
    protected void fireEvent(final EurekaEvent event) {
        for (EurekaEventListener listener : eventListeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage());
            }
        }
    }

这里需要注意的是,这里的事件并不是spring cloud内部的事件通知机制,而是由netflix提供的事件机制,因此,这里不能混淆。

updateInstanceRemoteStatus()

更新当前实例的远程实例状态信息, 具体代码如下:

private synchronized void updateInstanceRemoteStatus() {

        // Determine this instance's status for this app and set to UNKNOWN if not found

        InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
        if (instanceInfo.getAppName() != null) {
            // 从本地缓存中获取当前appName对应的远程实例信息
            Application app = getApplication(instanceInfo.getAppName());
            if (app != null) {

                // 判断当前实例是否包含了当前启动的instance信息
                InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
                if (remoteInstanceInfo != null) {
                    // 如果包含, 则将当前当前远程实例状态与远程实例的状态保持一致
                    currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
                }
            }
        }

        // 如果远程实例不存在,则将当前远实例状态重置为UNKOWN
        if (currentRemoteInstanceStatus == null) {
            currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
        }

        // Notify if status changed
       // 因为实例在启动过程中,状态为UNKOWN,如果在启动过程中,实例的状态发生了改变, 则需要发送通知,告知状态变更
        if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
            // 内部发送StatusChangeEvent状态通知消息
            onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);

            // 更新当前内部的远程实例状态
            lastRemoteInstanceStatus = currentRemoteInstanceStatus;
        }
    }

register()

该方法作为Client端启动中可选择的一环,实际上是将当前实例注册到Eureka Server中,这是该方法的调用默认为关闭状态, 该方法在实例化中被调用需要满足两个条件:

  • eureka.client.register-with-eureka配置值为true, 该配置默认开启
  • eureka.client.should-enforce-registration-at-init 配置值为true, 该配置默认为false

其实在spring-cloud中,默认是不建议在实例化过程中注册实例,我觉得最主要的原因为在实例化过程中,容器并没有完全准备好,还无法对外提供服务。此时注册上去,可能会导致依赖服务调用失败的情况发生。

具体代码如下:

boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            // 通过EurekaHttpClient对象,将instance信息注册到EurekaServer服务中
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

initScheduledTasks()

在之前的流程中,介绍到了初始化了三个重要的线程池,用于执行定时任务,该方法则主要是用于初始化所有的定时任务,具体执行代码如下:

private void initScheduledTasks() {

        // 当开启fetch之后, 则需要创建定时fetch的定时任务, 通过 eureka.client.fetch.registry 来开启
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            // 该配置主要配置了缓存的更新间隔时间, 通过eureka.client.regsitry-fetch-interval-seconds来配置,默认值为30秒
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();

            // 该配置用于设置在fetch操作发生失败后, 最大的重试时间, 该值通过eureka.client.cache-refresh-executor-exponential-back-off-bound配置,默认时间为10秒
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();

            // 创建TimedSupervisorTask任务,并提交到线程池调用,默认每隔30秒执行一次
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        // 判断是否开启注册instance, 通过eureka.client.register-with-eureka, 默认值为true
        if (clientConfig.shouldRegisterWithEureka()) {

            // 从instance中获取续期时间间隔, 默认续期时间为 30秒
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();

            // 获取当续期失败后,重试的最大时间间隔,默认值为10秒, 可以通过eureka.client.heartbeat-executor-exponential-back-off-bound配置
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            // 开启心跳任务,默认每隔30秒执行一次
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            // 该类主要为创建单个线程,同步当前实例的状态到eureka server服务端
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // 创建 EventListener对象,用于处理instance状态变换时间信息
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    // 当前listener用于处理instance状态变更的时间通知, 前面主要是做状态的输出, 最主要的还是调用replicator的onDemandUpdate方法
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            // 该配置用于判断是否开启本地状态的更新,通过ApplicationInfoManager触发register/update操作
            // 该配置默认处于开启状态,即默认值为true; 可以通过eureka.client.on-demand-update-status-change配置更改
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                // 当开始配置后,statusChangeListener对象将会被注册到ApplicationInfoManager中, 用于处理StatusChangeEvent 时间
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            // 开启regsiter与update同步任务, 保证eureka 服务端的任务为最新状态
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

InstanceInfoReplicator

在初始化过程中,可以看到,注册instance信息以及更新信息都是通过InstanceInfoReplicator完成的,构造代码如下:

InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {

        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;

        // 创建线程池, 核心线程为1
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());

        // 保存任务Future
        this.scheduledPeriodicRef = new AtomicReference<Future>();

        // 当前任务状态
        this.started = new AtomicBoolean(false);

        // 限流的实现
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);

        // 同步间隔时间, 默认为30秒
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;

        // 计算每分钟能够同步多少次, 默认burstSize = 2, 则每分钟能够同步4次
        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }

DiscoveryClient同步的过程中, 该任务的任务则是通过start()方法启动,对应源码为:

// 开始方法会传入一个参数, 用于判断延迟多久执行当前任务, 该延迟时间,我任务应当和容器的启动时间保持一致, 避免容器处于不可用状态
// 该值通过eurek.client.initial-instance-info-replication-interval-seconds配置,默认值为40秒
public void start(int initialDelayMs) {
        // 开始方法只会被执行一次, 通过cas方法将当前任务状态设置为started状态
        if (started.compareAndSet(false, true)) {

            // 将当前instance状态设置为dirty状态, 比记录dirty时间值
            instanceInfo.setIsDirty();  // for initial register

            // 调用执行任务, 延迟40秒执行
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);

            // 绑定当前执行任务
            scheduledPeriodicRef.set(next);
        }
    }

通过以上方法可以得知, 当前类型也作为任务本身执行,因此查看run方法, 具体源码如下:

public void run() {
        try {

            // 刷新当前instance info的信息, 该方法主要判断instance info的信息是否发生变化, 如果发生变化, 则instance info将会标记为dirty状态
            discoveryClient.refreshInstanceInfo();

            // 判断当前的instance是否被标记为dirty状态, 如果为dirty, 则返回dirty的时间
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // 当instance被标记为dirty之后, 则重新将当前的instance注册到eureka server服务中
                discoveryClient.register();

                // 取消instance的dirty状态
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // 重新执行当前任务,默认每隔30秒钟,执行,并绑定当前执行任务future对象
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

当我们在上面代码分析中得知, 当instance的状态发生变更的时候,会发送一个StatusChangeEvent的事件信息, 当在处理事件时,实际上是调用了onDemanUpdate()方法,具体代码如下:

public boolean onDemandUpdate() {
        // 该主要是为了限流,防止过多的请求
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            // 判断线程池是否处于shutdown状态
            if (!scheduler.isShutdown()) {
                // 提交一个任务执行
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
    
                        // 获取当前绑定的任务Future对象
                        Future latestPeriodic = scheduledPeriodicRef.get();

                        // 如果对应的future没有执行完成时,取消当前任务. 注意当前任务不会打断线程执行
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
    
                        // 重新执行run方法,开启新的任务
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

EurekaAutoServiceRegistration

在启动过程中比较重要的一个类就是EurekaAutoServiceRegistration类,该类主要维护了InstanceInfo在启动过程中的状态变更, 以及能够及时将状态变更同步到EurekaServer, 类型依赖关系如下:

eureka auto registrationDiscoveryClient启动过程中,伴随着InstanceInfo状态的变化,状态变更流程如下:

instance status

EurekaAutoServiceRegistration完成的就是STARTING -> UP这一步的状态流转, 通过以上类图可以看出, EurekaAutoServiceRegistration 实现了SmartLifecyle类,因此在Spring容器化启动过程中,start()方法将会被回调, 因此我们查看该类的start()方法源码:

@Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        // 在初始化过程中,该端口默认值为0, 如果不为0, 则将非安全端口或者安全端口进行绑定
        if (this.port.get() != 0) {
            if (this.registration.getNonSecurePort() == 0) {
                this.registration.setNonSecurePort(this.port.get());
            }

            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                this.registration.setSecurePort(this.port.get());
            }
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        // 当前registration状态没有处于running状态,并且nonSecurePort端口>0时, 条件才满足
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            // 调用ServiceRegistry register方法,闯入当前的EurekaRegistration对象
            this.serviceRegistry.register(this.registration);

            // 发送InstanceRegisteredEvent时间
            this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                    this.registration.getInstanceConfig()));

            // 将当前的状态标记为running状态
            this.running.set(true);
        }
    }

ServiceRegistry

当前ServiceRegistry类主要负责InstanceInfo的状态变更,以及eureka http client对象的初始化,并注册HealthCheckerHandler对象. 具体源码如下:

public void register(EurekaRegistration reg) {
        // 该方法用于初始化EurekaHttpClient对象
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application "
                    + reg.getApplicationInfoManager().getInfo().getAppName()
                    + " with eureka with status "
                    + reg.getInstanceConfig().getInitialStatus());
        }

        // 更新ApplicationInfoManager中instance状态为初始状态
        // 这个地方主要依赖了CloudEurekaInstanceConfig对象,在初始化状态InitialStatus状态为UP状态
        reg.getApplicationInfoManager()
                .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        // 注册HealthCheckHandler对象,处理与HealthCheck有关的信息
        reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
                .getEurekaClient().registerHealthCheck(healthCheckHandler));
    }

在register方法中,会直接更新ApplicationInfoManager中的InstanceStatus, 更新instance status之后,会触发StatusChangeEvent时间,这里就与DiscoveryClient中初始化的InstanceInfoReplicator相对应。能够及时处理instance 状态变更。

至此,整个Eureka Client实例化流程整体梳理完毕,后面将给出比较完整的流程图.

总结

在Eurek Client初始化过程中,会初始化相关很多相关信息,spring cloud在这基础上做了很多的扩展,包括了服务状态、状态变更、心跳机制、实例同步任务等相关。流程还是比较长,需要比较长的时间进行梳理。以下给出了主要Discovery Client相关的主要流程图,希望可以帮到大家:

eureka client

 

专注着

一个奋斗在编程路上的小伙

文章评论