在Eureka Server自动装配文章中,介绍了Eureka相关的启动组件,这篇文章主要介绍在启动过程中,各组件都是如何工作的。
InstanceRegistry
从类型命名可以知道,该类型主要用于Instance信息注册实现,用于保存Eureka Client注册上来的基本信息。我们可以查看下InstanceRegistry的实现结构.
InstanceRegistry中实现了PeerAwareInstanceRegistryImpl的实现,创建实例的源码如下:
public InstanceRegistry(EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig, ServerCodecs serverCodecs,
EurekaClient eurekaClient, int expectedNumberOfClientsSendingRenews,
int defaultOpenForTrafficCount) {
super(serverConfig, clientConfig, serverCodecs, eurekaClient);
// eureka.server.expectedNumberOfRenewsPerMin 配置值, 默认为1
this.expectedNumberOfClientsSendingRenews = expectedNumberOfClientsSendingRenews;
// eureka.server.defaultOpenForTrafficCount 默认值为1
this.defaultOpenForTrafficCount = defaultOpenForTrafficCount;
}
创建InstanceRegsitry实例时,同事也会初始化PeerAwareInstanceRegistryImpl对象, 具体代码如下:
@Inject
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient
) {
super(serverConfig, clientConfig, serverCodecs);
// eureka client独享
this.eurekaClient = eurekaClient;
// 同步策略
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
对于InstanceRegistry而言,没有其他复杂的操作,只是将对象创建出来,并交由Spring进行托管。
RefreshableEureakPeerNodes
PeerNodes主要用于维护客户端连接Eureka Server连接url列表, 该类有一个start()方法,用于开启定时任务定时刷新, 具体代码如下:
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 更新PeerEurekaNodes节点信息
updatePeerEurekaNodes(resolvePeerUrls());
// 创建update PeerEurekaNodes任务
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// task任务通过定时任务执行, 可以通过 eureka.server.peer-eureka-nodes-update-interval-ms 配置, 默认为10分钟
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
我们可以看一下,resolvePeerUrls()方法返回了EurekaServerUrl服务列表, 具体代码如下:
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
// 从当前ClientConfig中获取链接的server-url列表, 该列表会有region分区的限制, 如果分区数据不存在,则采用defautZone配置列表
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
// 判断是否和当前服务器url地址一样,如果一样,则从service-url列表中剔除
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
updatePeerEurekaNodes代码其实很简单,通过不断从ServerConfig中读取server-url配置,然后和本地缓存做比对,并更新本地缓存信息。
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
// 将peerEurekaNodeUrls中的配置信息做缓存,外部变化不会影响到代码执行
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
// 更新本地缓存
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
EurekaServerContext
该类是作为EurekaServer实现中,比较重要的一环,用于对EurekaServer上下文内容执行初始化操作. 具体代码如下:
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);
private final EurekaServerConfig serverConfig;
private final ServerCodecs serverCodecs;
private final PeerAwareInstanceRegistry registry;
private final PeerEurekaNodes peerEurekaNodes;
private final ApplicationInfoManager applicationInfoManager;
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
// 调用PeerEurekaNodes方法,并开启定时任务更新PeerEurekaNodes节点信息
peerEurekaNodes.start();
try {
// InstanceRegistry初始化数据操作
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
@PreDestroy
@Override
public void shutdown() {
logger.info("Shutting down ...");
registry.shutdown();
peerEurekaNodes.shutdown();
logger.info("Shut down");
}
...
}
EurekaServerContext类型是在Bean的初始化阶段执行了initialize()方法, 该方法主要做两个事情
- 开启定时任务,每10分钟更新server-url配置的节点信息
- 初始化
InstanceRegistry中的实例数据
InstanceRegistry.init()初始化方法源码如下:
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 开启定时任务, 定时清理上1分钟同步的buckets数量, 任务每1分钟执行一次
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// 初始化ResponseCache对象, 也是Eureka Server三级缓存的实现代码
initializedResponseCache();
// 开启定时任务, 用于更新续期阈值, 该值主要用于,在发生分区的时候, 是否丢弃
scheduleRenewalThresholdUpdateTask();
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
private void scheduleRenewalThresholdUpdateTask() {
// 根据eureka.server.renewal-threshold-update-interval-ms设置,默认值为15分钟
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
// 当前接收到的实例数量
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold or if self preservation is disabled.
// 该处更新阈值信息时,
// 1. 接收到的实例数量 > 百分比的阈值 * 当前期望阈值数量
// 2. 禁用selfPrervationModeEnabled,即 eureka.server.should-enable-self-preservation= false, 默认值true
// 默认eureka.renewal-percent-threshold=0.85
// 如果我们将自我保护打开,在发生网络分区的时候,当instance的数量达不到历史期望值的85%的时候, 将不会在更新阈值, eureka-server将启用自我保护状态
if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
EurekaServerBootstrap
该类主要负责对EurekaServer环境配置, 以及远程instance信息从同级eureka server同步,保证所有Eureka Server中的数据一致性。在该类中主要包含两个主要方法contextInitialized与contextDestroyed两个方法,具体源码如下:
public void contextInitialized(ServletContext context) {
try {
// 初始化eureka环境信息, 确认数据中心地址, 以及当前eureka server所处的环境, 默认为test环境
initEurekaEnvironment();
// 初始化eureka server context
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
public void contextDestroyed(ServletContext context) {
try {
log.info("Shutting down Eureka Server..");
context.removeAttribute(EurekaServerContext.class.getName());
destroyEurekaServerContext();
destroyEurekaEnvironment();
}
catch (Throwable e) {
log.error("Error shutting down eureka", e);
}
log.info("Eureka Service is now shutdown...");
}
在初始化initEurekaServerContext()方法中, 则是主要同步instance数据和开启evict定时任务,用于剔除已下线的任务:
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
// 开启evict定时任务
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
在InstanceRegistry中,包含了syncUp()方法,源码如下:
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 通过eureka client 获取其他eureka server节点上instance列表
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// 将instance信息保存到当前Regitry中
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
// 当前同步的instance数量
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
openForTraffic()方法,则是开启定时任务, 具体代码如下:
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
// 更新renews阈值
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
// 这个aws服务器,可以忽略
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// 将当前instance的状态设置为UP状态
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// 后置初始化操作
super.postInit();
}
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
// 每60秒定制执行evict操作, 可以通过eureka.server.eviction-interval-time-in-ms=60来更改当前的时间
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
以上则是eureka主要的启动过程,这之间设计到阈值定时任务,evict定时任务, PeerNodes更新任务等,下一章节将主要介绍eureka的服务注册与Evict策略
