Spring实战——理解Reactor的设计与实现
lipiwang 2024-11-26 06:05 7 浏览 0 评论
Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。 它提供了可组合的异步序列API,例如Flux(用于[N]个元素)和Mono(用于[0 | 1]个元素)。
Reactor Netty项目还支持非阻塞式网络通信,非常适用于微服务架构,为HTTP(包括Websockets),TCP和UDP提供了响应式编程基础。
本文通过例子展示和源码阅读,分析Reactor中核心设计与实现机制。
文本Reactor源码基于Reactor 3.3
名词解析
响应式编程,维基百科解析为
reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s)
响应式编程是一个专注于数据流和变化传递的异步编程范式。 这意味着使用编程语言可以很容易地表示静态(例如数组)或动态(例如事件发射器)数据流。
下面简单解释一下相关名词。
数据流与变化传递,我的理解,数据流就如同一条车间流水线,数据在上面传递,经过不同的操作台(我们定义的操作方法),可以被观测,被过滤,被调整,或者与另外一条数据流合并为一条新的流,而操作台对数据做的改变会一直向下传递给其他操作台。
java 8 lambda表达式就是一种数据流形式
lists.stream().filter(i -> i%2==0).sorted().forEach(handler);
lists.stream(),构建一个数据流,负责生产数据。
filter,sorted方法以及handler匿名类,都可以视为操作台,他们负责处 理数据。
这里还涉及两个概念
声明式编程,通过表达式直接告诉计算机我们要的结果,具体操作由底层实现,我们并不关心,如sql,html,spring spel。
对应的命令式编程,一步一步告诉计算机先做什么再做什么。我们平时编写java,c等代码就是命令式编程。
上例中通过filter,sorted等方法直接告诉计算机(Spring)执行过滤,排序操作,可以理解为声明式编程。
注意,我的理解是,声明式,命令式编程并没有明确的界限。
越是可以直接通过声明表达我们要什么,就越接近声明式编程,反之,越是需要我们编写操作过程的,就越接近命令式编程。
如Spring中的声明式事务和编程式事务。
可参考:https://www.zhihu.com/question/22285830
函数式编程,就是将函数当做一个数据类型,函数作为参数,返回值,属性。
Java不支持该模式,通过匿名类实现,如上例中forEach方法。
注意,函数式编程还有很多学术性,专业性的概念,感兴趣的同学可以自行了解。
响应式编程,主要是在上面概念加了异步支持。
这个异步支持非常有用,它可以跟Netty这些基于事件模型的异步网络框架很好地结合,下一篇文章我们通过WebFlux来说明这一点。
数据流转
下面我们来简单看一下Reactor的设计与实现吧。
首先通过一个小用例,来看一个Reactor中如何生产数据,又如何传递给订阅者。
@Test
public void range() {
// [1]
Flux flux = Flux.range(1, 10);
// [2]
Subscriber subscriber = new BaseSubscriber<Integer>() {
protected void hookOnNext(Integer value) {
System.out.println(Thread.currentThread().getName() + " -> " + value);
request(1);
}
};
// [3]
flux.subscribe(subscriber);
}
Reactor中,发布者Publisher负责生产数据,有两种发布者,Flux可以生产N个数据,Mono可以生产0~1个数据。
订阅者Subscriber负责处理,消费数据。
1 构建一个发布者Flux
注意,这时发布者还没开始生产数据。
2 构建一个订阅者Subscriber
3 创建订阅关系,这时,生产者开始生产数据,并传递给订阅者。
Flux.range,fromArray等静态方法都会返回一个Flux子类,如FluxRange,FluxArray。
Publisher#subscribe,该方法很重要,它负责创建发布者与订阅者的订阅关系。
Flux#subscribe
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
...
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
获取内部的CorePublisher,CoreSubscriber。
Flux子类都是一个CorePublisher。
我们编写的订阅者,都会转化为一个CoreSubscriber。
CorePublisher也有一个内部的subscribe方法,由Flux子类实现。
FluxRange#subscribe
public void subscribe(CoreSubscriber<? super Integer> actual) {
...
actual.onSubscribe(new RangeSubscription(actual, st, en));
}
Subscription代表了发布者与订阅者之间的一个订阅关系,由Publisher端实现。
Flux子类subscribe方法中通常会使用CoreSubscriber创建为Subscription,并调用订阅者的onSubscribe方法,这时订阅关系已完成。
下面来看一下Subscriber端的onSubscribe方法
BaseSubscriber#onSubscribe -> hookOnSubscribe
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(9223372036854775807L);
}
Subscription#request由Publisher端实现,也是核心方法,订阅者通过该方法向发布者拉取特定数量的数据。
注意,这时发布者才开始生产数据。
RangeSubscription#request -> RangeSubscription#slowPath -> Subscriber#onNext
void slowPath(long n) {
Subscriber<? super Integer> a = this.actual;
long f = this.end;
long e = 0L;
long i = this.index;
while(!this.cancelled) {
// [1]
while(e != n && i != f) {
a.onNext((int)i);
if (this.cancelled) {
return;
}
++e;
++i;
}
...
}
}
1 RangeSubscription负责生产指定范围内的整数,并调用Subscriber#onNext将数据推送到订阅者。
可以看到,
Publisher#subscribe完成订阅操作,生成Subscription订阅关系,并触发订阅者钩子方法onSubscribe。
订阅者的onSubscribe方法中,订阅者开始调用Subscription#request请求数据,这时发布者才开始生产数据,并将数据推给订阅者。
操作符方法
跟java 8 lambda表达式一样,Reactor提供了很多的声明式方法,这些方法类似于操作符,直接操作数据(下文称为操作符方法)。
合理利用这些方法,可以大量简化我们的工作。
数据处理,如skip,distinct,sort,filter
钩子方法,如doOnNext,doOnSuccess
组合操作,flatMap,zipWhen
阻塞等待,blockLast
流量控制,limitRate
数据缓存,buffer,cache
可参考官方文档:https://projectreactor.io/docs/core/release/reference/#which-operator
注意,这些操作符方法虽然是添加到Publisher端,但Reactor会将逻辑会转移到Subscriber端。
看一个简单例子
Flux.range(1, 3)
.doOnNext(i -> {
System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);
})
.skip(1)
.subscribe(myHandler);
myHandler即我们实现的Subscriber。
每调用一次操作符方法,Flux都会生成一个新的Flux子类(装饰模式),最后Flux类为FluxSkip[FluxPeek[FluxRange]]。
我们来看一下完整的Flux#subscribe方法代码
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
// [1]
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator)publisher;
while(true) {
// [2]
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
return;
}
// [3]
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
}
operator = newSource;
}
}
// [4]
publisher.subscribe(subscriber);
} catch (Throwable var6) {
Operators.reportThrowInSubscribe(subscriber, var6);
}
}
1 判断Flux是否由操作符方法产生。
2 OptimizableOperator#subscribeOrReturn会生成新的Subscriber,以执行操作符逻辑。如上面例子中,FluxPeek会生成PeekSubscriber,FluxSkip生成SkipSubscriber。这里将操作符逻辑转移到Subscriber端。
OptimizableOperator#subscribeOrReturn也可以直接调用被装饰Publisher的subscribe方法,从而改变流程。如下面说的FluxSubscribeOn。
3 取出上一层被装饰的Publisher作为新的Publisher,如上例的FluxSkip[FluxPeek[FluxRange]],会依次取出FluxPeek,FluxRange。
这个操作一直执行,直到取出真正生产数据的Publisher。
4 使用真正生产数据的Publisher,和最后包装好的Subscriber,再调用subscribe方法。
上面例子中,流程如下
push/pull
Reactor提供了push和pull两种模式。
先看一下pull模式
Flux.generate(sink -> {
int k = (int) (Math.random()*10);
if(k > 8)
sink.complete();
sink.next(k);
})
.subscribe(i -> {
System.out.println("receive:" + i);
});
Sink可以理解为数据池,负责存储数据,根据功能不同划分,如IgnoreSink,BufferAsyncSink,LatestAsyncSink。
Sink#next会将数据放入池中,由Sink缓存或直接发送给订阅者。
Flux#generate(Consumer<SynchronousSink<T>> generator),可以理解为pull模式,
订阅者每调用一次request方法,Publisher就会调用一次generator来生产数据,而且generator每次执行中只能调用一次Sink.next。
generator是单线程执行,生成数据后直接同步发送到订阅者。
push模式可以使用create方法
Flux.create(sink -> {
System.out.println("please entry data");
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
while (true) {
try {
sink.next(br.readLine());
} catch (IOException e) {
}
}
}).subscribe(i -> {
System.out.println("receive:" + i);
});
Flux#create(Consumer<? super FluxSink<T>> emitter),可以理解为push模式。
注意,Publisher只在Flux#subscribe操作时调用一次emitter,后续request不再调用emitter。
我们可以将Sink绑定到其他的数据源,如上例的控制台,或其他事件监听器。
当数据来了,Sink就会将它推送给订阅者。
Flux#create生成的Flux可以多线程同时调用Sink.next推送数据,并且数据会先缓存到Sink,后续Sink推送给订阅者。
push方法与create类似,只是它只允许一个线程调用Sink.next推送数据。
上例中create方法使用push方法更合适,因为只有一个线程推送数据。
混合模式
假如一个消息处理器MessageProcessor需要会将普通消息直接推送给订阅者,而低级别消息由订阅者拉取。
我们可以FluxSink#onRequest实现混合模式
Flux.create(sink -> {
// [1]
messageProcessor.setHandler((msg) -> {
sink.next(msg);
});
// [2]
sink.onRequest(n -> {
List<String> messages = messageProcessor.getLowMsg();
for(String s : messages) {
sink.next(s);
}
});
})
1 普通消息直接推送
2 低级别消息由订阅者拉取
完整代码可参考:https://gitee.com/binecy/bin-springreactive/blob/master/order-service/src/test/java/com/binecy/FluxPushPullTest.java
线程与调度器
前面说了reactor是支持异步的,不过它并没有默认开启异步,我们可以通过调度器开启,如
public void parallel() throws InterruptedException {
Flux.range(0, 100)
.parallel()
.runOn(Schedulers.parallel())
.subscribe(i -> {
System.out.println(Thread.currentThread().getName() + " -> " + i);
});
new CountDownLatch(1).await();
}
parallel 将数据分成指定份数,随后调用runOn方法并行处理这些数据。
runOn 该方法参数指定的任务执行的线程环境。
最后的CountDownLatch用于阻塞主线程,以免进程停止看不到效果。
调度器相当于Reactor中的ExecutorService,不同的调度器定义不同的线程执行环境。
Schedulers提供的静态方法可以创建不同的线程执行环境。
Schedulers.immediate() 直接在当前线程执行
Schedulers.single() 在一个重复利用的线程上执行
Schedulers.boundedElastic() 在由Reactor维护的线程池上执行,该线程池中闲置时间过长(默认值为60s)的线程也将被丢弃,创建线程数量上限默认为CPU核心数x 10。线程数达到上限后,最多可提交10万个任务,这些任务在线程可用时会被执行。该线程池可以为阻塞操作提供很好的支持。阻塞操作可以执行在独立的线程上,不会占用其他资源。
Schedulers.parallel() 固定线程,对于异步IO,可以使用该方案。
Reactor另外提供了两个操作符方法来切换执行上下文,publishOn和subscribeOn。
publishOn影响当前操作符方法后面操作的线程执行环境,而subscribeOn则影响整个链路的线程执行环境。
(runOn与publishOn类似,影响该方法后续操作线程执行环境)
Flux.range(1, 3)
.doOnNext(i -> {
System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);
})
.publishOn(Schedulers.newParallel("myParallel"))
.skip(1)
.subscribe(myHandler);
myHandler只是简单打印线程和数据
Consumer myHandler = i -> {
System.out.println(Thread.currentThread().getName() + " receive:" + i);
};
输出结果为
main doOnNext:1
main doOnNext:2
main doOnNext:3
myParallel-1 receive:2
myParallel-1 receive:3
publishOn后面的操作(包括skip,myHandler)都已经切换到新的线程。
再来简单看一下publishOn与subscribeOn的实现
前面说了,操作符方法的逻辑会移到Subscriber端,上例过程示意如下
线程切换是在PublishOnSubscriber中完成的,所以PublishOnSubscriber后面的操作都在新线程上。
将上面例子代码修改一下
Flux.range(1, 3)
.doOnNext(i -> {
System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);
})
.subscribeOn(Schedulers.newParallel("myParallel"))
.skip(1)
.subscribe(myHandler);
输出结果为
myParallel-1 doOnNext:1
myParallel-1 doOnNext:2
myParallel-1 receive:2
myParallel-1 doOnNext:3
myParallel-1 receive:3
从数据生产到消费,所有操作都在新的线程上。
示意图如下
前面说了,Flux#subscribe中会调用OptimizableOperator#subscribeOrReturn方法,而在FluxSubscribeOn中,会直接切换任务线程,后面整个流程都执行在新线程上了。
使用publishOn还是subscribeOn,关键在于阻塞操作是在生产数据时还是消费数据时。
如果阻塞操作在生产数据时,如同步查询数据库,查询下游系统,可以使用subscribeOn
如果阻塞操作在消费数据时,如同步保存数据,可以使用publishOn。
流量控制
响应式编程中常常会出现Backpressure的概念,
它是指在push模式下,当发布者生产数据的速度大于订阅者消费数据的速度,导致出现了订阅者传递给订阅者的逆向压力。
FluxSink.OverflowStrategy定义了在这种场景下的几种处理策略。
IGNORE 完全忽略新的数据
ERROR Publisher抛出异常
DROP 抛弃数据,触发Flux#onBackpressureDrop方法
LATEST 订阅者只能获取最新的一个数据
BUFFER 缓存所有的数据,注意,该缓存没有边界,可能导致内存溢出
FluxSink.OverflowStrategy类似于线程池的任务拒绝策略。
下面来看一个例子
@Test
public void backpressure() throws InterruptedException {
Flux.<Integer>create(sink -> {
for (int i = 0; i < 50; i++) {
System.out.println("push: " + i);
sink.next(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}, FluxSink.OverflowStrategy.ERROR)
.publishOn(Schedulers.newSingle("receiver"), 10)
.subscribe(new BaseSubscriber<Integer>() {
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1);
}
protected void hookOnNext(Integer value) {
System.out.println("receive:" + value);
try {
Thread.sleep(12);
} catch (InterruptedException e) {
}
request(1);
}
protected void hookOnError(Throwable throwable) {
throwable.printStackTrace();
System.exit(1);
}
});
new CountDownLatch(1).await();
}
1 发布者每隔10毫秒生产一个数据
注意,FluxSink.OverflowStrategy.ERROR参数指定了Backpressure处理策略
2 publishOn方法指定后续运行线程环境
注意下文解析的第二个参数。
3 订阅者每隔20毫秒消费一个数据
Sink中有一个关键字段,BaseSink#requested,代表订阅者请求数量。
每次订阅者调用Subscription#request(long n)方法,BaseSink#requested都会加上对应数值n。
而每次生产数据调用Sink#next时,BaseSink#requested都会减1。
当Sink#next执行,如果BaseSink#requested为0,就是执行FluxSink.OverflowStrategy指定策略。
publishOn(Scheduler scheduler, int prefetch)方法会将BaseSink#requested的值初始化为prefetch。
注意,这里并不会生产prefetch个数据并发送给订阅者,只会修改BaseSink#requested。
另外,PublishOnSubscriber中会将Subscription#request操作缓存,达到阀值后合并为一次request操作。
在上面的例子中阀值为prefetch - (prefetch >> 2),就是8了
所以我们会看到结果
receive:5
push: 10
push: 11
21:59:55.828 [Thread-0] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
发布者发送10(prefetch)个数据后,尽管订阅者已经消费5个数据,并发起5次request操作,但被PublishOnSubscriber缓存了,并没有发送到发布者那边,这时BaseSink#requested已经为0了,抛出OverflowException异常,Sink关闭,后面的数据被抛弃。
可以将订阅者的休眠时间调整为12毫秒,这样当发布者发送10(prefetch)个数据前,PublishOnSubscriber会发起一次request(8)的操作,可以看到
push: 19
22:03:33.779 [Thread-0] DEBUG reactor.core.publisher.Operators - onNextDropped: 19
也就是到19个数据才抛出异常,抛弃数据。
到这里,我们已经基本了解Reactor的概念,核心设计与实现机制。
感谢作者的创作,持续分享,与大家共勉。
原文出处
作者:binecy
地址:https://www.cnblogs.com/binecy/p/14458911.html
相关推荐
- linux实例之设置时区的方式有哪些
-
linux系统下的时间管理是一个复杂但精细的功能,而时区又是时间管理非常重要的一个辅助功能。时区解决了本地时间和UTC时间的差异,从而确保了linux系统下时间戳和时间的准确性和一致性。比如文件的时间...
- Linux set命令用法(linux cp命令的用法)
-
Linux中的set命令用于设置或显示系统环境变量。1.设置环境变量:-setVAR=value:设置环境变量VAR的值为value。-exportVAR:将已设置的环境变量VAR导出,使其...
- python环境怎么搭建?小白看完就会!简简单单
-
很多小伙伴安装了python不会搭建环境,看完这个你就会了Python可应用于多平台包括Linux和MacOSX。你可以通过终端窗口输入"python"命令来查看本地是否...
- Linux环境下如何设置多个交叉编译工具链?
-
常见的Linux操作系统都可以通过包管理器安装交叉编译工具链,比如Ubuntu环境下使用如下命令安装gcc交叉编译器:sudoapt-getinstallgcc-arm-linux-gnueab...
- JMeter环境变量配置技巧与注意事项
-
通过给JMeter配置环境变量,可以快捷的打开JMeter:打开终端。执行jmeter。配置环境变量的方法如下。Mac和Linux系统在~/.bashrc中加如下内容:export...
- C/C++|头文件、源文件分开写的源起及作用
-
1C/C++编译模式通常,在一个C++程序中,只包含两类文件——.cpp文件和.h文件。其中,.cpp文件被称作C++源文件,里面放的都是C++的源代码;而.h文件则被称...
- linux中内部变量,环境变量,用户变量的区别
-
unixshell的变量分类在Shell中有三种变量:内部变量,环境变量,用户变量。内部变量:系统提供,不用定义,不能修改环境变量:系统提供,不用定义,可以修改,可以利用export将用户变量转为环...
- 在Linux中输入一行命令后究竟发生了什么?
-
Linux,这个开源的操作系统巨人,以其强大的命令行界面而闻名。无论你是初学者还是经验丰富的系统管理员,理解在Linux终端输入一条命令并按下回车后发生的事情,都是掌握Linux核心的关键。从表面上看...
- Nodejs安装、配置与快速入门(node. js安装)
-
Nodejs是现代JavaScript语言产生革命性变化的一个主要框架,它使得JavaScript从一门浏览器语言成为可以在服务器端运行、开发各种各样应用的通用语言。在不同的平台下,Nodejs的安装...
- Ollama使用指南【超全版】(olaplex使用方法图解)
-
一、Ollama快速入门Ollama是一个用于在本地运行大型语言模型的工具,下面将介绍如何在不同操作系统上安装和使用Ollama。官网:https://ollama.comGithub:http...
- linux移植(linux移植lvgl)
-
1uboot移植l移植linux之前需要先移植一个bootlader代码,主要用于启动linux内核,lLinux系统包括u-boot、内核、根文件系统(rootfs)l引导程序的主要作用将...
- Linux日常小技巧参数优化(linux参数调优)
-
Linux系统参数优化可以让系统更加稳定、高效、安全,提高系统的性能和使用体验。下面列出一些常见的Linux系统参数优化示例,包括修改默认配置、网络等多方面。1.修改默认配置1.1修改默认编辑器默...
- Linux系统编程—条件变量(linux 条件变量开销)
-
条件变量是用来等待线程而不是上锁的,条件变量通常和互斥锁一起使用。条件变量之所以要和互斥锁一起使用,主要是因为互斥锁的一个明显的特点就是它只有两种状态:锁定和非锁定,而条件变量可以通过允许线程阻塞和等...
- 面试题-Linux系统优化进阶学习(linux系统的优化)
-
一.基础必备优化:1.关闭SElinux2.FirewalldCenetOS7Iptables(C6)安全组(阿里云)3.网络管理服务||NetworkManager|network...
- 嵌入式Linux开发教程:Linux Shell
-
本章重点介绍Linux的常用操作和命令。在介绍命令之前,先对Linux的Shell进行了简单介绍,然后按照大多数用户的使用习惯,对各种操作和相关命令进行了分类介绍。对相关命令的介绍都力求通俗易懂,都给...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)