在Eureka Server自动装配文章中,介绍了Eureka相关的启动组件,这篇文章主要介绍在启动过程中,各组件都是如何工作的。
InstanceRegistry
从类型命名可以知道,该类型主要用于Instance信息注册实现,用于保存Eureka Client注册上来的基本信息。我们可以查看下InstanceRegistry
的实现结构.
InstanceRegistry
中实现了PeerAwareInstanceRegistryImpl
的实现,创建实例的源码如下:
public InstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient, int expectedNumberOfClientsSendingRenews, int defaultOpenForTrafficCount) { super(serverConfig, clientConfig, serverCodecs, eurekaClient); // eureka.server.expectedNumberOfRenewsPerMin 配置值, 默认为1 this.expectedNumberOfClientsSendingRenews = expectedNumberOfClientsSendingRenews; // eureka.server.defaultOpenForTrafficCount 默认值为1 this.defaultOpenForTrafficCount = defaultOpenForTrafficCount; }
创建InstanceRegsitry
实例时,同事也会初始化PeerAwareInstanceRegistryImpl
对象, 具体代码如下:
@Inject public PeerAwareInstanceRegistryImpl( EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient ) { super(serverConfig, clientConfig, serverCodecs); // eureka client独享 this.eurekaClient = eurekaClient; // 同步策略 this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1); // We first check if the instance is STARTING or DOWN, then we check explicit overrides, // then we check the status of a potentially existing lease. this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(), new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule()); }
对于InstanceRegistry
而言,没有其他复杂的操作,只是将对象创建出来,并交由Spring进行托管。
RefreshableEureakPeerNodes
PeerNodes主要用于维护客户端连接Eureka Server连接url列表, 该类有一个start()
方法,用于开启定时任务定时刷新, 具体代码如下:
public void start() { taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { // 更新PeerEurekaNodes节点信息 updatePeerEurekaNodes(resolvePeerUrls()); // 创建update PeerEurekaNodes任务 Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; // task任务通过定时任务执行, 可以通过 eureka.server.peer-eureka-nodes-update-interval-ms 配置, 默认为10分钟 taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } }
我们可以看一下,resolvePeerUrls()
方法返回了EurekaServerUrl服务列表, 具体代码如下:
protected List<String> resolvePeerUrls() { InstanceInfo myInfo = applicationInfoManager.getInfo(); String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo); // 从当前ClientConfig中获取链接的server-url列表, 该列表会有region分区的限制, 如果分区数据不存在,则采用defautZone配置列表 List<String> replicaUrls = EndpointUtils .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo)); int idx = 0; while (idx < replicaUrls.size()) { // 判断是否和当前服务器url地址一样,如果一样,则从service-url列表中剔除 if (isThisMyUrl(replicaUrls.get(idx))) { replicaUrls.remove(idx); } else { idx++; } } return replicaUrls; }
updatePeerEurekaNodes
代码其实很简单,通过不断从ServerConfig中读取server-url配置,然后和本地缓存做比对,并更新本地缓存信息。
protected void updatePeerEurekaNodes(List<String> newPeerUrls) { if (newPeerUrls.isEmpty()) { logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry"); return; } // 将peerEurekaNodeUrls中的配置信息做缓存,外部变化不会影响到代码执行 Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); toShutdown.removeAll(newPeerUrls); Set<String> toAdd = new HashSet<>(newPeerUrls); toAdd.removeAll(peerEurekaNodeUrls); if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; } // Remove peers no long available List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes); if (!toShutdown.isEmpty()) { logger.info("Removing no longer available peer nodes {}", toShutdown); int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); if (toShutdown.contains(eurekaNode.getServiceUrl())) { newNodeList.remove(i); eurekaNode.shutDown(); } else { i++; } } } // Add new peers if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (String peerUrl : toAdd) { newNodeList.add(createPeerEurekaNode(peerUrl)); } } // 更新本地缓存 this.peerEurekaNodes = newNodeList; this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); }
EurekaServerContext
该类是作为EurekaServer实现中,比较重要的一环,用于对EurekaServer上下文内容执行初始化操作. 具体代码如下:
@Singleton public class DefaultEurekaServerContext implements EurekaServerContext { private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class); private final EurekaServerConfig serverConfig; private final ServerCodecs serverCodecs; private final PeerAwareInstanceRegistry registry; private final PeerEurekaNodes peerEurekaNodes; private final ApplicationInfoManager applicationInfoManager; @PostConstruct @Override public void initialize() { logger.info("Initializing ..."); // 调用PeerEurekaNodes方法,并开启定时任务更新PeerEurekaNodes节点信息 peerEurekaNodes.start(); try { // InstanceRegistry初始化数据操作 registry.init(peerEurekaNodes); } catch (Exception e) { throw new RuntimeException(e); } logger.info("Initialized"); } @PreDestroy @Override public void shutdown() { logger.info("Shutting down ..."); registry.shutdown(); peerEurekaNodes.shutdown(); logger.info("Shut down"); } ... }
EurekaServerContext
类型是在Bean的初始化阶段执行了initialize()
方法, 该方法主要做两个事情
- 开启定时任务,每10分钟更新server-url配置的节点信息
- 初始化
InstanceRegistry
中的实例数据
InstanceRegistry.init()
初始化方法源码如下:
@Override public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { // 开启定时任务, 定时清理上1分钟同步的buckets数量, 任务每1分钟执行一次 this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; // 初始化ResponseCache对象, 也是Eureka Server三级缓存的实现代码 initializedResponseCache(); // 开启定时任务, 用于更新续期阈值, 该值主要用于,在发生分区的时候, 是否丢弃 scheduleRenewalThresholdUpdateTask(); initRemoteRegionRegistry(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e); } } private void scheduleRenewalThresholdUpdateTask() { // 根据eureka.server.renewal-threshold-update-interval-ms设置,默认值为15分钟 timer.schedule(new TimerTask() { @Override public void run() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); } private void updateRenewalThreshold() { try { Applications apps = eurekaClient.getApplications(); // 当前接收到的实例数量 int count = 0; for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } synchronized (lock) { // Update threshold only if the threshold is greater than the // current expected threshold or if self preservation is disabled. // 该处更新阈值信息时, // 1. 接收到的实例数量 > 百分比的阈值 * 当前期望阈值数量 // 2. 禁用selfPrervationModeEnabled,即 eureka.server.should-enable-self-preservation= false, 默认值true // 默认eureka.renewal-percent-threshold=0.85 // 如果我们将自我保护打开,在发生网络分区的时候,当instance的数量达不到历史期望值的85%的时候, 将不会在更新阈值, eureka-server将启用自我保护状态 if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); } } logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold); } catch (Throwable e) { logger.error("Cannot update renewal threshold", e); } }
EurekaServerBootstrap
该类主要负责对EurekaServer环境配置, 以及远程instance信息从同级eureka server同步,保证所有Eureka Server中的数据一致性。在该类中主要包含两个主要方法contextInitialized
与contextDestroyed
两个方法,具体源码如下:
public void contextInitialized(ServletContext context) { try { // 初始化eureka环境信息, 确认数据中心地址, 以及当前eureka server所处的环境, 默认为test环境 initEurekaEnvironment(); // 初始化eureka server context initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } } public void contextDestroyed(ServletContext context) { try { log.info("Shutting down Eureka Server.."); context.removeAttribute(EurekaServerContext.class.getName()); destroyEurekaServerContext(); destroyEurekaEnvironment(); } catch (Throwable e) { log.error("Error shutting down eureka", e); } log.info("Eureka Service is now shutdown..."); }
在初始化initEurekaServerContext()
方法中, 则是主要同步instance数据和开启evict定时任务,用于剔除已下线的任务:
protected void initEurekaServerContext() throws Exception { // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = this.registry.syncUp(); // 开启evict定时任务 this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }
在InstanceRegistry
中,包含了syncUp()
方法,源码如下:
@Override public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } // 通过eureka client 获取其他eureka server节点上instance列表 Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { // 将instance信息保存到当前Regitry中 register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 当前同步的instance数量 count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }
openForTraffic()
方法,则是开启定时任务, 具体代码如下:
@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count; // 更新renews阈值 updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; // 这个aws服务器,可以忽略 if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); // 将当前instance的状态设置为UP状态 applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 后置初始化操作 super.postInit(); } protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); // 每60秒定制执行evict操作, 可以通过eureka.server.eviction-interval-time-in-ms=60来更改当前的时间 evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
以上则是eureka主要的启动过程,这之间设计到阈值定时任务,evict定时任务, PeerNodes更新任务等,下一章节将主要介绍eureka的服务注册与Evict策略
文章评论