百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

阅读代码深入原理9——Spring Cloud Netflix之Eureka Server

lipiwang 2024-11-15 22:02 14 浏览 0 评论

spring-cloud-netflix-eureka-server依赖spring-cloud-netflix-eureka-client,关于eureka-client相关的见我的相关文章。

一般我们使用Eureka作为注册中心时使用方法如下:

@SpringBootApplication
@EnableEurekaServer // 注册bean(EurekaServerMarkerConfiguration.Marker)作为一个标识,来启用EurekaServerAutoConfiguration
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

spring.factories机制自动引入EurekaServerAutoConfiguration,它通过Import形式引入了EurekaServerInitializerConfiguration,还定义了EurekaController、InstanceRegistry、EurekaServerBootstrap、RefreshablePeerEurekaNodes、DefaultEurekaServerContext、FilterRegistrationBean、Application(jersey-server的DefaultResourceConfig)等。

根据Spring的SmartLifecycle机制,EurekaServerInitializerConfiguration初始化后调用其start方法,代码如下:

# spring-cloud-netflix-eureka-server-2.2.8.RELEASE.jar!/org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration
	@Override
	public void start() {
		new Thread(() -> {
			try {
				// TODO: is this class even needed now?
				// 1. 从对等节点同步注册信息,见下文
				eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); 
				log.info("Started Eureka Server");
				publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); // 2. 发布EurekaRegistryAvailableEvent
				EurekaServerInitializerConfiguration.this.running = true;
				publish(new EurekaServerStartedEvent(getEurekaServerConfig())); // 3. 发布EurekaServerStartedEvent
			}
			catch (Exception ex) {
				// Help!
				log.error("Could not initialize Eureka servlet context", ex);
			}
		}).start();
	}


# spring-cloud-netflix-eureka-server-2.2.8.RELEASE.jar!/org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap
	public void contextInitialized(ServletContext context) {
		try {
			initEurekaEnvironment(); // 1. 检查archaius配置eureka的datacenter、environment,如果不存在则设置默认值(数据中心为default,环境为test)
			initEurekaServerContext();
			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}
	
	protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start(); // 2. datacenterInfo.name为Aws时(默认datacenterInfo.name为MyOwn)会从本地链接地址(http://169.254.169.254/latest/meta-data)获取元信息,通过元信息的instanceId和availabilityZone,再使用DNS或配置绑定eureka-server到EIP(域名部分截取出EIP地址,弹性IP为固定的IP地址,公网可访问);
		}
		EurekaServerContextHolder.initialize(this.serverContext);
		log.info("Initialized server context");
		// Copy registry from neighboring eureka node
		int registryCount = this.registry.syncUp(); // 3. // 调用InstanceRegistry.syncUp,此时从对等节点获取所有Application,注册InstanceInfo到此节点
		this.registry.openForTraffic(this.applicationInfoManager, registryCount); // 4. // 调用InstanceRegistry.openForTraffic,此时将实例状态设为可用,然后启动定时任务(eureka.server.deltaRetentionTimerIntervalInMs,默认5分钟)更新MeasuredRate.currentBucket为0,再启动定期清理任务(eureka.server.evictionIntervalTimerInMs,默认1分钟)来清理租约过期的实例;
		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats(); // 5. 注册监控相关信息,未配置监控实现时使用JMX
	}

接着看同步其它节点注册信息并注册到自身实例过程,EurekaServerAutoConfiguration定义的PeerAwareInstanceRegistry类型为spring的InstanceRegistry,它是netflix的PeerAwareInstanceRegistryImpl的子类。

在InstanceRegistry初始化前,强制eurekaClient获取对等节点的注册信息。

# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) { // 1. (AWS数据中心时)eureka-server实例的可用区包含服务实例的可用区则认为可注册。实际代码始终需注册!
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 2. 注册,见下文
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

AbstractInstanceRegistry.register方法为注册构造的registry数据结构为{appName:{instanceId: {holder:{}, registrationTimestamp:, lastUpdateTimestamp:, duration:}}}。

ResponseCacheImpl持有registry,构建二级缓存readWriteCacheMap,此缓存在获取key时,懒加载形式从registry获取信息。同时ResponseCacheImpl还有一级缓存readOnlyCacheMap,通过定时任务,从readWriteCacheMap更新信息。

# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.AbstractInstanceRegistry
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock();
        try {
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) { // 1. 应用信息不存在时创建实例Map
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) { // 2. 实例存在且有重启时,更新注册的实例信息
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else { // 3. 新实例注册的则更新续期计算的数据
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); // 4. 创建实例租约信息,过期时间为当前时间大于注册/续期时间与租期之和
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            recentRegisteredQueue.add(new Pair<Long, String>( // 5. 更新最新注册队列,供dashboard展示“/lastn”用
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); // 6. UP和OUT_OF_SERVICE状态可能仅是心跳失败
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); // 7. 将responseCache缓存失效,导致ResponseCacheImpl的二级缓存readWriteCacheMap失效,定时任务更新一级缓存readOnlyCacheMap
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

如果是单节点,注册信息来源于eureka-client的请求。此时,回到EurekaServerAutoConfiguration定义的jerseyApplication、FilterRegistrationBean。

jerseyApplication会使用ClassPathScanningCandidateComponentProvider扫描"com.netflix.discovery"、"com.netflix.eureka"两个包下有Path/Provider注解的类,作为DefaultResourceConfig的类资源。

FilterRegistrationBean拦截匹配形如"/eureka/*“的URL请求,ServletContainer作为包装jerseyApplication的Filter,在Filter初始化时,将com.netflix.discovery包和com.netflix.eureka包下有@Provider或@Path注解的类以path和类映射构成UriRule。

URL和处理类关系参考如下:

# org.springframework.cloud.netflix.eureka.http.RestTemplateEurekaHttpClient
# 注册
POST http://localhost:8761/eureka/apps/{appName}
com.netflix.eureka.resources.ApplicationsResource#getApplicationResource
com.netflix.eureka.resources.ApplicationResource#addInstance
# 心跳
PUT http://localhost:8761/eureka/apps/{appName}/{instanceId}?status=&lastDirtyTimestamp=&overriddenstatus=
com.netflix.eureka.resources.InstanceResource#renewLease
# 下线
DELETE http://localhost:8761/eureka/apps/{appName}/{instanceId}
com.netflix.eureka.resources.InstanceResource#cancelLease
#####
# 获取所有应用信息
GET http://localhost:8761/eureka/apps/?regions=
com.netflix.eureka.resources.ApplicationsResource#getContainers
# 获取有变化的应用
GET http://localhost:8761/eureka/apps/delta?regions=
com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential
# 获取应用信息
GET http://localhost:8761/eureka/apps/{appName}/{instanceId}
com.netflix.eureka.resources.ApplicationResource#getApplication
# 获取实例信息
GET http://localhost:8761/eureka/instances/{instanceId}
com.netflix.eureka.resources.InstancesResource#getById

DefaultEurekaServerContext初始化时先调用PeerEurekaNodes.start,然后调用PeerAwareInstanceRegistryImpl.init,代码如下:

# eureka-core-1.10.11.jar!/com.netflix.eureka.cluster.PeerEurekaNodes
    protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
		// 1. 从archaius或配置文件先获取当前eureka-client的region(默认region为us-east-1),再获取region下的availabilityZones(默认zone为default);
		// 如果数据中心是AWS,则从元信息获取可用区,否则返回配置的availabilityZones下指定region的第一个可用区!?
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils // 2. DNS方式获取(默认不使用)或从配置中获取当前可用区(或默认可用区)的服务节点
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) { // 2. 节点为eureka.server.myUrl,或者节点地址包含eureka.instance.hostname
                replicaUrls.remove(idx); // 3. 移除自身节点
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }
	
    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
			
			// 4. 关闭之前的节点,创建新节点(启动定时任务)
            updatePeerEurekaNodes(resolvePeerUrls()); 
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
            taskExecutor.scheduleWithFixedDelay( // 5. 定时更新对等节点信息
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }

# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        initializedResponseCache(); // 1. 初始化缓存
        scheduleRenewalThresholdUpdateTask(); // 2. 定时任务更新numberOfRenewsPerMinThreshold
        initRemoteRegionRegistry(); // 3. 如果eureka.server.remoteRegionUrlsWithName不为空(默认为空),则创建RemoteRegionRegistry,在构造函数内创建任务来获取远程区域注册中心的应用信息
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

当请求“/eureka/apps/{appName}”到达时,触发com.netflix.eureka.resources.ApplicationResource#addInstance,导致com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register,进而导致信息同步给其它eureka-server实例,代码如下:

# eureka-core-1.10.11.jar!/com.netflix.eureka.cluster.PeerEurekaNodes
    protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
		// 1. 从archaius或配置文件先获取当前eureka-client的region(默认region为us-east-1),再获取region下的availabilityZones(默认zone为default);
		// 如果数据中心是AWS,则从元信息获取可用区,否则返回配置的availabilityZones下指定region的第一个可用区!?
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils // 2. DNS方式获取(默认不使用)或从配置中获取当前可用区(或默认可用区)的服务节点
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) { // 2. 节点为eureka.server.myUrl,或者节点地址包含eureka.instance.hostname
                replicaUrls.remove(idx); // 3. 移除自身节点
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }
	
    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
			
			// 4. 关闭之前的节点,创建新节点(启动定时任务)
            updatePeerEurekaNodes(resolvePeerUrls()); 
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
            taskExecutor.scheduleWithFixedDelay( // 5. 定时更新对等节点信息
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
```
```java
# eureka-core-1.10.11.jar!/com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        initializedResponseCache(); // 1. 初始化缓存
        scheduleRenewalThresholdUpdateTask(); // 2. 定时任务更新numberOfRenewsPerMinThreshold
        initRemoteRegionRegistry(); // 3. 如果eureka.server.remoteRegionUrlsWithName不为空(默认为空),则创建RemoteRegionRegistry,在构造函数内创建任务来获取远程区域注册中心的应用信息
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

相关推荐

Go语言图书管理RESTful API开发实战

Go(Golang)是最近流行起来,且相对较新的编程语言。它小而稳定,使用和学习简单,速度快,经过编译(原生代码),并大量用于云工具和服务(Docker、Kubernetes...)。考虑到它所带来的...

轻松搞定Golang 中的内存管理(golang设置内存大小)

除非您正在对服务进行原型设计,否则您可能会关心应用程序的内存使用情况。内存占用更小,基础设施成本降低,扩展变得更容易/延迟。尽管Go以不消耗大量内存而闻名,但仍有一些方法可以进一步减少消耗。其中一...

golang实现deepseek 聊天功能(golang deepcopy)

在搭建完deepseek环境后在docker内部署deepseekrag环境,我们可以用golang实现聊天功能。在实现这个功能之前,我们先了解下提示词工程(prompt)。大模型虽然知道的东西多...

golang slice的扩容机制(golang设置内存大小)

在Go语言中,切片(slice)是一种动态数组,其长度可以在运行时改变。当向切片中添加元素时,如果切片的容量不足以容纳新元素,就会触发扩容机制。下面详细介绍Go语言切片的扩容机制。扩容触发条件...

Etcd服务注册与发现封装实现--golang

服务注册register.gopackageregisterimport("fmt""time"etcd3"github.com/cor...

嘿,轻松获取区间内所有日期的Golang小技巧!

在Go语言中,获取两个日期之间的所有日期可以手动实现一个函数来完成。以下是一个示例函数,它会返回一个日期切片,包含从开始日期到结束日期(包括这两个日期)的所有日期:packagemainimpo...

仓颉、Java、Golang性能测试——数组扩容

版本信息仓颉版本0.53.18Golang版本1.22.8Java版本corretto-1.8.0_452源码仓颉packagecangjie_testimportstd.collect...

Golang 58个坑 – 中级篇:36-51(golang cef)

36.关闭HTTP的响应体37.关闭HTTP连接38.将JSON中的数字解码为interface类型39.struct、array、slice和map的值比较40.从panic...

一篇文章学会golang语法,golang简明教程快速入门

Go(又称Golang)是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。——Go-wikipedia.org1Go安装最新版本下载地址官方下载https...

运维大神如何使用 Golang 日志监控应用程序

你是如何使用Golang日志监控你的应用程序的呢?Golang没有异常,只有错误。因此你的第一印象可能就是开发Golang日志策略并不是一件简单的事情。不支持异常事实上并不是什么问题,异常在...

Golang操作elasticsearch(golang操作word)

简介开源的Elasticsearch是目前全文搜索引擎的首选,很多日志都是放到elasticsearch里面,然后再根据具体的需求进行分析。目前我们的运维系统是使用golang开发的,需要定时到e...

一文带你看懂Golang最新特性(golang x)

作者:腾讯PCG代码委员会经过十余年的迭代,Go语言逐渐成为云计算时代主流的编程语言。下到云计算基础设施,上到微服务,越来越多的流行产品使用Go语言编写。可见其影响力已经非常强大。一、Go语言发展历史...

Golang 最常用函数(备用查询)(golang函数和方法)

hello.gopackagemainimport"fmt"funcmain(){fmt.Println("Hello,world!")}直...

Golang:将日志以Json格式输出到Kafka

在上一篇文章中我实现了一个支持Debug、Info、Error等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适合练手。有兴趣的可以通过这个链接前往:https://github.com/...

如何从 PHP 过渡到 Golang?(php转go需要多久)

我是PHP开发者,转Go两个月了吧,记录一下使用Golang怎么一步步开发新项目。本着有坑填坑,有错改错的宗旨,从零开始,开始学习。因为我司没有专门的Golang大牛,所以我也只能一步步自己去...

取消回复欢迎 发表评论: