part05_Rxjavaオペレータ

14771 ワード

作者:IT魔幻师ブログ:www.huyingzi.top転載は出典を明記してください:https://www.jianshu.com/p/afeba5aea533

一、作成型オペレータ


主にオブザーバの作成に使用
  • just createのショートカット作成操作、createオペレータはイベントをトリガーするにはonNextを手動で呼び出す必要があります.justは
    @Test
     public void testjust() {
         //just create 
         Observable.just(" "," 2").subscribe(new Observer() {
             @Override
             public void onSubscribe(Disposable d) {
             }
    
             @Override
             public void onNext(String s) {
                 // just 
             }
    
             @Override
             public void onError(Throwable e) {
    
             }
    
             @Override
             public void onComplete() {
    
             }
         });
     }
    
  • を自動的にトリガーします.
  • fromArrayはjustよりもfromArrayがマルチパラメータの場合に適用する.
      @Test
      public void testFromArray() {
          Observable.fromArray(new String[]{" ",
                  " 2",
                  " 3",
                  " 4"}).subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("onNext  "+s);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • range一定範囲内のイベント
    @Test
      public void testRange() {
          // 5 11 
          Observable.range(5,11).subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
              }
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
              @Override
              public void onError(Throwable e) {
              }
              @Override
              public void onComplete() {
              }
          });
      }
    
  • を作成する.
  • emptyは主に呼び出し後にパラメータを返す必要がなく、結果に関心を持つ必要があることに適用される.例えば、ネットワーク要求を開始した後、onComplete()で結果を処理すれば、onNext関数をコールバックしないことができる.
    
      @Test
      public void testempty() {
          Observable.empty().subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
                  System.out.println(" ");
              }
          });
      }
    
  • intervalタイマオペレータは、Androidに依存するapiが純java環境で
    // 1 
    Observable.interval(1, TimeUnit.SECONDS);
    
  • を使用できない必要がある.
  • intervalRangeタイマオペレータ、Android依存apiは純java環境で
     // 0 1000 50    0
          Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).subscribe(new Consumer() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    
  • を使用できません.
  • timerはintervalと同じです.

  • 二、変換オペレータ


    イベントタイプを目的の結果に変換
  • map
     @Test
      public void testMap() {
          // : bitmap
          Observable.just("icon01.png","icon02.png").map(new Function() {
              @Override
              public Bitmap apply(String url) throws Exception {
                  // 
                  // ...   
                  Bitmap mBitmap = Bitmap.createBitmap(200,200, Bitmap.Config.ARGB_8888);
                  return mBitmap;
                  
              }
          }).subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
                  
              }
    
              @Override
              public void onNext(Bitmap bitmap) {
                  //    
                  System.out.println(" :"+bitmap);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
    
  • flatMapが前のイベントが完了するまで次のイベントを開始できない場合
    @Test
      public void testFlatMap() {
          // :token   token  
          Observable.just("getToken","login").flatMap(new Function>() {
              @Override
              public ObservableSource> apply(String s) throws Exception {
                  System.out.println(" :"+s);
                  return createRespone(s);
              }
          }).subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
                  // 
                  System.out.println(o);
    
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
    
      }
    
      private ObservableSource> createRespone(final String s) {
          // , 
          return Observable.create(new ObservableOnSubscribe() {
              @Override
              public void subscribe(ObservableEmitter emitter) throws Exception {
                  System.out.println(" :"+s);
                  // getToken 
                  emitter.onNext(s);
              }
          });
      }
    
  • .
  • groupByは、受信イベントをグループ化し、グループ化の条件は、
    @Test
      public void testGroupBy() {
          Observable.just(1,2,3,4).groupBy(new Function() {
              @Override
              public String apply(Integer integer) throws Exception {
                  return integer>2?"A ":"B ";
              }
          }).subscribe(new Consumer>() {
              @Override
              public void accept(final GroupedObservable stringIntegerGroupedObservable)
                      throws Exception {
                  //stringIntegerGroupedObservable  
                  stringIntegerGroupedObservable.subscribe(new Consumer() {
                      @Override
                      public void accept(Integer integer) throws Exception {
                          String key = stringIntegerGroupedObservable.getKey();
                          System.out.println("key="+key+" "+integer);
                      }
                  });
              }
          });
      }
    
  • を自分で指定することができる.
  • buffer大量データの処理が必要な場合には、バッチ処理
     @Test
      public void testBuffer() {
          // 6 2 
          Observable.just(1,2,3,4,5,6).buffer(2).subscribe(new Observer>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(List integers) {
                  // 
                  System.out.println(integers);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • を行う.
  • rangeの前の結果を次のパラメータとして、すべての結果を累積する最終結果、ファイルマージ、文字列接合などのシーンが得られる.
      @Test
      public void testScan() {
          Observable.range(1,5).scan(new BiFunction() {
              @Override
              public Integer apply(Integer integer, Integer integer2) throws Exception {
                  // integer    
                  //   , ... 
                  return integer+integer2;
              }
          }).subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    

  • 三、フィルタオペレータ

  • filterがイベントをフィルタリングまたはフィルタリングしない処理
    @Test
      public void testFilter() {
          Observable.just(1,2,3,4,5,6).filter(new Predicate() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  // 
                  //true  
                  //false  
                  return integer>2;
              }
          }).subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  //  
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • .
  • take生成イベントの数を制限
        @Test
      public void testTake() {
          // 1  take 5 
          Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Long aLong) {
                  System.out.println(aLong+"");
                  
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • distinct繰り返しイベント
        @Test
      public void testDistinct() {
          Observable.just(1,2,2,2,3,3,6,6,7).distinct().subscribe(new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • をフィルタ
  • elementAtフィルタ指定イベント
          // 5 
          Observable.just(1,2,2,2,3,3,6,6,7).elementAt(5).subscribe(new Consumer() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • 四、条件オペレータ

  • allは、すべてのイベントが1つの条件を満たすか否かを判断し、すべてが満たされればtrue
     Observable.just(1,2,3,4,5,6).all(new Predicate() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  // 2 
                  return integer>2;
              }
          }).subscribe(new Consumer() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  // 
                  System.out.println(aBoolean);
              }
          });
    
  • である.
  • containsは、すべてのイベントにイベント
     Observable.just(1,2,3,4,5).contains(3).subscribe(new Consumer() {
             @Override
             public void accept(Boolean aBoolean) throws Exception {
                 // 3 
                 System.out.println(aBoolean);
             }
         });
    
  • が含まれているかどうかを判断する.
  • anyすべてのイベントにtrue
    Observable.just(1,2,3,4,5).any(new Predicate() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  return integer==3;
              }
          }).subscribe(new Consumer() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  System.out.println(aBoolean);
              }
          });
    
  • という条件がある限り
  • isEmptyは、観察者にイベントがあるかどうかを判断する
     Observable.just(1).isEmpty().subscribe(new Consumer() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  // true   false
                  System.out.println(aBoolean);
              }
          });
    
  • .
  • defaultIfEmptyオブジェクトがイベントを送信しない場合、デフォルトのイベント
    .defaultIfEmpty(0)
    
  • が送信されます.
  • skipWhile条件を満たすイベント
             // 0 1000 50    0  
            Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).skipWhile(new Predicate() {
              @Override
              public boolean test(Long aLong) throws Exception {
                  // <10 
                  return aLong<10;
              }
          }).subscribe(new Consumer() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    
  • をスキップ

    五、連結オペレータ


    被観察者をマージする
  • startWithは、必要なイベントを1つのイベントにまとめる処理し、startWithが追加するイベント
            // , 2,4,6,8 
          Observable.just(1,3,5,7).startWith(Observable.just(2,4,6,8))
          .subscribe(new Consumer() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • を先に処理する.
  • concatは最大4つのイベントを統合し、startWithとは逆の順に処理します.
            //  123  
          Observable.concat(
                  Observable.just(1,2,3),
                  Observable.just(4,5,6))
          .subscribe(new Consumer() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • merge mergeは、複数の被観察者をマージし、マージ後、時間順に並列に
            Flowable observable1 = Flowable.intervalRange(0,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable3 = Flowable.intervalRange(20,4,1,500,TimeUnit.MILLISECONDS);
    
          Flowable.merge(observable2,observable3,observable1).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • を実行する.
  • mergeDelayErrorは例外イベントの放出を遅延し、マージされた他のイベントがすべて実行された後に例外
    // 
          Flowable observable1 = Flowable.create(new FlowableOnSubscribe>() {
              @Override
              public void subscribe(FlowableEmitter> emitter) throws Exception {
                  // 
                  emitter.onError(new NullPointerException());
              }
          }, BackpressureStrategy.BUFFER);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
    
    
          Flowable.mergeDelayError(observable1,observable2).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • を放出する.
  • zipは、複数の被観察者を単一に圧縮し、最も少ない被観察者結果
  • を出力する.