[RxJava]同時flatMap()VS parallel()
5160 ワード
前に書いたマルチスレッドダウンロードファイルのライブラリをRxJavaで再実現したいと思い、RxJavaでどのように同時実現するかを見ました.
まず、RxJavaのsubscribeOnとobserveOnの2つのOperatorはスレッド切替を実現するためにしか使用できません.データストリーム全体と操作はシリアルで、同時的な意味はありません.
実現方法はタイトルの中の2つのOperatorです.
flatMapオペレータは、-map:upstreamからのイベントごとにマッピング処理を行い、データストリームsub-Streamを生成します.したがって、このステップが終了すると、sub-Streamというデータストリームが存在します.-flat:upstream(mapのステップの結果)からのイベント(すなわちsub-Stream)ごとに購読し、あるsub-Streamがイベントを送信するとすぐに下流に転送します.そしてこの中のflatのこの一歩は気にしないで、人はよくやった.mapというステップ自体はユーザ自身が定義しており,subscribeOnオペレータによりsub-Streamを生成する操作がどのスレッド上で実行されるかを制御することで,同時効果を実現できる.
parallelオペレータはRxJava 2.0.5で導入されており、flatMapよりも使いやすさ、読み取りやすさに大きなメリットがあります.直接コード:
簡単でしょう.parallelからsequentialまでの操作はrunOnで指定されたスレッド上で同時実行されます.
parallel()はFlowableをParallelFlowableに変換し、ParallelFlowableのAPIはFlowableのAPIの小さなサブセットである.その原理はflatMapで実現したのと似ていますが、パッケージしてくれただけです.
参照先:https://dzone.com/articles/rxjava-idiomatic-concurrency-flatmap-vs-parallel
まず、RxJavaのsubscribeOnとobserveOnの2つのOperatorはスレッド切替を実現するためにしか使用できません.データストリーム全体と操作はシリアルで、同時的な意味はありません.
実現方法はタイトルの中の2つのOperatorです.
flatMap()
flatMapオペレータは、-map:upstreamからのイベントごとにマッピング処理を行い、データストリームsub-Streamを生成します.したがって、このステップが終了すると、sub-Streamというデータストリームが存在します.-flat:upstream(mapのステップの結果)からのイベント(すなわちsub-Stream)ごとに購読し、あるsub-Streamがイベントを送信するとすぐに下流に転送します.そしてこの中のflatのこの一歩は気にしないで、人はよくやった.mapというステップ自体はユーザ自身が定義しており,subscribeOnオペレータによりsub-Streamを生成する操作がどのスレッド上で実行されるかを制御することで,同時効果を実現できる.
Flowable.just(1,2)
.flatMap(it-> Flowable.just(it)
.subscribeOn(Schedulers.computation())// sub-stream
.map(i->{
System.out.println(i+" thread: "+ Thread.currentThread());
return i;
}))
.subscribe(it->{
System.out.println("onNext:"+it+" thread: "+ Thread.currentThread());
});
//out
2 thread: Thread[RxComputationThreadPool-2,5,main]
1 thread: Thread[RxComputationThreadPool-1,5,main]
onNext:2 thread: Thread[RxComputationThreadPool-2,5,main]
onNext:1 thread: Thread[RxComputationThreadPool-1,5,main]
parallel()
parallelオペレータはRxJava 2.0.5で導入されており、flatMapよりも使いやすさ、読み取りやすさに大きなメリットがあります.直接コード:
Flowable.just(1,2)
.parallel()
.runOn(Schedulers.io())//
.map(it->{
System.out.println(it+" thread: "+ Thread.currentThread());
return it;
})
.sequential()
.subscribe(it->{
System.out.println("onNext:"+it+" thread: "+ Thread.currentThread());
});
//out
1 thread: Thread[RxCachedThreadScheduler-1,5,main]
2 thread: Thread[RxCachedThreadScheduler-2,5,main]
onNext:1 thread: Thread[RxCachedThreadScheduler-1,5,main]
onNext:2 thread: Thread[RxCachedThreadScheduler-1,5,main]
簡単でしょう.parallelからsequentialまでの操作はrunOnで指定されたスレッド上で同時実行されます.
parallel()はFlowableをParallelFlowableに変換し、ParallelFlowableのAPIはFlowableのAPIの小さなサブセットである.その原理はflatMapで実現したのと似ていますが、パッケージしてくれただけです.
参照先:https://dzone.com/articles/rxjava-idiomatic-concurrency-flatmap-vs-parallel