Eureka Server本身在做服务注册时,客户端是否下线,是通过客户端向服务端发送心跳信息, 服务端一次来保证客户端处于UP状态。同时当客户端心跳发送失败时,这是服务端将通过自身的剔除策略,将处于DOWN
状态下的服务从Registry中进行移除,保证服务可用性。
策略如何启用?
前面章节中,服务的启动过程中详细阐述了EurekaServer的启用过程, 其中有介绍到, 当EurekaServerBootstrap
在执行initilize
方法时,会启动evict定时任务,执行服务剔除操作,具体代码如下:
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
evict任务启动,则是通过postInit方法执行,方法很简单, 主要包含三个步骤:
- 开启
renewLasmin
任务 - 判断当前是否已经存在evict任务,存在则取消
- 开启evict定时任务,每60秒执行一次(
eureka.server.eviction.interval-timer-ms = 60
)
任务做了什么?
以上流程图我觉得是比较清晰的表达了整个evict方法执行的详情过程, 对于Eureka来说,有一下需要注意的点:
- 执行Evict的定时任务为60秒执行一次
- Eureka Server在计算Instance是否过期的时候,并不是采用一刀切的方式,而是通过当前时间与上一次执行的时间算出差值,动态的匹配那些instance过期
- 当过期的
instance数量 > instance总数 * renewsThreshold
的值时,并不会一次性将所有的instance的剔除,而是采用了取最小值的剔除策略.
源码解读
其他的细节我们就不用看了,直接看evict方法是如何将instance做剔除操作的:
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); // 判断当前instance registry是否支持将Lease过期, 如果为false, 则结束执行,数据不会从map中剔除 if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); // 判断当前lease是否过期的依据,主要是根据三个值计算: System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs) // additianalLeaseMs则是根据当前时间 - 上一次执行evict的时间,的到的一个差值 // lastUpdateTimestamp主要是值当前Lease客户端心跳的时间 // duration主要为心跳的间隔时间 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; // 这里也是执行evict相关的关键点所在, 可能过期的expiredLease会大于evictionLimit信息,这时并不会将所有Lease全部过期, 而是取最小值 int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); // 真正执行evict策略的方法 internalCancel(appName, id, false); } } }
前面也讲过,我们Registry对象是spring自己的实现InstanceRegistry
对象,因此,在这个扩展的过程中,提供了不一样的点,就是EurekaInstanceCanceledEvent
事件的发送:
private void handleCancelation(String appName, String id, boolean isReplication) { log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication); publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication)); }
真正的执行剔除的方法的,还是在AbstractInstanceRegistry
中进行执行, 源码如下:
protected boolean internalCancel(String appName, String id, boolean isReplication) { try { // 获取读锁 read.lock(); CANCEL.increment(isReplication); // 获取appName在registry中注册的Lease信息 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { // 如果实例存在,则删除当前过期的实例 leaseToCancel = gMap.remove(id); } synchronized (recentCanceledQueue) { // 将伤处的Instance信息存入到queue中 recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); } // 移除当前实例的overridenInstanceStatus信息 InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); if (instanceStatus != null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } // 如果当前实例的信息不存在时,直接返回 if (leaseToCancel == null) { CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { leaseToCancel.cancel(); InstanceInfo instanceInfo = leaseToCancel.getHolder(); String vip = null; String svip = null; // 如果实例信息存在,则将当前实例信息标记为删除 if (instanceInfo != null) { instanceInfo.setActionType(ActionType.DELETED); recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } // 过期responseCache中的readWriteCacheMap中的实例信息 invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); return true; } } finally { read.unlock(); } }
至此,关于Eureka Server中关键的流程信息已经介绍完毕,如果大家对文章有好的意见或者疑问,都可以在评论区留言。
文章评论