spring cloud 服务发现之Eureka Client(四)—定时任务

10 9 月, 2021 127点热度 0人点赞 0条评论

在之前章节spring cloud 服务发现之Eureka Client(三)—DiscoveryClient 中介绍了Eureka 客户端在启动过程中的流程,在DiscoveryClient启动过程中,会有两个重要的定时任务:

  • 发送心跳信息
  • EurekaServer同步注册实例节点

这篇文章就主要介绍这两个定时任务的执行逻辑以及从源码角度分析代码实现,能够更好的理解和使用Eureka。

心跳任务

之前章节中介绍了在DiscoveryClient实例化过程中,心跳任务TimedSupervisorTask开始执行,执行频率为每30秒执行一次. 下面看下该类的源码信息:

public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
     
       // 任务执行超时时间
        this.timeoutMillis = timeUnit.toMillis(timeout);
        // 具体执行task
        this.task = task;
        // 延迟超时
        this.delay = new AtomicLong(timeoutMillis);
        // 最大延迟时间
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        successCounter = Monitors.newCounter("success");
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

任务的初始化时比较简单的,主要涉及到的为超时时间,以及具体执行的任务。TimedSupervisorTask类型为TimerTask的实现类,最终执行是需要执行run()方法来具体处理业务逻辑。

@Override
    public void run() {
        Future<?> future = null;
        try {
            // 执行任务
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());

            // 等待任务执行完成
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout

            // 设置延迟时间
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());

            // 请求成功计数
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();

            // 当发生任务失败时,会延迟任务执行时间, 时间为当前时间 * 2
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }

            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
            // 取消当前任务
            if (future != null) {
                future.cancel(true);
            }

            // 重新开始任务
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

通过代码可知,其实TimedSupervisorTask类并不涉及到真正的业务逻辑, 只是一个负责处理异常以及任务重试机制的实现,最终的执行任务还是通过task来实现的。因此,我们查看源码可知,心跳的任务的而实现,主要是通过HeartbeatThread任务来实现的,因此查看HeartbeatThread源码如下:

private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

这个类是一个内部类,主要逻辑就只是调用了renew()方法,查看renew()源码如下:

boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // 调用sendHeartBeat的方法,向eureka server同步服务状态
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            
            // 当发送心跳状态出现异常时, 执行if逻辑
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();

                // 重新注册实例信息到eureka
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

renew()方法中主要做了两件事情:

  • 发送心跳信息到eureka server
  • 当心跳信息发送失败的时候,重新注册当前实例信息到eureka server

以上就是心跳机制的定时任务的而实现,整体来看其实是比较简单的。

同步实例

在Eureka 的整个通信过程中,都是通过http的方式与Eureka Server进行通信,Eureka为了保证能够获取到Eureka Server最新的实例列表,会不定时的从eureka server同步已注册的实例节点。开始定时任务的代码如下:

scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);

上面我们分析了TimedSupervisorTask的实现,这里就不再重复,在任务task的实现主要是通过CacheRefreshThread类执行,下面看下该类的实现源码:

class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

直接查看refreshRegistry()方法源码:

void refreshRegistry() {
        try {

            // 是否同步region注册信息
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            // 获取客户端配置拉取region列表
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                 // 缓存的region信息
                String currentRemoteRegions = remoteRegionsToFetch.get();
                // 判断region配置是否发生变化,这里主要适用于配置能够动态更新情况
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        // 更新拉取的regions信息
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }

            // 从eureka server获取instance信息
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                // 同步实例数量
                registrySize = localRegionApps.get().size();
                // 更新拉取成功时间
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

            // 记录日志
            if (logger.isDebugEnabled()) {
                .......
            }
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }

在获取注册实例时,调用了fetchRegisty()方法,该方法源码如下:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            // 判断条件
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                // 获取并更新注册实例信息
                getAndStoreFullRegistry();
            } else {
                // 获取并更新增量信息
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // 发送CachedRefreshEvent时间
        onCacheRefreshed();

        // 从远程拉取的下来的instance状态,更新当前实例状态
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

最终这个方法的调用,我们在spring cloud 服务发现之Eureka Client(三)—DiscoveryClient 已经介绍过了,这里就不做过多的介绍。

以上就是在EurekaClient中两个比较重要的定时任务。希望可以帮助到大家。如果文章有帮助到您,请为文章点赞。

 

 

专注着

一个奋斗在编程路上的小伙

文章评论