RxJava同時スレッドスケジューリング
Rxターゲットは非同期システムで使用され、Rxはマルチスレッド処理をサポートするため、多くのRx開発者はデフォルトでRxがマルチスレッドであると考えている.実際にはそうではありませんが、Rxのデフォルトは単一スレッドです.
スレッドを明確に指定しない限り、onNext/onError/onCompletedおよび各操作関数の呼び出しはすべて同じスレッドで完了します.たとえば、次の例があります.
結果:
上の3つのスレッドでsubjectのonNext関数をそれぞれ呼び出します.Runnableのスレッドと同じスレッドです.複数の操作関数を直列に呼び出すことなく、結果は同じスレッドになります.
subscribeOnおよびobserveOnは、subscriptionの呼び出しスレッドおよびイベント通知(ObserverのonNext/onError/onCompleted関数)を受信するスレッドをそれぞれ制御するために使用される.
Rxではスレッドと直接付き合うのではなく、Schedulerでマルチスレッドを処理します.
subscribeOnはObservableを指定するために使用する.createのコードはそのSchedulerで実行されます.create関数を呼び出さなくても、内部にはcreate実装があります.例:
結果:
上記のコードは同じスレッドで実行され、シーケンスで実行されていることがわかります.subscribeの実行が完了すると(create関数のLambda式のコードを含む)後のコードが実行されます.
上の注釈を消したコードをsubscribeOn(Schedulers.newThread()をオンにします.この結果は次のようになります.
これでLambda式のコードはSchedulersになります.newThread()が返すスレッドで実行します.subscribeはもうブロックではありません.次のコードは、subscribeの戻りを待つことなく、すぐに実行できます.
一部のObservable内部では、独自に作成したスレッドが使用されます.例えばObservable.intervalは非同期です.この場合、新しいスレッドを指定する必要はありません.
結果:
ObserveOnは、データストリームの他端を制御します.あなたのobserverはどのようにイベントを受け取りますか.すなわち,そのスレッドでobserverのonNext/onError/onCompleted関数をコールバックする.
結果:
observeOnは、この関数を呼び出した後の操作関数にのみ影響します.observeOnはデータストリームをブロックし、後続の操作に役立つと考えられます.例:
結果:
observeOnに遭遇する前に、すべての操作が1つのスレッドで発生し、その後別のスレッドで発生することがわかります.これにより、Rxデータストリーム内の異なる場所に異なるスレッドを設定することができる.
データ・ストリームの処理に時間がかかることを知っている場合は、この操作により、生産者スレッドのブロックを回避できます.例えばAndroid開発中のUIスレッドでは、そのスレッドでファイルを読み取るとUIカード死(ANR)に応答しない可能性があり、この関数で別のスレッドで実行するように指定できます.
一部のObservableは、Observableが完了すると、これらのリソースを解放するリソースに依存します.リソースを解放するのに時間がかかる場合は、unsubscribeOnでリソースコードを解放して実行するスレッドを指定できます.
結果:
observeOnとsubscribeOnのパラメータはSchedulerオブジェクトです.Schedulerは、タスクの実行を調整するために使用されます.RxJavaには一般的なSchedulerが含まれており、Schedulerをカスタマイズすることもできます.Schedulersのファクトリ関数を呼び出すことで標準の事前定義されたSchedulerを取得する.
RxJavaに内蔵されているSchedulerには、次のものがあります. immediate同期実行 trampolineタスクを現在のスレッドのキューに配置し、現在のタスクの実行が完了したら、キュー内のタスク を実行し続けます. newThreadは、タスクごとに を実行する新しいスレッドを作成する. computation計算スレッド、大量のCPU計算を必要とするタスク 用 io ioオペレーションを実行するためのタスク testテストおよびデバッグ用 現在のcomputationとioの実装は似ています.彼ら2つは主に呼び出しのシーンを確保するために使用されています.ドキュメントの説明に相当し、現在のタスクがどのようなタイプであるかを示します.
ほとんどのRx操作関数の内部にはschedulersが使用されている.また,Observable操作関数の大部分にもSchedulerパラメータを用いたリロード関数がある.この操作関数が実行するスレッドは、リロード関数によって指定できます.
Rx schedulerの使用シーンはRxに限定されず、通常のJavaコードでも使用可能です.
Schedulerには、実行可能なタスク(Scheduler.Worker)を作成するcreateWorker関数があります.次に、タスクをスケジュールできます.
上記のタスクは、指定したスレッドに割り当てられます.
タスクを繰り返したり、1回だけ実行したり、タスクの実行を遅らせたりすることもできます.
結果:
上記の例では、実行を遅らせるのはスケジューリング開始時から時間を計算することを示しています.
Scheduler.WorkerはSubscriptionに継承されます.unsubscribe関数を呼び出すと、キュー内のタスクをキャンセルできます.
結果:
最初のタスクでunsubscribeが呼び出され、2番目のタスクがキャンセルされます.次の例では、タスクが完了せずにキャンセルされた場合、InterruptedException例外が放出されます.
結果:
scheduleはSubscriptionオブジェクトを返します.このオブジェクトでキャンセル操作を呼び出すことができます.これにより、すべてのタスクをキャンセルするのではなく、このタスクのみをキャンセルすることができます.
ImmediateSchedulerはスレッドスケジューリングをしていません.同期した実行タスクのみです.ネストされた呼び出しにより、タスクが再帰的に実行されます.
結果:
TrampolineSchedulerも同期して実行されますが、タスクはネストされません.後のタスクをタスクキューに追加し、前のタスクが完了してから後のタスクを実行します.
結果:
TrampolineSchedulerは、タスクを最初のタスクを実行するスレッドに配置して実行します.これにより、キューが実行されるまでscheduleを最初に呼び出す操作はブロックされます.後続のタスクは、このスレッドで1つずつ実行され、後続の呼び出しがブロックされません.
NewThreadSchedulerは、各タスクに新しいスレッドを作成します.スレッド情報を印刷する補助関数を定義します.
例:
結果:
本文は雲が千峰にあるからだ.http://blog.chengyunfeng.com/?p=978
スレッドを明確に指定しない限り、onNext/onError/onCompletedおよび各操作関数の呼び出しはすべて同じスレッドで完了します.たとえば、次の例があります.
final BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
int[] i = {1}; // naughty side-effects for examples only ;)
Runnable r = () -> {
synchronized(i) {
System.out.println("onNext(" + i[0] + ") on " + Thread.currentThread().getId());
subject.onNext(i[0]++);
}
};
r.run(); // Execute on main thread
new Thread(r).start();
new Thread(r).start();
結果:
onNext(1) on 1
Received 1 on 1
onNext(2) on 11
Received 2 on 11
onNext(3) on 12
Received 3 on 12
上の3つのスレッドでsubjectのonNext関数をそれぞれ呼び出します.Runnableのスレッドと同じスレッドです.複数の操作関数を直列に呼び出すことなく、結果は同じスレッドになります.
subscribeOnとobserveOn
subscribeOnおよびobserveOnは、subscriptionの呼び出しスレッドおよびイベント通知(ObserverのonNext/onError/onCompleted関数)を受信するスレッドをそれぞれ制御するために使用される.
public final Observable<T> observeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler)
Rxではスレッドと直接付き合うのではなく、Schedulerでマルチスレッドを処理します.
subscribeOn
subscribeOnはObservableを指定するために使用する.createのコードはそのSchedulerで実行されます.create関数を呼び出さなくても、内部にはcreate実装があります.例:
System.out.println("Main: " + Thread.currentThread().getId());
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
//.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
System.out.println("Finished main: " + Thread.currentThread().getId());
結果:
Main: 1
Created on 1
Received 1 on 1
Received 2 on 1
Finished main: 1
上記のコードは同じスレッドで実行され、シーケンスで実行されていることがわかります.subscribeの実行が完了すると(create関数のLambda式のコードを含む)後のコードが実行されます.
上の注釈を消したコードをsubscribeOn(Schedulers.newThread()をオンにします.この結果は次のようになります.
Main: 1
Finished main: 1
Created on 11
Received 1 on 11
Received 2 on 11
これでLambda式のコードはSchedulersになります.newThread()が返すスレッドで実行します.subscribeはもうブロックではありません.次のコードは、subscribeの戻りを待つことなく、すぐに実行できます.
一部のObservable内部では、独自に作成したスレッドが使用されます.例えばObservable.intervalは非同期です.この場合、新しいスレッドを指定する必要はありません.
System.out.println("Main: " + Thread.currentThread().getId());
Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
System.out.println("Finished main: " + Thread.currentThread().getId());
結果:
Main: 1
Finished main: 1
Received 0 on 11
Received 1 on 11
Received 2 on 11
observeOn
ObserveOnは、データストリームの他端を制御します.あなたのobserverはどのようにイベントを受け取りますか.すなわち,そのスレッドでobserverのonNext/onError/onCompleted関数をコールバックする.
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
.observeOn(Schedulers.newThread())
.subscribe(i ->
System.out.println("Received " + i + " on " + Thread.currentThread().getId()));
結果:
Created on 1
Received 1 on 13
Received 2 on 13
observeOnは、この関数を呼び出した後の操作関数にのみ影響します.observeOnはデータストリームをブロックし、後続の操作に役立つと考えられます.例:
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
.doOnNext(i ->
System.out.println("Before " + i + " on " + Thread.currentThread().getId()))
.observeOn(Schedulers.newThread())
.doOnNext(i ->
System.out.println("After " + i + " on " + Thread.currentThread().getId()))
.subscribe();
結果:
Created on 1
Before 1 on 1
Before 2 on 1
After 1 on 13
After 2 on 13
observeOnに遭遇する前に、すべての操作が1つのスレッドで発生し、その後別のスレッドで発生することがわかります.これにより、Rxデータストリーム内の異なる場所に異なるスレッドを設定することができる.
データ・ストリームの処理に時間がかかることを知っている場合は、この操作により、生産者スレッドのブロックを回避できます.例えばAndroid開発中のUIスレッドでは、そのスレッドでファイルを読み取るとUIカード死(ANR)に応答しない可能性があり、この関数で別のスレッドで実行するように指定できます.
unsubscribeOn
一部のObservableは、Observableが完了すると、これらのリソースを解放するリソースに依存します.リソースを解放するのに時間がかかる場合は、unsubscribeOnでリソースコードを解放して実行するスレッドを指定できます.
Observable<Object> source = Observable.using(
() -> {
System.out.println("Subscribed on " + Thread.currentThread().getId());
return Arrays.asList(1,2);
},
(ints) -> {
System.out.println("Producing on " + Thread.currentThread().getId());
return Observable.from(ints);
},
(ints) -> {
System.out.println("Unubscribed on " + Thread.currentThread().getId());
}
);
source
.unsubscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
結果:
Subscribed on 1
Producing on 1
1
2
Unubscribed on 11
Schedulers
observeOnとsubscribeOnのパラメータはSchedulerオブジェクトです.Schedulerは、タスクの実行を調整するために使用されます.RxJavaには一般的なSchedulerが含まれており、Schedulerをカスタマイズすることもできます.Schedulersのファクトリ関数を呼び出すことで標準の事前定義されたSchedulerを取得する.
RxJavaに内蔵されているSchedulerには、次のものがあります.
ほとんどのRx操作関数の内部にはschedulersが使用されている.また,Observable操作関数の大部分にもSchedulerパラメータを用いたリロード関数がある.この操作関数が実行するスレッドは、リロード関数によって指定できます.
schedulerの高度な特性
Rx schedulerの使用シーンはRxに限定されず、通常のJavaコードでも使用可能です.
タスクの実行
Schedulerには、実行可能なタスク(Scheduler.Worker)を作成するcreateWorker関数があります.次に、タスクをスケジュールできます.
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> System.out.println("Action"));
上記のタスクは、指定したスレッドに割り当てられます.
タスクを繰り返したり、1回だけ実行したり、タスクの実行を遅らせたりすることもできます.
Subscription schedule(
Action0 action,
long delayTime,
java.util.concurrent.TimeUnit unit)
Subscription schedulePeriodically(
Action0 action,
long initialDelay,
long period,
java.util.concurrent.TimeUnit unit)
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);
結果:
5033
5035
上記の例では、実行を遅らせるのはスケジューリング開始時から時間を計算することを示しています.
タスクのキャンセル
Scheduler.WorkerはSubscriptionに継承されます.unsubscribe関数を呼び出すと、キュー内のタスクをキャンセルできます.
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> {
System.out.println(System.currentTimeMillis()-start);
worker.unsubscribe();
},
5, TimeUnit.SECONDS);
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);
結果:
5032
最初のタスクでunsubscribeが呼び出され、2番目のタスクがキャンセルされます.次の例では、タスクが完了せずにキャンセルされた場合、InterruptedException例外が放出されます.
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> { try { Thread.sleep(2000); System.out.println("Action completed"); } catch (InterruptedException e) { System.out.println("Action interrupted"); } }); Thread.sleep(500); worker.unsubscribe();
結果:
Action interrupted
scheduleはSubscriptionオブジェクトを返します.このオブジェクトでキャンセル操作を呼び出すことができます.これにより、すべてのタスクをキャンセルするのではなく、このタスクのみをキャンセルすることができます.
RxJavaに既存のscheduler
ImmediateScheduler
ImmediateSchedulerはスレッドスケジューリングをしていません.同期した実行タスクのみです.ネストされた呼び出しにより、タスクが再帰的に実行されます.
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
System.out.println("Start");
worker.schedule(() -> System.out.println("Inner"));
System.out.println("End");
});
結果:
Start Inner End
TrampolineScheduler
TrampolineSchedulerも同期して実行されますが、タスクはネストされません.後のタスクをタスクキューに追加し、前のタスクが完了してから後のタスクを実行します.
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
System.out.println("Start");
worker.schedule(() -> System.out.println("Inner"));
System.out.println("End");
});
結果:
Start End Inner
TrampolineSchedulerは、タスクを最初のタスクを実行するスレッドに配置して実行します.これにより、キューが実行されるまでscheduleを最初に呼び出す操作はブロックされます.後続のタスクは、このスレッドで1つずつ実行され、後続の呼び出しがブロックされません.
NewThreadScheduler
NewThreadSchedulerは、各タスクに新しいスレッドを作成します.スレッド情報を印刷する補助関数を定義します.
public static void printThread(String message) {
System.out.println(message + " on " + Thread.currentThread().getId());
}
例:
printThread("Main");
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> { printThread("Start"); worker.schedule(() -> printThread("Inner")); printThread("End"); }); Thread.sleep(500); worker.schedule(() -> printThread("Again"));
結果:
Main on 1
Start on 11
End on 11
Inner on 11
Again on 11
本文は雲が千峰にあるからだ.http://blog.chengyunfeng.com/?p=978