Rxjava中的操作符

Author Avatar
w4ctech 3月16日
  • 在其它设备中阅读本文章

最后更新于2019年03月20日; 如遇到问题,请留言及时通知站长

map 转换

map 转换,将泛型指定的对象转换成其他类型的对象,可进行多次转换
Observable.create(new ObservableOnSubscribe<String>() {

    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("100");
        emitter.onComplete();
    }

}).map(new Function<String, Integer>() {

    @Override
    public Integer apply(String s) throws Exception {
        return Integer.valueOf(s);
    }

}).subscribe(new Consumer<Integer>() {
    
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println("这里就已经将 String 转换成 Integer 了" + integer);
    }
});

这里就已经将 String 转换成 Integer了100

filter 过滤

filter 过滤,将指定的对象过滤掉,可进行多次过滤
Observable.just(1, 2, 3, 4, 5, 7, 8)
.filter(new Predicate<Integer>() {

    @Override
    public boolean test(Integer integer) throws Exception {
        //大于5的过滤掉
        return integer < 5;
    }
})
.subscribe(new Consumer<Integer>() {

    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println("这里就能获取到过滤后的整数了" + integer);
    }
});

这里就能获取到过滤后的整数了1
这里就能获取到过滤后的整数了2
这里就能获取到过滤后的整数了3
这里就能获取到过滤后的整数了4

flatMap 平铺

flatMap 平铺,一对多的转化(类似嵌套),这里需要注意的是, flatMap 并不保证事件的顺序, 如果需要保证顺序则需要使用 concatMap
Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }

}).flatMap(new Function<Integer, ObservableSource<String>>() {

    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {

        final List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("I am value " + integer);
        }
        return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
    }

}).subscribe(new Consumer<String>() {

    @Override
    public void accept(String s) throws Exception {
        System.out.println("TAG:" + s);
    }
});

D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2

distinct 去重复

Observable.just(1, 1, 2, 2, 3, 3)
.distinct()
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println("这里就能获取去重复之后的整数了:" + integer);
    }
});

这里就能获取去重复之后的整数了:1
这里就能获取去重复之后的整数了:2
这里就能获取去重复之后的整数了:3

take 取出固定个数

Observable.just(1, 2, 3, 4, 5, 6)
.take(2)
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println("这里就能取出固定个数之后的整数了:" + integer);
    }
});

这里就能取出固定个数之后的整数了:1
这里就能取出固定个数之后的整数了:2
这里就能取出固定个数之后的整数了:3

toList 打包成集合

Observable.just(1, 2, 3, 4, 5, 6)
.toList()
.subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        System.out.println("这里可以直接打包成一个集合,集合的长度为" + integers.size());
    }
});

这里可以直接打包成一个集合,集合的长度为6

delay 延迟

使得被观察者延迟一段时间再发送事件
// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)

// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延迟时间  & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟

Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) // 延迟3s再发送,由于使用类似,所以此处不作全部展示
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println("延迟测试" + integer);
    }
});

zip 压合

给初学者的 RxJava2.0 教程 (四)

将两个被观察者对象组合成一个,如果两个被观察者的大小不同,合并的次数将以最小那个为主

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {            

    @Override                                                                                         
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                      
        Log.d(TAG, "emit 1");                                                                         
        emitter.onNext(1);                                                                            
        Log.d(TAG, "emit 2");                                                                         
        emitter.onNext(2);                                                                            
        Log.d(TAG, "emit 3");                                                                         
        emitter.onNext(3);                                                                            
        Log.d(TAG, "emit 4");                                                                         
        emitter.onNext(4);                                                                            
        Log.d(TAG, "emit complete1");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                   
                                                                                                      
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {              
    @Override                                                                                         
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                       
        Log.d(TAG, "emit A");                                                                         
        emitter.onNext("A");                                                                          
        Log.d(TAG, "emit B");                                                                         
        emitter.onNext("B");                                                                          
        Log.d(TAG, "emit C");                                                                         
        emitter.onNext("C");                                                                          
        Log.d(TAG, "emit complete2");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                     
                                                                                                      
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
             
    @Override                                                                                         
    public String apply(Integer integer, String s) throws Exception {                                 
        return integer + s;                                                                           
    }).subscribe(new Observer<String>() {
                 
    @Override                                                                                         
    public void onSubscribe(Disposable d) {                                                           
        Log.d(TAG, "onSubscribe");                                                                    
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onNext(String value) {                                                                
        Log.d(TAG, "onNext: " + value);                                                               
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onError(Throwable e) {                                                                
        Log.d(TAG, "onError");                                                                        
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onComplete() {                                                                        
        Log.d(TAG, "onComplete");                                                                     
    }                                                                                                 
});

D/TAG: onSubscribe     
D/TAG: emit 1          
D/TAG: emit 2          
D/TAG: emit 3          
D/TAG: emit 4          
D/TAG: emit complete1  
D/TAG: emit A          
D/TAG: onNext: 1A      
D/TAG: emit B          
D/TAG: onNext: 2B      
D/TAG: emit C          
D/TAG: onNext: 3C      
D/TAG: emit complete2  
D/TAG: onComplete
执行的顺序是第一个被观察者全部执行完毕之后才执行第二个被观察者对象,导致这种现象是因为他们都在同一个线程中,现在进行一下线程调度
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {         
    @Override                                                                                      
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                   
        Log.d(TAG, "emit 1");                                                                      
        emitter.onNext(1);                                                                         
                                                                                                   
        Log.d(TAG, "emit 2");                                                                      
        emitter.onNext(2);                                                                         
                                                                                                   
        Log.d(TAG, "emit 3");                                                                      
        emitter.onNext(3);                                                                         
                                                                                                   
        Log.d(TAG, "emit 4");                                                                      
        emitter.onNext(4);                                                                         
                                                                                                   
        Log.d(TAG, "emit complete1");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                                   
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {           
    @Override                                                                                      
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                    
        Log.d(TAG, "emit A");                                                                      
        emitter.onNext("A");                                                                       
                                                                                                   
        Log.d(TAG, "emit B");                                                                      
        emitter.onNext("B");                                                                       
                                                                                                   
        Log.d(TAG, "emit C");                                                                      
        emitter.onNext("C");                                                                       
                                                                                                   
        Log.d(TAG, "emit complete2");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                                   
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {               
    @Override                                                                                      
    public String apply(Integer integer, String s) throws Exception {                              
        return integer + s;                                                                        
    }                                                                                              
}).subscribe(new Observer<String>() {                    
    @Override                                                                                      
    public void onSubscribe(Disposable d) {                                                        
        Log.d(TAG, "onSubscribe");                                                                 
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onNext(String value) {                                                             
        Log.d(TAG, "onNext: " + value);                                                            
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onError(Throwable e) {                                                             
        Log.d(TAG, "onError");                                                                     
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onComplete() {                                                                     
        Log.d(TAG, "onComplete");                                                                  
    }                                                                                              
});     

D/TAG: onSubscribe    
D/TAG: emit A         
D/TAG: emit 1         
D/TAG: onNext: 1A     
D/TAG: emit B         
D/TAG: emit 2         
D/TAG: onNext: 2B     
D/TAG: emit C         
D/TAG: emit 3         
D/TAG: onNext: 3C     
D/TAG: emit complete2 
D/TAG: onComplete     

本文链接:https://i.w4ctech.cn/Rxjava/Operator.html
This blog is under a CC BY-NC-SA 3.0 Unported License