百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

RxJava学习入门2.转换、组合、功能操作符

lipiwang 2024-11-27 17:19 7 浏览 0 评论


一、转换操作符

1. map

对Observable的事件进行处理,产生新的事件,再次发射。

    @Test
    public void testMap(){

        Observable.just("Hello")   // 这个消息将被map处理
                .map(new Function<String, Object>(){

                    @Override
                    public Object apply(@NotNull String s) throws Exception {
                        System.out.println("中间人:" + s);
                        return "map function";   // map生成的事件再给外面观察者使用
                    }
                }).subscribe(observer);
    }

运行效果:

建立订阅时调用: onSubscribe
中间人:Hello
调用 onNext:map function
订阅执行完成 onComplete

2. flatMap

对事件进行处理,产生新的Observable。

    @Test
    public void testFlatMap(){

        Observable.just("Hello","1","2","3")
                .flatMap(new Function<String, ObservableSource<?>>(){

                    @Override
                    public ObservableSource apply(@NotNull String s) throws Exception {
                        return Observable.just("新的Observable:" + s);
                    }
                }).subscribe(observer);
    }
123456789101112
建立订阅时调用: onSubscribe
调用 onNext:新的Observable
订阅执行完成 onComplete

3. concatMap

类似于flatMap,concatMap转出来的事件是有序的,flatMap是无序的。

4. buffer

把事件合并发送。

    @Test
    public void testBuffer(){

        Observable.just("Hello","1","2","3","4","5","6","7","8","9")
                .buffer(4)
                .subscribe(observer);
    }

运行效果:

建立订阅时调用: onSubscribe
调用 onNext:[Hello, 1, 2, 3]
调用 onNext:[4, 5, 6, 7]
调用 onNext:[8, 9]
订阅执行完成 onComplete

二、组合操作符

1. concat

把Observable的事件组合在一起发射。

    @Test
    public void testConcat(){

        Observable.concat(Observable.just("1"),
                Observable.just("2"),
                Observable.just("3"))
        .subscribe(observer);
    }

运行效果:

建立订阅时调用: onSubscribe
调用 onNext:1
调用 onNext:2
调用 onNext:3
订阅执行完成 onComplete

可以看到订阅和完成只执行了一次。

2. concatArray

3. merge

和concat区别是,concat是串行发送,merge并行发送。

三、功能操作符

下面的操作示例演示在执行耗时操作时如何 进行线程切换。

1. 普通的订阅事件,程序在同一个线程运行

    Observer observer = new Observer<Object>() {
        @Override
        public void onSubscribe(@NotNull Disposable d) {
            System.out.println("建立订阅时调用: onSubscribe,线程:" + Thread.currentThread());
        }

        @Override
        public void onNext(@NotNull Object o) {
            System.out.println("调用 onNext:" + o + "线程:" + Thread.currentThread());
        }

        @Override
        public void onError(@NotNull Throwable e) {
            System.out.println("调用 onError,线程:" + Thread.currentThread());
        }

        @Override
        public void onComplete() {
            System.out.println("订阅执行完成 onComplete,线程:" + Thread.currentThread());
        }
    };

    @Test
    public void testTool(){

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
            }
        })
            .subscribe(observer);
    }

运行效果:

建立订阅时调用: onSubscribe,线程:Thread[main,5,main]
调用 onNext:1线程:Thread[main,5,main]
调用 onNext:2线程:Thread[main,5,main]

2. subscribeOn、observerOn 进行线程调度

  • subscribeOn : observable 在哪个线程执行,如果有多个以第一个为准。
  • observerOn:Subscriber在哪个线程执行,链式操作时对后续操作起作用。

@RunWith(AndroidJUnit4.class)
public class ExampleInstrumentedTest {
    private String TAG = ExampleInstrumentedTest.class.getSimpleName();

    @Test
    public void useAppContext() {
        // Context of the app under test.
        Context appContext = InstrumentationRegistry.getInstrumentation().getTargetContext();
        assertEquals("com.cn.whr.iot.app.rxjavaleran", appContext.getPackageName());
    }

