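When Eureka Server acts as a service registry, it decides whether a client is still online from the heartbeats the client sends: as long as heartbeats keep arriving, the server keeps the client in the UP state. When a client's heartbeats stop arriving, the server applies its own eviction policy and removes the instances it considers down (those whose lease has expired) from the Registry, so that only available services remain.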
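How is the policy enabled?
The earlier chapter on service startup described in detail how EurekaServer is started. As mentioned there, when EurekaServerBootstrap runs its initialization, it starts the evict timer task, which performs the eviction. The code is as follows: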
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
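The evict task is started by the postInit method. The method is simple and has three steps:
- Start the renewsLastMin task
- Check whether an evict task already exists, and cancel it if so
- Start the evict timer task, which runs every 60 seconds (eureka.server.eviction-interval-timer-in-ms = 60000)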
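The cancel-then-reschedule pattern in the three steps above can be tried in isolation. The following is a minimal sketch that uses only java.util.Timer. The class name EvictionSchedulerSketch, its counters, and the shutdown method are hypothetical and exist only for illustration, and the task body just counts runs instead of evicting anything.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the scheduling part of postInit(); not Eureka code.
class EvictionSchedulerSketch {
    // Daemon timer so the JVM can exit even if shutdown() is never called.
    private final Timer evictionTimer = new Timer("eviction-timer-sketch", true);
    private final AtomicReference<TimerTask> evictionTaskRef = new AtomicReference<>();
    final AtomicInteger cancelledTasks = new AtomicInteger();
    final AtomicInteger runs = new AtomicInteger();

    void postInit(long intervalMs) {
        // Step 2: cancel the previous task, if any, so only one evict task is ever active.
        TimerTask previous = evictionTaskRef.get();
        if (previous != null) {
            previous.cancel();
            cancelledTasks.incrementAndGet();
        }
        // Step 3: schedule a fresh task with the same value as initial delay and period.
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                runs.incrementAndGet(); // a real task would call evict(...) here
            }
        };
        evictionTaskRef.set(task);
        evictionTimer.schedule(task, intervalMs, intervalMs);
    }

    TimerTask currentTask() {
        return evictionTaskRef.get();
    }

    void shutdown() {
        evictionTimer.cancel();
    }
}
```

Because the interval is passed as both the initial delay and the period, the first run happens one full interval after postInit, not immediately.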
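What does the task do?
The flow chart above shows fairly clearly how the evict method runs in detail. For Eureka, the following points are worth noting:
- The evict timer task runs every 60 seconds
- When Eureka Server decides whether an instance has expired, it does not apply one fixed cutoff. It measures how much time actually passed since the previous evict run, and any delay beyond the configured interval is added as extra grace time (additionalLeaseMs) when matching expired instances
- When the number of expired instances exceeds evictionLimit (registrySize - registrySize * renewalPercentThreshold), the server does not evict all of them in one run; it evicts only the smaller of the two values.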
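To make the dynamic expiry check concrete, here is a small sketch of the arithmetic. It assumes, based on my reading of the Eureka source rather than on code quoted in this article, that the extra grace time is the amount by which the evict task ran later than its configured interval, floored at zero. The class LeaseExpirySketch and its method names are hypothetical, and the timestamps are made-up values.

```java
// Hypothetical helper that reproduces the expiry comparison; not Eureka code.
class LeaseExpirySketch {
    /** Extra grace time: how much later than planned the evict task actually ran. */
    static long compensationTimeMs(long elapsedSinceLastRunMs, long evictionIntervalMs) {
        return Math.max(0L, elapsedSinceLastRunMs - evictionIntervalMs);
    }

    /** Same comparison as in the article: now > lastUpdate + duration + additionalLeaseMs. */
    static boolean isExpired(long nowMs, long lastUpdateTimestampMs,
                             long durationMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestampMs + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        long interval = 60_000L;   // evict task period
        long duration = 90_000L;   // lease duration
        long lastHeartbeat = 0L;   // made-up timestamp of the last renewal
        long now = 95_000L;        // made-up current time

        long onTime = compensationTimeMs(60_000L, interval); // task ran on schedule
        long late = compensationTimeMs(70_000L, interval);   // task ran 10 s late

        System.out.println(isExpired(now, lastHeartbeat, duration, onTime)); // true
        System.out.println(isExpired(now, lastHeartbeat, duration, late));   // false
    }
}
```

The second call shows the point of the compensation: if the server itself was stalled (for example by a GC pause), clients get the same amount of extra time before their leases are treated as expired.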
Source code walkthrough
We can skip the other details and look directly at how the evict method removes instances:
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // Check whether this instance registry currently allows leases to expire.
    // If not, return early; nothing is removed from the map.
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // A lease counts as expired when:
                //   System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)
                // additionalLeaseMs is compensation time, derived from the gap between now and the previous evict run
                // lastUpdateTimestamp is the time of the last heartbeat (renewal) recorded for this lease
                // duration is the lease duration, i.e. how long the server waits without a heartbeat
                // (this is not the heartbeat interval)
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    // This is the key point of the eviction: the number of expired leases may exceed evictionLimit.
    // In that case not all of them are evicted; the smaller of the two values is used.
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // The call that actually performs the eviction
            internalCancel(appName, id, false);
        }
    }
}
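The cap and the random selection in evict can be exercised on plain lists. The sketch below reuses the same arithmetic and the same partial shuffle as the method above. The class EvictionCapSketch and the method pickRandom are hypothetical names, the threshold 0.85 is used as an example value for getRenewalPercentThreshold(), and the method works on a copy of the list so that the input is left untouched (the original swaps in place).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper that isolates the cap and the partial shuffle; not Eureka code.
class EvictionCapSketch {
    /** Same arithmetic as in evict(): registrySize minus the truncated threshold. */
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    /** Partial Knuth (Fisher-Yates) shuffle: only the first toEvict positions are randomized. */
    static <T> List<T> pickRandom(List<T> expired, int toEvict, Random random) {
        List<T> copy = new ArrayList<>(expired);
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, toEvict));
    }

    public static void main(String[] args) {
        int registrySize = 100;
        List<Integer> expired = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            expired.add(i); // 40 made-up expired lease ids
        }
        int limit = evictionLimit(registrySize, 0.85);
        int toEvict = Math.min(expired.size(), limit);
        System.out.println("evictionLimit=" + limit + ", toEvict=" + toEvict);
        System.out.println(pickRandom(expired, toEvict, new Random()));
    }
}
```

With 100 registered instances and 40 expired leases, only 15 are removed in this run; the remaining 25 stay in the registry until a later run. Stopping the shuffle after toEvict swaps is enough, because only the first toEvict positions are ever read.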
As mentioned earlier, our Registry object is Spring's own implementation, InstanceRegistry. In extending the base class it adds one difference: it publishes an EurekaInstanceCanceledEvent:
private void handleCancelation(String appName, String id, boolean isReplication) {
    log("cancel " + appName + ", serverId " + id + ", isReplication "
            + isReplication);
    publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}
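The extension pattern here is "publish an event, then delegate to the parent". The sketch below shows that shape in plain Java without any Spring dependency. Everything in it is a hypothetical stand-in: InstanceCanceled plays the role of EurekaInstanceCanceledEvent, BaseRegistry plays the role of AbstractInstanceRegistry, and a list of Consumer callbacks replaces Spring's event publisher. It also assumes the event is published before the parent method runs, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for the Spring and Eureka types; not the real classes.
class CancelEventSketch {
    /** Plays the role of EurekaInstanceCanceledEvent. */
    static final class InstanceCanceled {
        final String appName;
        final String serverId;
        final boolean replication;

        InstanceCanceled(String appName, String serverId, boolean replication) {
            this.appName = appName;
            this.serverId = serverId;
            this.replication = replication;
        }
    }

    /** Plays the role of the base registry that does the real removal. */
    static class BaseRegistry {
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            return true; // the real removal logic would live here
        }
    }

    /** Subclass that notifies listeners, then delegates to the parent. */
    static class EventPublishingRegistry extends BaseRegistry {
        private final List<Consumer<InstanceCanceled>> listeners = new ArrayList<>();

        void addListener(Consumer<InstanceCanceled> listener) {
            listeners.add(listener);
        }

        @Override
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            InstanceCanceled event = new InstanceCanceled(appName, id, isReplication);
            listeners.forEach(listener -> listener.accept(event));
            return super.internalCancel(appName, id, isReplication);
        }
    }

    public static void main(String[] args) {
        EventPublishingRegistry registry = new EventPublishingRegistry();
        registry.addListener(e -> System.out.println("cancelled " + e.appName + "/" + e.serverId));
        registry.internalCancel("ORDER-SERVICE", "host-1:8080", false);
    }
}
```

One consequence of this ordering, under the stated assumption, is that listeners are notified even when the parent later finds no matching lease, so a listener should not treat the event as proof that an instance was actually removed.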
The actual removal is still carried out in AbstractInstanceRegistry. The source is as follows:
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // Acquire the read lock
        read.lock();
        CANCEL.increment(isReplication);
        // Look up the leases registered under appName in the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // If the instance exists, remove the expired instance
            leaseToCancel = gMap.remove(id);
        }
        synchronized (recentCanceledQueue) {
            // Record the removed instance in the queue
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        // Remove this instance's overriddenInstanceStatus entry
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        // If no lease was found for this instance, return immediately
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            // If the instance info exists, mark the instance as deleted
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // Invalidate the entries for this instance in the responseCache's readWriteCacheMap
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}
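It may look odd that internalCancel modifies the registry while holding only the read lock. The sketch below reproduces just the lookup-and-remove core with a ReentrantReadWriteLock and shows why this can be safe. It assumes the per-application maps are concurrent maps, so the shared lock only has to keep out an exclusive writer. The class RegistryCancelSketch is hypothetical, and plain strings stand in for Lease<InstanceInfo>.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical reduction of the cancel path; not Eureka code.
class RegistryCancelSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // appName -> (instanceId -> lease); strings stand in for Lease<InstanceInfo>.
    private final Map<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, String lease) {
        lock.readLock().lock(); // shared: many registrations and cancels may overlap
        try {
            registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, lease);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns true if a lease was removed, false if none was registered. */
    boolean cancel(String appName, String id) {
        lock.readLock().lock();
        try {
            Map<String, String> leases = registry.get(appName);
            String removed = (leases != null) ? leases.remove(id) : null;
            return removed != null;
        } finally {
            lock.readLock().unlock();
        }
    }

    int size() {
        int total = 0;
        for (Map<String, String> leases : registry.values()) {
            total += leases.size();
        }
        return total;
    }

    public static void main(String[] args) {
        RegistryCancelSketch sketch = new RegistryCancelSketch();
        sketch.register("ORDER-SERVICE", "host-1:8080", "lease-1");
        System.out.println(sketch.cancel("ORDER-SERVICE", "host-1:8080")); // true
        System.out.println(sketch.cancel("ORDER-SERVICE", "host-1:8080")); // false
    }
}
```

The sketch acquires the lock before entering the try block, which is the usual Java idiom; the quoted source calls read.lock() inside the try block instead. As in the original, cancelling an unknown instance is not an error and simply returns false.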
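That concludes the walkthrough of the key flows in Eureka Server. If you have suggestions or questions about this article, feel free to leave them in the comments.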
