RX JAVA - Flowable/Observable
4540 ワード
RxJavaにはBackpressureの概念とそのFlowableクラスが存在する.
Backpressureが何なのか、Flowableがどのように書いているのか見てみましょう.
背圧とは、データの生産と消費のバランスがとれていない場合に発生する現象を指す.10000個のデータが0.1秒ごとに発行され、10秒ごとに消費されると、データはいくら消費されてもストリームに蓄積されます.Observerの消費速度はObserverがデータを発行する速度に追いつかない.これにより、メモリオーバーフローとOutOfMemoryErrorが発生し、アプリケーションがクラッシュします.この現象をバックプレッシャーと呼び,RxJavaはバックプレッシャー現象を制御する方法を提供する.
既存の観測性は背圧現象を制御することができず、Flowableは自分で背圧現象を制御することができる.次の2つのコードを見てみましょう.
Observable
Observerableを使用する場合、データの公開と消費はバランスがとれず、消費にかかわらずデータはストリームに蓄積され続けます.
逆にFlowableを使用すると、データが一定量蓄積された後にデータがパブリッシュされないことを確認できます.
同様に、FlowableはObserverableの別の形式であり、ストリームに蓄積されるデータ量を制御することができる.
Observableを使用する必要がある場合
-データ・ストリームが1000個未満の場合
-少ないデータ・ソースのみでOutOfMemoryExceptionは発生しません.
-マウスイベントやタッチイベントなどのGUIプログラミングを使用する場合(毎秒1000回以下のBentをObserverableのsample()またはdebonse()に制御することができる)
-同期プログラミングが必要ですが、プラットフォームではJava Streamsはサポートされていません.
Flowableを使用する必要がある場合
-1000以上のデータ・ストリーム.
-ディスクからファイルを読み込みます(デフォルトでは、ロック/プッシュベース)
-データベースをJDBCから読み込みます(デフォルトでは、ロック/プッシュベース)
-ネットワークIOの実行時
-ブロック/プッシュに基づく方法を使用して、ブロックされていないアクティブなAPI/ドライバからデータを取得する必要がある場合は、次の操作を行います.
OOME : Out Of Memory Exception(OOME)
create()演算子でFlowableを作成する場合は、バックプレッシャーポリシーを明確にする必要があります.
BackpressureStrategy.BUFFER戦略を実施する.パラメータは、バッファの容量、バッファオーバーフローが発生したときの動作などを同時に伝達することができる.
次の図は、要求されたデータをバッファに配置し、要求時間を考慮せずに1つずつ処理します.
最初に受信したのは1,2,3,4,5の赤色データであり,時間が経つにつれて逐一処理されている.
BackpressureStrategy.DROPポリシーを使用します.パラメータとしてデータを破棄する場合の動作を定義できます.
データ消費速度が発行速度に追いつかないと、発行されたデータは破棄される.
最初のリクエストが来たとき,2,3,4,5の赤色データを破棄し,新しいリクエストの黄色1号を処理する.
BackpressureStrategy.LATEST戦略を採用.
要求された時点で、最近要求されたデータを除いて、すべて破棄される.
Backpressureが何なのか、Flowableがどのように書いているのか見てみましょう.
背圧
背圧とは、データの生産と消費のバランスがとれていない場合に発生する現象を指す.10000個のデータが0.1秒ごとに発行され、10秒ごとに消費されると、データはいくら消費されてもストリームに蓄積されます.Observerの消費速度はObserverがデータを発行する速度に追いつかない.これにより、メモリオーバーフローとOutOfMemoryErrorが発生し、アプリケーションがクラッシュします.この現象をバックプレッシャーと呼び,RxJavaはバックプレッシャー現象を制御する方法を提供する.
Flowable
既存の観測性は背圧現象を制御することができず、Flowableは自分で背圧現象を制御することができる.次の2つのコードを見てみましょう.
Observable
Observable.range(1, 10000)
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 9998
Emit Data : 9999
Emit Data : 10000
Consume Data : 2
Consume Data : 3
Consume Data : 4
...
FlowableFlowable.range(1, 10000)
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 126
Emit Data : 127
Emit Data : 128
Consume Data : 2
Consume Data : 3
Consume Data : 4
...
両方の例では、消費遅延が100ミリ秒の10000個のデータが公開されています.Observerableを使用する場合、データの公開と消費はバランスがとれず、消費にかかわらずデータはストリームに蓄積され続けます.
逆にFlowableを使用すると、データが一定量蓄積された後にデータがパブリッシュされないことを確認できます.
同様に、FlowableはObserverableの別の形式であり、ストリームに蓄積されるデータ量を制御することができる.
When to use Observable? When to use Flowable?
Observableを使用する必要がある場合
-データ・ストリームが1000個未満の場合
-少ないデータ・ソースのみでOutOfMemoryExceptionは発生しません.
-マウスイベントやタッチイベントなどのGUIプログラミングを使用する場合(毎秒1000回以下のBentをObserverableのsample()またはdebonse()に制御することができる)
-同期プログラミングが必要ですが、プラットフォームではJava Streamsはサポートされていません.
Flowableを使用する必要がある場合
-1000以上のデータ・ストリーム.
-ディスクからファイルを読み込みます(デフォルトでは、ロック/プッシュベース)
-データベースをJDBCから読み込みます(デフォルトでは、ロック/プッシュベース)
-ネットワークIOの実行時
-ブロック/プッシュに基づく方法を使用して、ブロックされていないアクティブなAPI/ドライバからデータを取得する必要がある場合は、次の操作を行います.
OOME : Out Of Memory Exception(OOME)
例
create()演算子でFlowableを作成する場合は、バックプレッシャーポリシーを明確にする必要があります.
Flowable.create(emitter -> {
for (int i = 0; i < 10000; i++) emitter.onNext(i);
emitter.onComplete();
}, BackpressureStrategy.DROP)
.observeOn(Schedulers.io())
.subscribe();
onBackPressureBuffer()
BackpressureStrategy.BUFFER戦略を実施する.パラメータは、バッファの容量、バッファオーバーフローが発生したときの動作などを同時に伝達することができる.
次の図は、要求されたデータをバッファに配置し、要求時間を考慮せずに1つずつ処理します.
最初に受信したのは1,2,3,4,5の赤色データであり,時間が経つにつれて逐一処理されている.
onBackPressureDrop()
BackpressureStrategy.DROPポリシーを使用します.パラメータとしてデータを破棄する場合の動作を定義できます.
データ消費速度が発行速度に追いつかないと、発行されたデータは破棄される.
最初のリクエストが来たとき,2,3,4,5の赤色データを破棄し,新しいリクエストの黄色1号を処理する.
onBackPressureLatest()
BackpressureStrategy.LATEST戦略を採用.
要求された時点で、最近要求されたデータを除いて、すべて破棄される.
例
Flowable.range(1, 1000)
.onBackpressureLatest()
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*1000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Consume Data : 1
Emit Data : 5
...
Emit Data : 128
Consume Data : 2
Consume Data : 3
...
Consume Data : 95
Consume Data : 96
Emit Data : 1000
Consume Data : 97
Consume Data : 98
...
Consume Data : 128
Consume Data : 1000
Reference
この問題について(RX JAVA - Flowable/Observable), 我々は、より多くの情報をここで見つけました https://velog.io/@jsw4215/RX-JAVA-FlowableObservableテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol