携程基于Quasar协程的NIO实践 携程原理
lipiwang 2024-11-01 14:10 7 浏览 0 评论
IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题。目前Java项目对接NIO的方式主要依靠回调,代码复杂度高,降低了代码可读性与可维护性。近年来Golang、Kotlin等语言的协程(Coroutine)能达到高性能与可读性的兼顾。
本文利用开源的Quasar框架提供的协程对系统进行NIO改造,解决以下两个问题:
1)提升单机任务的吞吐量,保证业务请求突增时系统的可伸缩性。
2)使用更轻量的协程同步等待IO,替代处理NIO常用的异步回调。
一、Java异步编程与非阻塞IO
本文改造的系统处理来自前台的任务,通过HTTP请求对端服务,还通过RPC调用内部服务。当业务高峰时,系统会遇到瞬时并发任务量数十倍激增的情况,系统的线程数量急剧增加造成性能下降。为此,不得不扩容以保证业务高峰时期的性能。
基于epoll的NIO框架Netty在一些框架级别的应用中已经得到了广泛使用,但在快速迭代的业务系统中的应用依然有一定的局限性。NIO 消除了线程的同步阻塞,意味着只能异步处理IO的结果,这与业务开发者顺序化的思维模式有一定差异。当业务逻辑复杂以及出现多次远程调用的情况下,多级回调难以实现和维护。
1.1 Java中的异步工具
Java项目大多使用JDK8,除线程外可以获得的异步的编程支持包括CompletableFuture,以及开源的RxJava、Vert.x等反应式编程框架等。这些工具使用了基于响应式编程的链式调用逐级传递事件,未从根本解决回调问题。
如下为将一段简单的逻辑判断使用CompletableFuture进行异步改造后的对比。原始版本使用getA方法获得第一步的请求结果,根据其相应选择使用getB1还是getB2获取第二步的响应作为结果。
HttpResponse a = getA();
HttpResponse b ;
if(a.getBody().equals("1")){
b=getB1();
}
else{
b=getB2();
}
String ans=b.getBody();
首先将三个获取响应的方法改为异步。此处假设getB1与getB2内部已经具有复杂逻辑,且不属于同一领域,不适合合并为一个方法。
private CompletableFuture<HttpResponse> getA();
private CompletableFuture<HttpResponse> getB1();
private CompletableFuture<HttpResponse> getB2();
然后使用CompletableFuture的链式调用,将两个步骤组合起来:
String ans = getA()
.thenCompose(a -> {
if (a.getBody().equals("1")) {
return getB1();
} else {
return getB2();
}
}).get()
.getBody();
使用CompletableFuture的链式回调后,代码变得不友好。RxJava等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于if/else、switch/case,乃至while/for、break/continue这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO使用。当时使用NIO时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。
1.2 协程
协程是一种进程自身来调度任务的调度模式。协程与线程不同之处在于,线程由内核调度,而协程的调度是进程自身完成的。协程只是一种抽象,最终的执行者是线程,每个线程只能同时执行一个协程,但大量的协程可以只拥有少量几个线程执行者,协程的调度器负责决定当前线程在执行那个协程,其余协程处于休眠并被调度器保存在内存中。
和线程类似,协程挂起时需要记录栈信息,以及方法执行的位置,这些信息会被协程调度器保存。协程从挂起到重新被执行不需要执行重量级的内核调用,而是直接将状态信息还原到执行线程的栈,高并发场景下,协程极大地避免了切换线程的开销。下图展示了协程调度器内部任务的流转。
协程中调用的方法是可以挂起的。不同于线程的阻塞会使线程休眠,协程在等待异步任务的结果时,会通知调度器将自己放入挂起队列,释放占用的线程以处理其他的协程。异步任务完毕后,通过回调将异步结果告知协程,并通知调度器将协程重新加入就绪队列执行。
1.3 Quasar任务调度原理
Quasar(https://github.com/puniverse/quasar)是一个开源的Java协程框架,通过利用Java instrument技术对字节码进行修改,使方法挂起前后可以保存和恢复JVM栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置。以如下方法为例,该方法分为两步,第一步为initial初始化,第二部为通过NIO获取网络响应。
public String instrumentDemo(){
initial();
String ans = getFromNIO();
return ans;
}
Quasar会在initial前增加一个flag字段,表明当前方法执行的位置。第一次执行方法时,检查到flag为0,修改flag为1并继续往下执行initial方法。执行getFromNIO方法前插入字节码指令将栈帧中的数据全部保存在一个Quasar自定义的栈结构中,在执行getFromNIO后,挂起协程,让出线程资源。直至NIO异步完成后,协程调度器将第二次执行该方法,检测到flag为1,将会调用jump指令跳转到returnans语句前,并将保存的栈结构还原到当前栈中,最后调用人return ans语句,方法执行完毕。
二、系统异步IO改造
在项目中添加Quasar依赖后,可以使用Fiber类新建协程。建立的方法与线程类似。
new Fiber(()->{
//方法体
}).start();
2.1 整合Netty与Quasar
系统使用的Http框架是基于Netty的async-http-client(https://github.com/AsyncHttpClient/async-http-client),该框架提供了异步回调和CompletableFuture两种对响应的异步处理方式。
CompletableFuture自JDK8推出,与之前的Future类最大的不同在于,提供了异步任务跨线程的通知和控制机制。即,任务的等待者可以在CompletableFuture注册任务完成或异常时的回调,而执行者也可以通过它通知等待者。Quaasr框架对它也做了支持,提供了API用于在协程中等待CompletableFuture的结果。调用后,协程将挂起,直至future状态为已完成。
AsyncCompletionStage.get(future)
通过CompletableFuture作为通知中介,我们可以将AsyncHttpClient与Quasar做整合,挂起协程等待IO结果。
//创建HttpClient
AsyncHttpClient httpClient = Dsl.asyncHttpClient();
//创建请求
Request request = createRequest();
//将网络请求交给HttpClient执行
CompletableFuture<Response> future = httpClient.executeRequest(request)
.toCompletableFuture();
//通过Quasar挂起协程
Response response = AsyncCompletionStage.get(future);
//获取网络结果后,通过future传递response并唤醒协程重新执行
deal(response);
过程可由下图表示。
Quasar框架AsyncCompletionStage.get内部完成的工作相当于,在HttpClient返回的future上注册回调,回调的内容是“IO操作完成后通知调度器唤醒协程”,这样将NIO异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。
2.2 声明挂起方法
Quasar需要织入字节码接管挂起方法的调度,在项目主pom下添加quasar-maven-plugin插件,该插件将在编译后的class文件中修改字节码。
<plugin>
<groupId>com.vlkan</groupId>
<artifactId>quasar-maven-plugin</artifactId>
<version>0.7.9</version>
<executions>
<execution>
<goals>
<goal>instrument</goal>
</goals>
</execution>
</executions>
</plugin>
Quasar通过识别方法是否抛出了该框架定义的SuspendExecution异常决定是否修改字节码。Quasar框架在AsyncCompletionStage.get方法上声明了SuspendExceution异常,该异常是捕获异常,但仅作为识别挂起方法的声明,在运行时不会实际抛出。使用者必须逐层抛出该异常直至新建协程的一层。当方法内部存在try/catch语句时,也必须抛出该异常。
public void startFiber() throws ExecutionException, InterruptedException {
Fiber<Void> fiber = new Fiber<Void>(() -> {
//不用继续抛出异常
Response response = waitNextLayer1();
deal(response);
}).start();
}
private Response waitNextLayer1() throws SuspendExecution {
return waitNextLayer2();
}
private Response waitNextLayer2() throws SuspendExecution {
CompletableFuture<Response> future = httpClient.executeRequest(request)
.toCompletableFuture();
try {
// Quasar框架工具类抛出SuspendExecution
return AsyncCompletionStage.get(future);
} catch (Exception e) {
return null;
}
}
2.3 异步RPC调用
目前主流的RPC框架都基于NIO实现,支持异步回调,有的RPC框架已经直接提供了返回CompletableFuture或ListenableFuture(Guava工具类提供)的异步接口,通过使用ComplatableFuture,可以按前文类似的方法将Quasar与RPC框架结合起来。当RPC框架没有该返回类型时,一般会提供如下类似的带泛型的异步回调接口:
interface Callback<TResponse> {
void callback(TResponse TResponse, Exception e);
}
这种情况,可以使用者自己创建ComplatableFuture,在回调中设置其状态,并调用AsyncCompletionStage.get等待这个future。
CompletableFuture<Response> future=new CompletableFuture<>();
//调用hello接口的异步API
new RpcClient().helloAsync(request, new Callback<Response>() {
public void callback(Response response, Exception e) {
if (e == null) future.complete(response);
else future.completeExceptionally(e);
}
});
//在此处调用Quasar的API,挂起直至RPC调用完成
Response response = AsyncCompletionStage.get(future);
上述代码依然具有异步回调不直观的缺点,通过JDK8的函数式接口可以实现一个通用的调用模板,将异步回调变为同步等待的形式。
@FunctionalInterface
private interface RpcAsyncCall<TRequest, TResponse> {
void request(TRequest request, Callback<TResponse> callback);
}
public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {
CompletableFuture<TResponse> future = new CompletableFuture<>();
call.request(request, (response, e) -> {
if (e == null) future.complete(response);
else future.completeExceptionally(e);
});
try {
//使用Quasar等待Future结果
return AsyncCompletionStage.get(future);
} catch (Exception e) {
return null;
}
}
最后的调用可简化一行代码,该方法适用于所有该Rpc框架提供的异步接口。
Response response= waitRpc(new RpcClient()::helloAsync, request);
2.4 阻塞操作的处理
Quasar协程使用的时候有一定的限制,由于调度器线程池大小固定,在协程中不能阻塞线程,执行线程将被占用。对于某些暂时只能依靠阻塞IO的调用,如数据库,消息队列等,无法使用协程等待其结果,当这些阻塞操作量不大的情况下,可使用另一个可伸缩的线程池等待结果,避免对协程调度器的影响。
public void waitBlocking() throws SuspendExecution {
//从DB获取结果
String ans = waitBlocking(this::selectFromDB);
}
private ExecutorService threadPool = Executors.newCachedThreadPool();
private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution {
CompletableFuture<T> future = new CompletableFuture<>();
threadPool.submit(() -> {
T ans = supplier.get();
future.complete(ans);
});
try {
return AsyncCompletionStage.get(future);
} catch (Exception e) {
return null;
}
}
2.5 并发工具的使用
协程对并发锁的使用有比较大的限制,需要使用者理解线程锁与协程的调度机制。在synchronized同步块的内部,不能包含挂起协程的语句。当持有锁的协程挂起后会让出线程资源,由于锁的可重入性,另一个运行在同一个线程上的协程再加锁时同样会成功。另一方面,协程挂起后恢复执行时,也可能会在另一个线程上运行。出现两个线程操作共享资源的异常。同时未持有锁的线程释放时,会出现IllegalMonitorStateException异常。
但如果同步块的内部没有挂起协程的语句,则线程锁的机制仍然有效。线程的在执行过程中可能切换,而协程的调度在每个执行线程上是串行的,协程持有的锁在不包含挂起操作时,会在占用线程执行完毕直到退出同步块为止,不会发生锁失效的情况。
JDK并发包中的工具可分为两类,一类是Lock、Semaphore、CountDownLatch等具有线程可重入性的工具,不能在未释放资源前使用挂起协程的操作,而另一类则是原子变量、并发容器等不会让出线程的工具,仍可正常使用,但要注意高并发的情况下锁的性能。此外,在使用并发工具的阻塞方法,如await时,可能导致协程的执行线程中发生阻塞。
三、总结
系统运行在4核心的主机上,线程池构成如下。
业务逻辑运行在Quasar的协程调度线程池中,线程池大小为CPU核数。HTTP请求与RPC调用均通过内部的NIO线程池管理。此外定义了一个core size为8的可伸缩的线程池用于少量消息队列、DB等阻塞IO的操作。其余的线程是系统中引入的其他组件所新建的线程,正常情况下不会成为系统性能的瓶颈。
改造后,在业务高峰流量激增数十倍的情况下线程数量依然稳定,而CPU利用率也从平均5%以下提升至10%-60%,在瞬时与高峰流量下能保持稳定。集群CPU核数在保留一定的业务冗余以应对业务高峰的情况下,缩减至1/5。
3.1 限制与风险
Quasar协程不是Java的语言标准,没有JVM层面的支持,使用时必须手动抛出异常声明每一个挂起方法,对代码有一定的侵入性。使用不当时,可能出现异常。
代码的try/catch时可能同时捕获SuspendExecution异常,从而忘记标记方法,此方法字节码不会被修改,结合Quasar的原理不难看出,当没有织入字节码时,挂起方法恢复执行,无法还原方法栈帧和执行状态,将会出现语句被重复执行、空指针等错误。运行时空指针、死循环的症状,排查的重点是是否漏加SuspendExecution标记。
在新线程而不是新协程中使用挂起方法时,会出现同样的问题。Thread的构造方法中传入的是Runnable接口对象,其run方法没有声明SuspendExecution异常,run内部的语句不会被织入字节码,造成上述异常。
3.2 总结与展望
协程使得NIO能够更好地应用在Java中,比回调方法更易读易维护。对系统的改造集中在底层通信封装和对方法的标记上,业务逻辑无需修改。虽然具有一定的代码侵入性和理解成本,但这种学习成本能逐渐被代码的可维护性优势抵消。
异步编程最佳的实现方式是:“Codes Like Sync,Works Like Async”,即以同步的方式编码,达到异步的效果与性能,兼顾可维护性与可伸缩性。OpenJDK 在2018年创建了Loom 项目(https://wiki.openjdk.java.net/display/loom),目标是在JVM上实现轻量级的线程,并解除JVM线程与内核线程的映射。相信会给Java生态带来巨大的改变。
【作者简介】Ryan,携程Java开发工程师,对高并发、网络编程等领域有浓厚兴趣。
更多携程技术人一手干货文章,请关注“携程技术”微信公众号。
相关推荐
- ubuntu单机安装open-falcon极度详细操作
-
备注:以下操作均由本人实际操作并得到验证,喜欢的同学可尝试操作安装。步骤一1.1环境准备(使用系统:ubuntu18.04)1.1.1安装redisubuntu下安装(参考借鉴:https://...
- Linux搭建promtail、loki、grafana轻量日志监控系统
-
一:简介日志监控告警系统,较为主流的是ELK(Elasticsearch、Logstash和Kibana核心套件构成),虽然优点是功能丰富,允许复杂的操作。但是,这些方案往往规模复杂,资源占用高,...
- 一文搞懂,WAF阻止恶意攻击的8种方法
-
WAF(Web应用程序防火墙)是应用程序和互联网流量之间的第一道防线,它监视和过滤Internet流量以阻止不良流量和恶意请求,WAF是确保Web服务的可用性和完整性的重要安全解决方案。它...
- 14配置appvolume(ios14.6配置文件)
-
使用AppVolumes应用程序功能,您可以管理应用程序的整个生命周期,包括打包、更新和停用应用程序。您还可以自定义应用程序分配,以向最终用户提供应用程序的特定版本14.1安装appvolume...
- 目前流行的缺陷管理工具(缺陷管理方式存在的优缺点)
-
摘自:https://blog.csdn.net/jasonteststudy/article/details/7090127?utm_medium=distribute.pc_relevant.no...
- 开源数字货币交易所开发学习笔记(2)——SpringCloud
-
前言码云(Gitee)上开源数字货币交易所源码CoinExchange的整体架构用了SpringCloud,对于经验丰富的Java程序员来说,可能很简单,但是对于我这种入门级程序员,还是有学习的必要的...
- 开发JAX-RPC Web Services for WebSphere(下)
-
在开发JAX-RPCWebServicesforWebSphere(上)一文中,小编为大家介绍了如何创建一个Web服务项目、如何创建一个服务类和Web服务,以及部署项目等内容。接下来小编将为大...
- CXF学习笔记1(cxf client)
-
webservice是发布服务的简单并实用的一种技术了,个人学习了CXF这个框架,也比较简单,发布了一些笔记,希望对笔友收藏并有些作用哦1.什么是webServicewebService让一个程序可...
- 分布式RPC最全详解(图文全面总结)
-
分布式通信RPC是非常重要的分布式系统组件,大厂经常考察的Dubbo等RPC框架,下面我就全面来详解分布式通信RPC@mikechen本篇已收于mikechen原创超30万字《阿里架构师进阶专题合集》...
- Oracle WebLogic远程命令执行0day漏洞(CVE-2019-2725补丁绕过)预警
-
概述近日,奇安信天眼与安服团队通过数据监控发现,野外出现OracleWebLogic远程命令执行漏洞最新利用代码,此攻击利用绕过了厂商今年4月底所发布的最新安全补丁(CVE-2019-2725)。由...
- Spring IoC Container 原理解析(spring中ioc三种实现原理)
-
IoC、DI基础概念关于IoC和DI大家都不陌生,我们直接上martinfowler的原文,里面已经有DI的例子和spring的使用示例《InversionofControlContainer...
- Arthas线上服务器问题排查(arthas部署)
-
1Arthas(阿尔萨斯)能为你做什么?这个类从哪个jar包加载的?为什么会报各种类相关的Exception?我改的代码为什么没有执行到?难道是我没commit?分支搞错了?遇到问题无法在...
- 工具篇之IDEA功能插件HTTP_CLENT(idea2021插件)
-
工具描述:Java开发人员通用的开发者工具IDEA集成了HTTPClient功能,之后可以无需单独安装使用PostMan用来模拟http请求。创建方式:1)简易模式Tools->HTTPCl...
- RPC、Web Service等几种远程监控通信方式对比
-
几种远程监控通信方式的介绍一.RPCRPC使用C/S方式,采用http协议,发送请求到服务器,等待服务器返回结果。这个请求包括一个参数集和一个文本集,通常形成“classname.meth...
- 《github精选系列》——SpringBoot 全家桶
-
1简单总结1SpringBoot全家桶简介2项目简介3子项目列表4环境5运行6后续计划7问题反馈gitee地址:https://gitee.com/yidao620/springbo...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)