在前面的文章中,介绍了Eureka Server的启动流程,以及启动过程中执行的操作信息。本章将会介绍服务注册相关的实现。服务注册中主要介绍Instance信息通过register的方式保存到Registry的业务逻辑,以及如何实现数据同步。
数据初始化
在spring cloud 服务注册之Eureka Server(二) – 启动过程章节中,介绍到Eureka Server启动过程中, 会从其他的Peer Nodes上同步Applications
列表,并保存到当前的服务Registry服务中, 具体代码如下:
@Override public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } // 通过Eureka Client通过applications列表 Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { // 将Instance信息存放到当前Registry中 register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }
将Instance信息保存到当前Registry实例中,是通过register
方法保存instance信息, 具体实现代码如下:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { // 获取读锁 read.lock(); // 根据注册的instance 中appName获取存储的InstanceInfo列表 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); // 计数 REGISTER.increment(isReplication); // 当gMap为空时,表示了当前appName没有在Registry中注册, 因此构建一个空的Map放入到registry中 if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } // 根据instanceId获取gMap中存储的实例列表 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease // 该条件判断了是否存在了instance信息, 如果存在了,则条件成立 if (existingLease != null && (existingLease.getHolder() != null)) { // 获取最后一次instance被请求的时间戳 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // 当前注册instance最后一次被请求的时间戳 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. // 当前判断逻辑为, 如果本地的instance的时间戳大于远程的instance的时间戳,则还是以本地的实例信息为准.说明本地的instance信息是较新的 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // 当前逻辑为: 在本地registry中不存在instance信息, 获取排它锁 // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; // 更新本地的renews阈值 updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } // 通过Lease保存当前注册的instance信息 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } // 将实例信息保存在gMap中, key -> instance.getId, value -> Lease gMap.put(registrant.getId(), lease); // 当前recentRegisteredQueue, 根据注释,表示当前的queue只是用调试, 具体没有发现其他用处 // // CircularQueues here for debugging/statistics purposes only synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens // 当前if的逻辑主要是, 判断instance的当前状态是否为UNKOWN, 如果不是UNKOWN, 并且在overriddenInstanceStatusMap不 // 包含instanceId的状态数据, 则将当前instance的状态加入到overriddenInstanceStatusMap中 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules // 当前将根据status rules确定当前实例的状态 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp // 当前instance的状态称为UP的时候,将Lease的状态更改为UP状态 if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } // 将当前instance的操作类型设置为ADDED registrant.setActionType(ActionType.ADDED); // 最近变更Lease列表 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); // 实例最后更新的时间 registrant.setLastUpdatedTimestamp(); // 将responseCase中readWriteCacheMap中对应的appName的值设置为过期 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { // 释放读锁 read.unlock(); } }
通过以上源码分析,可以看到, 在registry中register
实例信息的时候,流程图如下:
数据复制
当有客户端注册instance上来时,如何让其他的eureka server能够获取到当前的注册实例信息呢,源码如下:
@Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); // 将当前实例信息注册到其他Peer Nodes节点 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
通过查看源码可知,在注册完成instance实例信息之后,通过调用replicateToPeers
方法将当前实例信息同步到其他的节点, 方法代码如下:
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } // 将instance信息注册到其他的PeerNodes节点 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: // 取消 node.cancel(appName, id); break; case Heartbeat: // 心跳 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: // 注册 node.register(info); break; case StatusUpdate: // 状态更新 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: // 删除状态 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
通过源码可以得知,通过instance信息,实际上是调用PeerNode的register
方法,实现同步的方式,PeerNode的源码如下:
public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); }
在注册的时候,通过replicationClient
对象,调用register
方法注册实例信息, 具体代码如下:
public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
总结
通过以上分析可以得知,Eureka Server 之间数据同步主要有两种方式:
- 当Eureka Server启动时,主动从其他的Peer Nodes上同步实例列表
- 当Eureka Server接收到客户端服务注册时,主动调用Peer Node将服务注册到其他服务
文章评论