在前面的文章中,介绍了Eureka Server的启动流程,以及启动过程中执行的操作信息。本章将会介绍服务注册相关的实现。服务注册中主要介绍Instance信息通过register的方式保存到Registry的业务逻辑,以及如何实现数据同步。
数据初始化
在spring cloud 服务注册之Eureka Server(二) – 启动过程章节中,介绍到Eureka Server启动过程中, 会从其他的Peer Nodes上同步Applications列表,并保存到当前的服务Registry服务中, 具体代码如下:
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 通过Eureka Client通过applications列表
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// 将Instance信息存放到当前Registry中
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
将Instance信息保存到当前Registry实例中,是通过register方法保存instance信息, 具体实现代码如下:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 获取读锁
read.lock();
// 根据注册的instance 中appName获取存储的InstanceInfo列表
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// 计数
REGISTER.increment(isReplication);
// 当gMap为空时,表示了当前appName没有在Registry中注册, 因此构建一个空的Map放入到registry中
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 根据instanceId获取gMap中存储的实例列表
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
// 该条件判断了是否存在了instance信息, 如果存在了,则条件成立
if (existingLease != null && (existingLease.getHolder() != null)) {
// 获取最后一次instance被请求的时间戳
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
// 当前注册instance最后一次被请求的时间戳
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// 当前判断逻辑为, 如果本地的instance的时间戳大于远程的instance的时间戳,则还是以本地的实例信息为准.说明本地的instance信息是较新的
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 当前逻辑为: 在本地registry中不存在instance信息, 获取排它锁
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
// 更新本地的renews阈值
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 通过Lease保存当前注册的instance信息
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 将实例信息保存在gMap中, key -> instance.getId, value -> Lease
gMap.put(registrant.getId(), lease);
// 当前recentRegisteredQueue, 根据注释,表示当前的queue只是用调试, 具体没有发现其他用处
// // CircularQueues here for debugging/statistics purposes only
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
// 当前if的逻辑主要是, 判断instance的当前状态是否为UNKOWN, 如果不是UNKOWN, 并且在overriddenInstanceStatusMap不
// 包含instanceId的状态数据, 则将当前instance的状态加入到overriddenInstanceStatusMap中
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
// 当前将根据status rules确定当前实例的状态
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// 当前instance的状态称为UP的时候,将Lease的状态更改为UP状态
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// 将当前instance的操作类型设置为ADDED
registrant.setActionType(ActionType.ADDED);
// 最近变更Lease列表
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 实例最后更新的时间
registrant.setLastUpdatedTimestamp();
// 将responseCase中readWriteCacheMap中对应的appName的值设置为过期
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
// 释放读锁
read.unlock();
}
}
通过以上源码分析,可以看到, 在registry中register实例信息的时候,流程图如下:
数据复制
当有客户端注册instance上来时,如何让其他的eureka server能够获取到当前的注册实例信息呢,源码如下:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
// 将当前实例信息注册到其他Peer Nodes节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
通过查看源码可知,在注册完成instance实例信息之后,通过调用replicateToPeers方法将当前实例信息同步到其他的节点, 方法代码如下:
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 将instance信息注册到其他的PeerNodes节点
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel: // 取消
node.cancel(appName, id);
break;
case Heartbeat: // 心跳
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register: // 注册
node.register(info);
break;
case StatusUpdate: // 状态更新
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride: // 删除状态
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
通过源码可以得知,通过instance信息,实际上是调用PeerNode的register方法,实现同步的方式,PeerNode的源码如下:
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
在注册的时候,通过replicationClient对象,调用register方法注册实例信息, 具体代码如下:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
总结
通过以上分析可以得知,Eureka Server 之间数据同步主要有两种方式:
- 当Eureka Server启动时,主动从其他的Peer Nodes上同步实例列表
- 当Eureka Server接收到客户端服务注册时,主动调用Peer Node将服务注册到其他服务