    Observer observer = new Observer<Object>() {
        @Override
        public void onSubscribe(@NotNull Disposable d) {
            Log.i(TAG, "建立订阅时调用,线程:" + Thread.currentThread().getName());
        }

        @Override
        public void onNext(@NotNull Object o) {
            Log.i(TAG, "调用 onNext:" + o + ",线程:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(@NotNull Throwable e) {
            Log.i(TAG, "调用 onError,线程:" + Thread.currentThread().getName());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "订阅执行完成 onComplete,线程:" + Thread.currentThread().getName());
        }
    };


    @Test
    public void testTool(){

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
                Thread.sleep(2000);
                emitter.onNext("1");
                emitter.onNext("2");
            }
        })
                // 线程调度 这里让 subscribe 运行在新线程
                .subscribeOn(Schedulers.newThread())
                // 下游放到主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Object, Object>() {
                    @Override
                    public Object apply(@NotNull Object o) throws Exception {
                        Log.i(TAG, "map apply,线程:" + Thread.currentThread().getName());
                        return "map ";
                    }
                })
                .subscribe(observer);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

运行效果:

建立订阅时调用,线程:Instr: androidx.test.runner.AndroidJUnitRunner
map apply,线程:main
调用 onNext:map ,线程:main
map apply,线程:main
调用 onNext:map ,线程:main

再看多加一个observeOn的效果:

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
                Thread.sleep(2000);
                emitter.onNext("1");
                emitter.onNext("2");
            }
        })
                // 线程调度 这里让 subscribe 运行在新线程
                .subscribeOn(Schedulers.newThread())
                // 下游放到主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Object, Object>() {
                    @Override
                    public Object apply(@NotNull Object o) throws Exception {
                        Log.i(TAG, "map apply,线程:" + Thread.currentThread().getName());
                        return "map ";
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(observer);

执行效果:

建立订阅时调用,线程:Instr: androidx.test.runner.AndroidJUnitRunner
map apply,线程:main
map apply,线程:main
调用 onNext:map ,线程:RxCachedThreadScheduler-1
调用 onNext:map ,
  • Schedulers.newThread() 调度器创建新线程
  • AndroidSchedulers.mainThread() 安卓主线程
  • Schedulers.io() 调试器创建的可复用的线程池

3. doOnNext

调度onNext线程。
示例:

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
                Thread.sleep(2000);
                emitter.onNext("1");
                emitter.onNext("2");
            }
        })
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        System.out.println("doOnNext,线程:" + Thread.currentThread().getName());
                    }
                })
                // 线程调度 这里让 subscribe 运行在新线程
                .subscribeOn(Schedulers.newThread())
                // 下游放到主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Object, Object>() {
                    @Override
                    public Object apply(@NotNull Object o) throws Exception {
                        Log.i(TAG, "map apply,线程:" + Thread.currentThread().getName());
                        return "map ";
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(observer);

运行效果:

建立订阅时调用,线程:Instr: androidx.test.runner.AndroidJUnitRunner
doOnNext,线程:RxNewThreadScheduler-1
map apply,线程:main
doOnNext,线程:RxNewThreadScheduler-1
map apply,线程:main
调用 onNext:map ,线程:RxCachedThreadScheduler-1
调用 onNext:map ,线程:RxCachedThreadScheduler-1

四、过滤与条件操作符

1. filter

根据一定条件过滤事件。


    @Test
    public void testTool(){

        Observable.range(1,10)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NotNull Integer integer) throws Exception {
                        return integer<5;
                    }
                })
                .subscribe(observer);
    }

运行效果:

建立订阅时调用: onSubscribe,线程:Thread[main,5,main]
调用 onNext:1线程:Thread[main,5,main]
调用 onNext:2线程:Thread[main,5,main]
调用 onNext:3线程:Thread[main,5,main]
调用 onNext:4线程:Thread[main,5,main]
订阅执行完成 onComplete,线程:Thread[main,5,main]

2. all 判断所有事件是否全满足了条件

全满足就返回true,否则返回false。

    @Test
    public void testTool(){

        Observable.range(1,10)
                .all(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NotNull Integer integer) throws Exception {
                        return integer<11; // 11 打印true, 5 打印false
                    }
                })
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        System.out.println(aBoolean);
                    }
                });
    }

3. takeWhile 发射数据到某条件时停止

    @Test
    public void testTool(){

        Observable.range(1,10)
                .takeWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NotNull Integer integer) throws Exception {
                        return integer<5;
                    }
                })
                .subscribe(observer);
    }
