part05_Rxjavaオペレータ
14771 ワード
作者:IT魔幻师ブログ:www.huyingzi.top転載は出典を明記してください:https://www.jianshu.com/p/afeba5aea533
主にオブザーバの作成に使用 just createのショートカット作成操作、createオペレータはイベントをトリガーするにはonNextを手動で呼び出す必要があります.justは を自動的にトリガーします. fromArrayはjustよりもfromArrayがマルチパラメータの場合に適用する. range一定範囲内のイベント を作成する. emptyは主に呼び出し後にパラメータを返す必要がなく、結果に関心を持つ必要があることに適用される.例えば、ネットワーク要求を開始した後、onComplete()で結果を処理すれば、onNext関数をコールバックしないことができる. intervalタイマオペレータは、Androidに依存するapiが純java環境で を使用できない必要がある. intervalRangeタイマオペレータ、Android依存apiは純java環境で を使用できません. timerはintervalと同じです.
イベントタイプを目的の結果に変換 map flatMapが前のイベントが完了するまで次のイベントを開始できない場合 . groupByは、受信イベントをグループ化し、グループ化の条件は、 を自分で指定することができる. buffer大量データの処理が必要な場合には、バッチ処理 を行う. rangeの前の結果を次のパラメータとして、すべての結果を累積する最終結果、ファイルマージ、文字列接合などのシーンが得られる.
filterがイベントをフィルタリングまたはフィルタリングしない処理 . take生成イベントの数を制限 distinct繰り返しイベント をフィルタ elementAtフィルタ指定イベント allは、すべてのイベントが1つの条件を満たすか否かを判断し、すべてが満たされればtrue である. containsは、すべてのイベントにイベント が含まれているかどうかを判断する. anyすべてのイベントにtrue という条件がある限り isEmptyは、観察者にイベントがあるかどうかを判断する . defaultIfEmptyオブジェクトがイベントを送信しない場合、デフォルトのイベント が送信されます. skipWhile条件を満たすイベント をスキップ
被観察者をマージする startWithは、必要なイベントを1つのイベントにまとめる処理し、startWithが追加するイベント を先に処理する. concatは最大4つのイベントを統合し、startWithとは逆の順に処理します. merge mergeは、複数の被観察者をマージし、マージ後、時間順に並列に を実行する. mergeDelayErrorは例外イベントの放出を遅延し、マージされた他のイベントがすべて実行された後に例外 を放出する. zipは、複数の被観察者を単一に圧縮し、最も少ない被観察者結果 を出力する.
一、作成型オペレータ
主にオブザーバの作成に使用
@Test
public void testjust() {
//just create
Observable.just(" "," 2").subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
// just
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void testFromArray() {
Observable.fromArray(new String[]{" ",
" 2",
" 3",
" 4"}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext "+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void testRange() {
// 5 11
Observable.range(5,11).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void testempty() {
Observable.empty().subscribe(new Observer
// 1
Observable.interval(1, TimeUnit.SECONDS);
// 0 1000 50 0
Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).subscribe(new Consumer() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
二、変換オペレータ
イベントタイプを目的の結果に変換
@Test
public void testMap() {
// : bitmap
Observable.just("icon01.png","icon02.png").map(new Function() {
@Override
public Bitmap apply(String url) throws Exception {
//
// ...
Bitmap mBitmap = Bitmap.createBitmap(200,200, Bitmap.Config.ARGB_8888);
return mBitmap;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bitmap bitmap) {
//
System.out.println(" :"+bitmap);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void testFlatMap() {
// :token token
Observable.just("getToken","login").flatMap(new Function>() {
@Override
public ObservableSource> apply(String s) throws Exception {
System.out.println(" :"+s);
return createRespone(s);
}
}).subscribe(new Observer
@Test
public void testGroupBy() {
Observable.just(1,2,3,4).groupBy(new Function() {
@Override
public String apply(Integer integer) throws Exception {
return integer>2?"A ":"B ";
}
}).subscribe(new Consumer>() {
@Override
public void accept(final GroupedObservable stringIntegerGroupedObservable)
throws Exception {
//stringIntegerGroupedObservable
stringIntegerGroupedObservable.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
String key = stringIntegerGroupedObservable.getKey();
System.out.println("key="+key+" "+integer);
}
});
}
});
}
@Test
public void testBuffer() {
// 6 2
Observable.just(1,2,3,4,5,6).buffer(2).subscribe(new Observer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List integers) {
//
System.out.println(integers);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void testScan() {
Observable.range(1,5).scan(new BiFunction() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
// integer
// , ...
return integer+integer2;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
三、フィルタオペレータ
@Test
public void testFilter() {
Observable.just(1,2,3,4,5,6).filter(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
//
//true
//false
return integer>2;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
//
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void testTake() {
// 1 take 5
Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println(aLong+"");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void testDistinct() {
Observable.just(1,2,2,2,3,3,6,6,7).distinct().subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
// 5
Observable.just(1,2,2,2,3,3,6,6,7).elementAt(5).subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
四、条件オペレータ
Observable.just(1,2,3,4,5,6).all(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
// 2
return integer>2;
}
}).subscribe(new Consumer() {
@Override
public void accept(Boolean aBoolean) throws Exception {
//
System.out.println(aBoolean);
}
});
Observable.just(1,2,3,4,5).contains(3).subscribe(new Consumer() {
@Override
public void accept(Boolean aBoolean) throws Exception {
// 3
System.out.println(aBoolean);
}
});
Observable.just(1,2,3,4,5).any(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
return integer==3;
}
}).subscribe(new Consumer() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println(aBoolean);
}
});
Observable.just(1).isEmpty().subscribe(new Consumer() {
@Override
public void accept(Boolean aBoolean) throws Exception {
// true false
System.out.println(aBoolean);
}
});
.defaultIfEmpty(0)
// 0 1000 50 0
Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).skipWhile(new Predicate() {
@Override
public boolean test(Long aLong) throws Exception {
// <10
return aLong<10;
}
}).subscribe(new Consumer() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
五、連結オペレータ
被観察者をマージする
// , 2,4,6,8
Observable.just(1,3,5,7).startWith(Observable.just(2,4,6,8))
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
// 123
Observable.concat(
Observable.just(1,2,3),
Observable.just(4,5,6))
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
Flowable observable1 = Flowable.intervalRange(0,4,1,500,TimeUnit.MILLISECONDS);
Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
Flowable observable3 = Flowable.intervalRange(20,4,1,500,TimeUnit.MILLISECONDS);
Flowable.merge(observable2,observable3,observable1).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
System.out.println(o);
}
});
//
Flowable observable1 = Flowable.create(new FlowableOnSubscribe>() {
@Override
public void subscribe(FlowableEmitter> emitter) throws Exception {
//
emitter.onError(new NullPointerException());
}
}, BackpressureStrategy.BUFFER);
Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
Flowable.mergeDelayError(observable1,observable2).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
System.out.println(o);
}
});