阅读代码深入原理9——Spring Cloud Netflix之Eureka Server
lipiwang 2024-11-15 22:02 14 浏览 0 评论
spring-cloud-netflix-eureka-server依赖spring-cloud-netflix-eureka-client,关于eureka-client相关的见我的相关文章。
一般我们使用Eureka作为注册中心时使用方法如下:
@SpringBootApplication
@EnableEurekaServer // 注册bean(EurekaServerMarkerConfiguration.Marker)作为一个标识,来启用EurekaServerAutoConfiguration
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
spring.factories机制自动引入EurekaServerAutoConfiguration,它通过Import形式引入了EurekaServerInitializerConfiguration,还定义了EurekaController、InstanceRegistry、EurekaServerBootstrap、RefreshablePeerEurekaNodes、DefaultEurekaServerContext、FilterRegistrationBean、Application(jersey-server的DefaultResourceConfig)等。
根据Spring的SmartLifecycle机制,EurekaServerInitializerConfiguration初始化后调用其start方法,代码如下:
# spring-cloud-netflix-eureka-server-2.2.8.RELEASE.jar!/org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
// 1. 从对等节点同步注册信息,见下文
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); // 2. 发布EurekaRegistryAvailableEvent
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig())); // 3. 发布EurekaServerStartedEvent
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
# spring-cloud-netflix-eureka-server-2.2.8.RELEASE.jar!/org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment(); // 1. 检查archaius配置eureka的datacenter、environment,如果不存在则设置默认值(数据中心为default,环境为test)
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start(); // 2. datacenterInfo.name为Aws时(默认datacenterInfo.name为MyOwn)会从本地链接地址(http://169.254.169.254/latest/meta-data)获取元信息,通过元信息的instanceId和availabilityZone,再使用DNS或配置绑定eureka-server到EIP(域名部分截取出EIP地址,弹性IP为固定的IP地址,公网可访问);
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp(); // 3. // 调用InstanceRegistry.syncUp,此时从对等节点获取所有Application,注册InstanceInfo到此节点
this.registry.openForTraffic(this.applicationInfoManager, registryCount); // 4. // 调用InstanceRegistry.openForTraffic,此时将实例状态设为可用,然后启动定时任务(eureka.server.deltaRetentionTimerIntervalInMs,默认5分钟)更新MeasuredRate.currentBucket为0,再启动定期清理任务(eureka.server.evictionIntervalTimerInMs,默认1分钟)来清理租约过期的实例;
// Register all monitoring statistics.
EurekaMonitors.registerAllStats(); // 5. 注册监控相关信息,未配置监控实现时使用JMX
}
接着看同步其它节点注册信息并注册到自身实例过程,EurekaServerAutoConfiguration定义的PeerAwareInstanceRegistry类型为spring的InstanceRegistry,它是netflix的PeerAwareInstanceRegistryImpl的子类。
在InstanceRegistry初始化前,强制eurekaClient获取对等节点的注册信息。
# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) { // 1. (AWS数据中心时)eureka-server实例的可用区包含服务实例的可用区则认为可注册。实际代码始终需注册!
register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 2. 注册,见下文
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
AbstractInstanceRegistry.register方法为注册构造的registry数据结构为{appName:{instanceId: {holder:{}, registrationTimestamp:, lastUpdateTimestamp:, duration:}}}。
ResponseCacheImpl持有registry,构建二级缓存readWriteCacheMap,此缓存在获取key时,懒加载形式从registry获取信息。同时ResponseCacheImpl还有一级缓存readOnlyCacheMap,通过定时任务,从readWriteCacheMap更新信息。
# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.AbstractInstanceRegistry
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock();
try {
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) { // 1. 应用信息不存在时创建实例Map
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) { // 2. 实例存在且有重启时,更新注册的实例信息
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else { // 3. 新实例注册的则更新续期计算的数据
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); // 4. 创建实例租约信息,过期时间为当前时间大于注册/续期时间与租期之和
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>( // 5. 更新最新注册队列,供dashboard展示“/lastn”用
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); // 6. UP和OUT_OF_SERVICE状态可能仅是心跳失败
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); // 7. 将responseCache缓存失效,导致ResponseCacheImpl的二级缓存readWriteCacheMap失效,定时任务更新一级缓存readOnlyCacheMap
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
如果是单节点,注册信息来源于eureka-client的请求。此时,回到EurekaServerAutoConfiguration定义的jerseyApplication、FilterRegistrationBean。
jerseyApplication会使用ClassPathScanningCandidateComponentProvider扫描"com.netflix.discovery"、"com.netflix.eureka"两个包下有Path/Provider注解的类,作为DefaultResourceConfig的类资源。
FilterRegistrationBean拦截匹配形如"/eureka/*“的URL请求,ServletContainer作为包装jerseyApplication的Filter,在Filter初始化时,将com.netflix.discovery包和com.netflix.eureka包下有@Provider或@Path注解的类以path和类映射构成UriRule。
URL和处理类关系参考如下:
# org.springframework.cloud.netflix.eureka.http.RestTemplateEurekaHttpClient
# 注册
POST http://localhost:8761/eureka/apps/{appName}
com.netflix.eureka.resources.ApplicationsResource#getApplicationResource
com.netflix.eureka.resources.ApplicationResource#addInstance
# 心跳
PUT http://localhost:8761/eureka/apps/{appName}/{instanceId}?status=&lastDirtyTimestamp=&overriddenstatus=
com.netflix.eureka.resources.InstanceResource#renewLease
# 下线
DELETE http://localhost:8761/eureka/apps/{appName}/{instanceId}
com.netflix.eureka.resources.InstanceResource#cancelLease
#####
# 获取所有应用信息
GET http://localhost:8761/eureka/apps/?regions=
com.netflix.eureka.resources.ApplicationsResource#getContainers
# 获取有变化的应用
GET http://localhost:8761/eureka/apps/delta?regions=
com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential
# 获取应用信息
GET http://localhost:8761/eureka/apps/{appName}/{instanceId}
com.netflix.eureka.resources.ApplicationResource#getApplication
# 获取实例信息
GET http://localhost:8761/eureka/instances/{instanceId}
com.netflix.eureka.resources.InstancesResource#getById
DefaultEurekaServerContext初始化时先调用PeerEurekaNodes.start,然后调用PeerAwareInstanceRegistryImpl.init,代码如下:
# eureka-core-1.10.11.jar!/com.netflix.eureka.cluster.PeerEurekaNodes
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
// 1. 从archaius或配置文件先获取当前eureka-client的region(默认region为us-east-1),再获取region下的availabilityZones(默认zone为default);
// 如果数据中心是AWS,则从元信息获取可用区,否则返回配置的availabilityZones下指定region的第一个可用区!?
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List<String> replicaUrls = EndpointUtils // 2. DNS方式获取(默认不使用)或从配置中获取当前可用区(或默认可用区)的服务节点
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) { // 2. 节点为eureka.server.myUrl,或者节点地址包含eureka.instance.hostname
replicaUrls.remove(idx); // 3. 移除自身节点
} else {
idx++;
}
}
return replicaUrls;
}
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 4. 关闭之前的节点,创建新节点(启动定时任务)
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay( // 5. 定时更新对等节点信息
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
initializedResponseCache(); // 1. 初始化缓存
scheduleRenewalThresholdUpdateTask(); // 2. 定时任务更新numberOfRenewsPerMinThreshold
initRemoteRegionRegistry(); // 3. 如果eureka.server.remoteRegionUrlsWithName不为空(默认为空),则创建RemoteRegionRegistry,在构造函数内创建任务来获取远程区域注册中心的应用信息
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
当请求“/eureka/apps/{appName}”到达时,触发com.netflix.eureka.resources.ApplicationResource#addInstance,导致com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register,进而导致信息同步给其它eureka-server实例,代码如下:
# eureka-core-1.10.11.jar!/com.netflix.eureka.cluster.PeerEurekaNodes
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
// 1. 从archaius或配置文件先获取当前eureka-client的region(默认region为us-east-1),再获取region下的availabilityZones(默认zone为default);
// 如果数据中心是AWS,则从元信息获取可用区,否则返回配置的availabilityZones下指定region的第一个可用区!?
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List<String> replicaUrls = EndpointUtils // 2. DNS方式获取(默认不使用)或从配置中获取当前可用区(或默认可用区)的服务节点
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) { // 2. 节点为eureka.server.myUrl,或者节点地址包含eureka.instance.hostname
replicaUrls.remove(idx); // 3. 移除自身节点
} else {
idx++;
}
}
return replicaUrls;
}
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 4. 关闭之前的节点,创建新节点(启动定时任务)
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay( // 5. 定时更新对等节点信息
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
```
```java
# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
initializedResponseCache(); // 1. 初始化缓存
scheduleRenewalThresholdUpdateTask(); // 2. 定时任务更新numberOfRenewsPerMinThreshold
initRemoteRegionRegistry(); // 3. 如果eureka.server.remoteRegionUrlsWithName不为空(默认为空),则创建RemoteRegionRegistry,在构造函数内创建任务来获取远程区域注册中心的应用信息
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
相关推荐
- Go语言图书管理RESTful API开发实战
-
Go(Golang)是最近流行起来,且相对较新的编程语言。它小而稳定,使用和学习简单,速度快,经过编译(原生代码),并大量用于云工具和服务(Docker、Kubernetes...)。考虑到它所带来的...
- 轻松搞定Golang 中的内存管理(golang设置内存大小)
-
除非您正在对服务进行原型设计,否则您可能会关心应用程序的内存使用情况。内存占用更小,基础设施成本降低,扩展变得更容易/延迟。尽管Go以不消耗大量内存而闻名,但仍有一些方法可以进一步减少消耗。其中一...
- golang实现deepseek 聊天功能(golang deepcopy)
-
在搭建完deepseek环境后在docker内部署deepseekrag环境,我们可以用golang实现聊天功能。在实现这个功能之前,我们先了解下提示词工程(prompt)。大模型虽然知道的东西多...
- golang slice的扩容机制(golang设置内存大小)
-
在Go语言中,切片(slice)是一种动态数组,其长度可以在运行时改变。当向切片中添加元素时,如果切片的容量不足以容纳新元素,就会触发扩容机制。下面详细介绍Go语言切片的扩容机制。扩容触发条件...
- Etcd服务注册与发现封装实现--golang
-
服务注册register.gopackageregisterimport("fmt""time"etcd3"github.com/cor...
- 嘿,轻松获取区间内所有日期的Golang小技巧!
-
在Go语言中,获取两个日期之间的所有日期可以手动实现一个函数来完成。以下是一个示例函数,它会返回一个日期切片,包含从开始日期到结束日期(包括这两个日期)的所有日期:packagemainimpo...
- 仓颉、Java、Golang性能测试——数组扩容
-
版本信息仓颉版本0.53.18Golang版本1.22.8Java版本corretto-1.8.0_452源码仓颉packagecangjie_testimportstd.collect...
- Golang 58个坑 – 中级篇:36-51(golang cef)
-
36.关闭HTTP的响应体37.关闭HTTP连接38.将JSON中的数字解码为interface类型39.struct、array、slice和map的值比较40.从panic...
- 一篇文章学会golang语法,golang简明教程快速入门
-
Go(又称Golang)是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。——Go-wikipedia.org1Go安装最新版本下载地址官方下载https...
- 运维大神如何使用 Golang 日志监控应用程序
-
你是如何使用Golang日志监控你的应用程序的呢?Golang没有异常,只有错误。因此你的第一印象可能就是开发Golang日志策略并不是一件简单的事情。不支持异常事实上并不是什么问题,异常在...
- Golang操作elasticsearch(golang操作word)
-
简介开源的Elasticsearch是目前全文搜索引擎的首选,很多日志都是放到elasticsearch里面,然后再根据具体的需求进行分析。目前我们的运维系统是使用golang开发的,需要定时到e...
- 一文带你看懂Golang最新特性(golang x)
-
作者:腾讯PCG代码委员会经过十余年的迭代,Go语言逐渐成为云计算时代主流的编程语言。下到云计算基础设施,上到微服务,越来越多的流行产品使用Go语言编写。可见其影响力已经非常强大。一、Go语言发展历史...
- Golang 最常用函数(备用查询)(golang函数和方法)
-
hello.gopackagemainimport"fmt"funcmain(){fmt.Println("Hello,world!")}直...
- Golang:将日志以Json格式输出到Kafka
-
在上一篇文章中我实现了一个支持Debug、Info、Error等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适合练手。有兴趣的可以通过这个链接前往:https://github.com/...
- 如何从 PHP 过渡到 Golang?(php转go需要多久)
-
我是PHP开发者,转Go两个月了吧,记录一下使用Golang怎么一步步开发新项目。本着有坑填坑,有错改错的宗旨,从零开始,开始学习。因为我司没有专门的Golang大牛,所以我也只能一步步自己去...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)