建立订阅时调用: onSubscribe,线程:Thread[main,5,main]
调用 onNext:1线程:Thread[main,5,main]
调用 onNext:2线程:Thread[main,5,main]
调用 onNext:3线程:Thread[main,5,main]
调用 onNext:4线程:Thread[main,5,main]
订阅执行完成 onComplete,线程:Thread[main,5,main]

4. skipWhile 跳过条件,直到满足条件再发射

相关推荐

前端入门——css 网格轨道详细介绍

上篇前端入门——cssGrid网格基础知识整体大概介绍了cssgrid的基本概念及使用方法,本文将介绍创建网格容器时会发生什么?以及在网格容器上使用行、列属性如何定位元素。在本文中,将介绍:...

Islands Architecture(孤岛架构)在携程新版首页的实践

一、项目背景2022,携程PC版首页终于迎来了首次改版,完成了用户体验与技术栈的全面升级。作为与用户连接的重要入口,旧版PC首页已经陪伴携程走过了22年,承担着重要使命的同时,也遇到了很多问题:维护/...

HTML中script标签中的那些属性

HTML中的<script>标签详解在HTML中,<script>标签用于包含或引用JavaScript代码,是前端开发中不可或缺的一部分。通过合理使用<scrip...

CSS 中各种居中你真的玩明白了么

页面布局中最常见的需求就是元素或者文字居中了,但是根据场景的不同,居中也有简单到复杂各种不同的实现方式,本篇就带大家一起了解下,各种场景下,该如何使用CSS实现居中前言页面布局中最常见的需求就是元...

CSS样式更改——列表、表格和轮廓

上篇文章主要介绍了CSS样式更改篇中的字体设置Font&边框Border设置,这篇文章分享列表、表格和轮廓,一起来看看吧。1.列表List1).列表的类型<ulstyle='list-...

一文吃透 CSS Flex 布局

原文链接:一文吃透CSSFlex布局教学游戏这里有两个小游戏,可用来练习flex布局。塔防游戏送小青蛙回家Flexbox概述Flexbox布局也叫Flex布局,弹性盒子布局。它决定了...

css实现多行文本的展开收起

背景在我们写需求时可能会遇到类似于这样的多行文本展开与收起的场景:那么,如何通过纯css实现这样的效果呢?实现的难点(1)位于多行文本右下角的展开收起按钮。(2)展开和收起两种状态的切换。(3)文本...

css 垂直居中的几种实现方式

前言设计是带有主观色彩的,同样网页设计中的css一样让人摸不头脑。网上列举的实现方式一大把,或许在这里你都看到过,但既然来到这里我希望这篇能让你看有所收获,毕竟这也是前端面试的基础。实现方式备注:...

WordPress固定链接设置

WordPress设置里的最后一项就是固定链接设置,固定链接设置是决定WordPress文章及静态页面URL的重要步骤,从站点的SEO角度来讲也是。固定链接设置决定网站URL,当页面数少的时候,可以一...

面试发愁!吃透 20 道 CSS 核心题,大厂 Offer 轻松拿

前端小伙伴们,是不是一想到面试里的CSS布局题就发愁?写代码时布局总是对不齐,面试官追问兼容性就卡壳,想跳槽却总被“多列等高”“响应式布局”这些问题难住——别担心!从今天起,咱们每天拆解一...

3种CSS清除浮动的方法

今天这篇文章给大家介绍3种CSS清除浮动的方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。首先,这里就不讲为什么我们要清楚浮动,反正不清除浮动事多多。下面我就讲3种常用清除浮动的...

2025 年 CSS 终于要支持强大的自定义函数了?

大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发!1.什么是CSS自定义属性CSS自...

css3属性(transform)的一个css3动画小应用

闲言碎语不多讲,咱们说说css3的transform属性:先上效果:效果说明:当鼠标移到a标签的时候,从右上角滑出二维码。实现方法:HTML代码如下:需要说明的一点是,a链接的跳转需要用javasc...

CSS基础知识(七)CSS背景

一、CSS背景属性1.背景颜色(background-color)属性值:transparent(透明的)或color(颜色)2.背景图片(background-image)属性值:none(没有)...

CSS 水平居中方式二

<divid="parent"><!--定义子级元素--><divid="child">居中布局</div>...

取消回复欢迎 发表评论: