响应式编程简介之:Reactor
lipiwang 2024-11-26 06:05 8 浏览 0 评论
简介
Reactor是reactivex家族的一个非常重要的成员,Reactor是第四代的reactive library,它是基于Reactive Streams标准基础上开发的,主要用来构建JVM环境下的非阻塞应用程序。
今天给大家介绍一下Reactor。
Reactor简介
Reactor是基于JVM的非阻塞API,他直接跟JDK8中的API相结合,比如:CompletableFuture,Stream和Duration等。
它提供了两个非常有用的异步序列API:Flux和Mono,并且实现了Reactive Streams的标准。
并且还可以和reactor-netty相结合,作为一些异步框架的底层服务,比如我们非常熟悉的Spring MVC 5中引入的WebFlux。
我们知道WebFlux的底层使用的是reactor-netty,而reactor-netty又引用了Reactor。所以,如果你在POM中引入了webFlux依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
那么项目将会自动引入Reactor。
如果你用的不是Spring webflux,没关系,你可以直接添加下面的依赖来使用Reactor:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
reactive programming的发展史
最最开始的时候微软为.NET平台创建了Reactive Extensions (Rx) library。接着RxJava实现了JVM平台的Reactive。
然后Reactive Streams标准出现了,它定义了Java平台必须满足的的一些规范。并且已经集成到JDK9中的java.util.concurrent类中。
在Flow中定义了实现Reactive Streams的四个非常重要的组件,分别是Publisher,Subscriber,Subscription和Processor。
Iterable-Iterator 和Publisher-Subscriber的区别
一般来说reactive在面向对象的编程语言中是以观察者模式的扩展来使用的。
我们来具体看一下这个观察者模式的实现,以Publisher和Subscriber为例:
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
上面定义了两个接口,Publisher和Subscriber,Publisher的作用就是subscribe到subscriber。
而subscriber定义了4个on方法,用来触发特定的事件。
那么Publisher中的subscribe是怎么触发Subscriber的onSubscribe事件呢?
很简单,我们看一个具体的实现:
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Subscription sub;
if (throwable != null) {
assert iterable == null : "non-null iterable: " + iterable;
sub = new Subscription(subscriber, null, throwable);
} else {
assert throwable == null : "non-null exception: " + throwable;
sub = new Subscription(subscriber, iterable.iterator(), null);
}
subscriber.onSubscribe(sub);
if (throwable != null) {
sub.pullScheduler.runOrSchedule();
}
}
上面的例子是PullPublisher的subscribe实现。我们可以看到,在这个subscribe中触发了subscriber.onSubscribe方法。而这就是观察者模式的秘密。
或者说,当Publisher调用subscribe的时候,是主动push subscriber的onSubscribe方法。
熟悉Iterable-Iterator模式的朋友应该都知道,Iterator模式,其实是一个主动的pull模式,因为需要不断的去调用next()方法。所以它的控制权是在调用方。
为什么要使用异步reactive
在现代应用程序中,随着用户量的增多,程序员需要考虑怎么才能提升系统的处理能力。
传统的block IO的方式,因为需要占用大量的资源,所以是不适合这样的场景的。我们需要的是NO-block IO。
JDK中提供了两种异步编程的模型:
第一种是Callbacks,异步方法可以通过传入一个Callback参数的形式来在Callback中执行异步任务。比较典型的像是java Swing中的EventListener。
第二中就是使用Future了。我们使用Callable来提交一个任务,然后通过Future来拿到它的运行结果。
这两种异步编程会有什么问题呢?
callback的问题就在于回调地狱。熟悉JS的朋友应该很理解这个回调地狱的概念。
简单点讲,回调地狱就是在callback中又使用了callback,从而造成了这种callback的层级调用关系。
而Future主要是对一个异步执行的结果进行获取,它的 get()实际上是一个block操作。并且不支持异常处理,也不支持延迟计算。
当有多个Future的组合应该怎么处理呢?JDK8 实际上引入了一个CompletableFuture类,这个类是Future也是一个CompletionStage,CompletableFuture支持then的级联操作。不过CompletableFuture提供的方法不是那么的丰富,可能满足不了我的需求。
于是我们的Reactor来了。
Flux
Reactor提供了两个非常有用的操作,他们是 Flux 和 Mono。 其中Flux 代表的是 0 to N 个响应式序列,而Mono代表的是0或者1个响应式序列。
我们看一个Flux是怎么transfer items的:
先看下Flux的定义:
public abstract class Flux<T> implements Publisher<T>
可以看到Flux其实就是一个Publisher,用来产生异步序列。
Flux提供了非常多的有用的方法,来处理这些序列,并且提供了completion和error的信号通知。
相应的会去调用Subscriber的onNext, onComplete, 和 onError 方法。
Mono
我们看下Mono是怎么transfer items的:
看下Mono的定义:
public abstract class Mono<T> implements Publisher<T>
Mono和Flux一样,也是一个Publisher,用来产生异步序列。
Mono因为只有0或者1个序列,所以只会触发Subscriber的onComplete和onError方法,没有onNext。
另一方面,Mono其实可以看做Flux的子集,只包含Flux的部分功能。
Mono和Flux是可以互相转换的,比如Mono#concatWith(Publisher)返回一个Flux,而 Mono#then(Mono)返回一个Mono.
Flux和Mono的基本操作
我们看下Flux创建的例子:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
可以看到Flux提供了很多种创建的方式,我们可以自由选择。
再看看Flux的subscribe方法:
Disposable subscribe();
Disposable subscribe(Consumer<? super T> consumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
subscribe可以一个参数都没有,也可以多达4个参数。
看下没有参数的情况:
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
numbersFromFiveToSeven.subscribe();
注意,没有参数并不表示Flux的对象不被消费,只是不可见而已。
看下带参数的情况:consumer用来处理on each事件,errorConsumer用来处理on error事件,completeConsumer用来处理on complete事件,subscriptionConsumer用来处理on subscribe事件。
前面的3个参数很好理解,我们来举个例子:
Flux<Integer> ints3 = Flux.range(1, 4);
ints3.subscribe(System.out::println,
error -> System.err.println("Error " + error),
() -> System.out.println("Done"),
sub -> sub.request(2));
我们构建了从1到4的四个整数的Flux,on each就是打印出来,如果中间有错误的话,就输出Error,全部完成就输出Done。
那么最后一个subscriptionConsumer是做什么用的呢?
subscriptionConsumer accept的是一个Subscription对象,我们看下Subscription的定义:
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription 定义了两个方法,用来做初始化用的,我们可以调用request(n)来决定这次subscribe获取元素的最大数目。
比如上面我们的例子中,虽然构建了4个整数,但是最终输出的只有2个。
上面所有的subscribe方法,都会返回一个Disposable对象,我们可以通过Disposable对象的dispose()方法,来取消这个subscribe。
Disposable只定义了两个方法:
public interface Disposable {
void dispose();
default boolean isDisposed() {
return false;
}
dispose的原理是向Flux 或者 Mono发出一个停止产生新对象的信号,但是并不能保证对象产生马上停止。
有了Disposable,当然要介绍它的工具类Disposables。
Disposables.swap() 可以创建一个Disposable,用来替换或者取消一个现有的Disposable。
Disposables.composite(…)可以将多个Disposable合并起来,在后面统一做处理。
总结
本文介绍了Reactor的基本原理和两非常重要的组件Flux和Mono,下一篇文章我们会继续介绍Reactor core的一些更加高级的用法。敬请期待。
本文的例子learn-reactive
本文作者:flydean程序那些事
本文链接:http://www.flydean.com/introduction-to-reactor/
本文来源:flydean的博客
欢迎关注我的公众号:「程序那些事」最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!
相关推荐
- linux实例之设置时区的方式有哪些
-
linux系统下的时间管理是一个复杂但精细的功能,而时区又是时间管理非常重要的一个辅助功能。时区解决了本地时间和UTC时间的差异,从而确保了linux系统下时间戳和时间的准确性和一致性。比如文件的时间...
- Linux set命令用法(linux cp命令的用法)
-
Linux中的set命令用于设置或显示系统环境变量。1.设置环境变量:-setVAR=value:设置环境变量VAR的值为value。-exportVAR:将已设置的环境变量VAR导出,使其...
- python环境怎么搭建?小白看完就会!简简单单
-
很多小伙伴安装了python不会搭建环境,看完这个你就会了Python可应用于多平台包括Linux和MacOSX。你可以通过终端窗口输入"python"命令来查看本地是否...
- Linux环境下如何设置多个交叉编译工具链?
-
常见的Linux操作系统都可以通过包管理器安装交叉编译工具链,比如Ubuntu环境下使用如下命令安装gcc交叉编译器:sudoapt-getinstallgcc-arm-linux-gnueab...
- JMeter环境变量配置技巧与注意事项
-
通过给JMeter配置环境变量,可以快捷的打开JMeter:打开终端。执行jmeter。配置环境变量的方法如下。Mac和Linux系统在~/.bashrc中加如下内容:export...
- C/C++|头文件、源文件分开写的源起及作用
-
1C/C++编译模式通常,在一个C++程序中,只包含两类文件——.cpp文件和.h文件。其中,.cpp文件被称作C++源文件,里面放的都是C++的源代码;而.h文件则被称...
- linux中内部变量,环境变量,用户变量的区别
-
unixshell的变量分类在Shell中有三种变量:内部变量,环境变量,用户变量。内部变量:系统提供,不用定义,不能修改环境变量:系统提供,不用定义,可以修改,可以利用export将用户变量转为环...
- 在Linux中输入一行命令后究竟发生了什么?
-
Linux,这个开源的操作系统巨人,以其强大的命令行界面而闻名。无论你是初学者还是经验丰富的系统管理员,理解在Linux终端输入一条命令并按下回车后发生的事情,都是掌握Linux核心的关键。从表面上看...
- Nodejs安装、配置与快速入门(node. js安装)
-
Nodejs是现代JavaScript语言产生革命性变化的一个主要框架,它使得JavaScript从一门浏览器语言成为可以在服务器端运行、开发各种各样应用的通用语言。在不同的平台下,Nodejs的安装...
- Ollama使用指南【超全版】(olaplex使用方法图解)
-
一、Ollama快速入门Ollama是一个用于在本地运行大型语言模型的工具,下面将介绍如何在不同操作系统上安装和使用Ollama。官网:https://ollama.comGithub:http...
- linux移植(linux移植lvgl)
-
1uboot移植l移植linux之前需要先移植一个bootlader代码,主要用于启动linux内核,lLinux系统包括u-boot、内核、根文件系统(rootfs)l引导程序的主要作用将...
- Linux日常小技巧参数优化(linux参数调优)
-
Linux系统参数优化可以让系统更加稳定、高效、安全,提高系统的性能和使用体验。下面列出一些常见的Linux系统参数优化示例,包括修改默认配置、网络等多方面。1.修改默认配置1.1修改默认编辑器默...
- Linux系统编程—条件变量(linux 条件变量开销)
-
条件变量是用来等待线程而不是上锁的,条件变量通常和互斥锁一起使用。条件变量之所以要和互斥锁一起使用,主要是因为互斥锁的一个明显的特点就是它只有两种状态:锁定和非锁定,而条件变量可以通过允许线程阻塞和等...
- 面试题-Linux系统优化进阶学习(linux系统的优化)
-
一.基础必备优化:1.关闭SElinux2.FirewalldCenetOS7Iptables(C6)安全组(阿里云)3.网络管理服务||NetworkManager|network...
- 嵌入式Linux开发教程:Linux Shell
-
本章重点介绍Linux的常用操作和命令。在介绍命令之前,先对Linux的Shell进行了简单介绍,然后按照大多数用户的使用习惯,对各种操作和相关命令进行了分类介绍。对相关命令的介绍都力求通俗易懂,都给...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)