RxJava(九)並列プログラミング
19459 ワード
RxJavaの並列プログラミング
RxJavaが送信するデータストリームは,様々な変換を経てスレッドを切り替えたが,並列効果は生じなかった.並列:複数のプロセッサまたはマルチコアプロセッサが同時に複数のタスクを処理します.同時:1つのプロセッサで複数のタスクを同時に処理します.1.Java 8パラレルストリームセットでparallelStreamを呼び出す
@TargetApi(24)
private void parallelStream(){
mList.parallelStream()
.map(new java.util.function.Function<String, String>() {
@Override
public String apply(String s) {
return s.toUpperCase();
}
}).forEach(new java.util.function.Consumer<String>() {
@Override
public void accept(String s) {
}
});
}
2.RxJavaはFlatMapによってFlatMapを実現する:このObservableを複数の元のデータを送信するObservablesに変換し、送信したすべてのデータを収集し、順序を保証せずに統一的に送信する.カスタムスケジューラを使用する場合は、doFinallyでスケジューラを閉じる必要があります.
3.Round-Robinを使用して負荷等化groupBy+flatMap負荷等化を実現:ユーザーからの要求を順番にサーバーに送信して処理する.ここでは、データをグループ化し、一緒に処理を送信することで、Observableの作成を削減し、システムリソースを節約できます.
Observable.range(1,10)
.groupBy(new Function<Integer, String>() { // : ,
@Override
public String apply(Integer data) {
// , :key1, :key2
return data % 2 == 0 ? "key1" : "key2";
}
}).flatMap(new Function<GroupedObservable<String, Integer>, ObservableSource<String>>() {
// , String Observable
@Override
public ObservableSource<String> apply(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
// , String
return stringIntegerGroupedObservable.map(new Function<Integer, String>() {
@Override
public String apply(Integer groupData) throws Exception {
return groupData.toString();
}
});
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
4.RxJavaのParallelFlowableインプリメンテーションは、対応するFlowable作成オペレータの後にpaallel()を呼び出し、ParallelFlowableを得ることができます.
private void parallelFlowable(){
ParallelFlowable parallelFlowable1 = Flowable.range(1,10).parallel();
parallelFlowable1.runOn(Schedulers.io())
.map(new Function() {
@Override
public Object apply(Object o) throws Exception {
return null;
}
}).sequential()
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
}
});
}
サポートされているオペレータ
// CPU , parallelism, prefetch
ParallelFlowable parallelFlowable1 = Flowable.range(1,10).parallel(int parallelism,int prefetch);
parallelFlowable1.as(ParallelFlowableConverter<T,R>);
parallelFlowable1.collect(Callable, BiConsumer);
parallelFlowable1.compose(ParallelTransformer);
parallelFlowable1.concatMap(Function<T,R>,int prefetch);
parallelFlowable1.concatMapDelayError(xxx);
parallelFlowable1.doXX ;
parallelFlowable1.filter(Predicate);
parallelFlowable1.flatMap(Function<T,R>);
parallelFlowable1.map(Function<T,R>);
parallelFlowable1.parallelism();
//
parallelFlowable1.runOn(Schedulers);
parallelFlowable1.reduce(BiFunction);
parallelFlowable1.reduce(Callable initialSupplier,BiFunction);
//
parallelFlowable1.sequential();
parallelFlowable1.sequentialDelayError();
parallelFlowable1.sorted(Comparator);
parallelFlowable1.toSortedList(Comparator);
parallelFlowable1.wait(long time);