RxJavaスレッド切替原理
RxJavaのスレッド切り替えは主に以下の通りです。
observeOn(),subscribeon()
この二つの方法はどのように切り替えられますか?
ObserveOn()は、前の構造の良いObservableの例として機能し、RxJavaは、スレッド切り替えの操作をObservableとしてObservableをObservable subscribe()方法とObserver onxt()にパッケージしてリンクを実行しています。
まずsubscribenを分析する()
public final Observable subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
newはObservable Subscribenオブジェクトを示し、同時に現在のノードにpreviousノードとして伝えられ、スケジューラとしてschedulerを提供する。@Override
public void subscribeActual(final Observer super T> observer) {
final SubscribeOnObserver parent = new SubscribeOnObserver(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
キーコードはsubscribeActualにあります。observerを一つのTaskにカプセル化した後、scheduler.scheduleDirectを呼び出しました。public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
scheduleDirect()したことは、入ってきたRunnableを再度パッケージ化し、workerに任せて処理します。私たちは今は具体的にどのようなスレッド池を研究していません。今はどうやって切り替えができるかだけ見ています。先ほどのスレッドスケジュールはsubscribeActualで発生しましたが、subscribe()は絶えず上に遡り、原点Observable Createに追いつき、Observable OsubScribe類のsubscribeを呼び出す方法を知っています。この中でsubscriben()が何度も実行されているなら、まだデータが吐出されていないため、subscriben()は起点に一番近いところでしか有効ではなく、後ろの何回もsubscriben()は無効です。
ObserveOn()を分析して、既存のobservableチェーンに新たなノードObservable ObserveOn()を追加します。
public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}
Observable ObserveOnの中で比較的に肝心ないくつかの方法を見てください。@Override
protected void subscribeActual(Observer super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
}
}
そのsubscribeActualはスレッド切り替えを行っていないが、Scheduler.Workerを構築し、Observable Obseverオブジェクトの構造関数のパラメータにそれを伝えただけで、ObserverのOnext(実行されているスレッド)の実行に影響を及ぼすと考えられ、データフローの処理に影響を与える。私達は考えに沿って下を見ます。static final class ObserveOnObserver extends BasicIntQueueDisposable
implements Observer, Runnable {
...
ObserveOnObserver(Observer super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.worker = worker;
...
}
...
@Override
public void onNext(T t) {
...
schedule();
}
@Override
public void onError(Throwable t) {
...
schedule();
}
@Override
public void onComplete() {
...
schedule();
}
...
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
}
案の定、そのonnext()、onComplettee()、onError()の中でschedule()の方法に呼び出されました。ObserveObserverはRunnableインターフェースを実現しましたので、直接worker.schedule()の中で現在のオブジェクトのインスタンスを伝えます。NewThreadWorkerを例にとって、scheduleになくして何をしたかを分析します。
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
...
return scheduleActual(action, delayTime, unit, null);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
...
Future> f;
...
if (delayTime <= 0) {
f = executor.submit((Callable
よく知っている感じがありますか?タイミングタスクはexectorスレッド池のscheduleメソッド処理に任せ、通常タスクは直接submit()になります。これにより、Schdulersの抽象的なクラスの静的な工場によって生成された特定のスレッドプールに提出された。
ObserveOn()はデータのソースから下に流れるdownstream過程に作用するので、この過程でブロックするスレッドの切り替えが有効になり、現在のタスクに作用するonNext()などの方法があります。
subscribeActual()の向上ソースとsubscribe()を了解した後、継続的にonNext()の下の責任チェーンを押すと、RxJavaのスレッド切り替えの原理が分かりやすくなります。
次の例を挙げて説明します。
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("hello");
System.out.println("Thread:"+Thread.currentThread().getName());
}
}).observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.map(new Function() {
@Override
public String apply(String s) throws Exception {
System.out.println("Thread:"+Thread.currentThread().getName());
return "abc-"+s;
}
}).observeOn(Schedulers.newThread())
.map(new Function() {
@Override
public String apply(String s) throws Exception {
System.out.println("Thread:"+Thread.currentThread().getName());
return s+"-def";
}
}).observeOn(Schedulers.single())
.subscribeOn(Schedulers.single())
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
System.out.println("Thread:"+Thread.currentThread().getName());
System.out.println("result:"+s);
}
});
この中はマルチスレッドの切り替えに関連しています。何度もobserveOnを呼び出します。subscribeonは前の分析によって、subscribeOnはobservableチェーンの一番原点からの切り替えしか有効ではないので、ここでは最初のsubscribeOnだけ有効です。データはSchdule.rsioから出るべきです。切り替えはonNext()つまり、対応するfunction()のアプリ()が実行スレッドに変わります。Functionは最後にobserverにパッケージされます。印刷結果と私達の分析が一致しているかを確認します。
Thread:RxCachedThreadScheduler-1
Thread:RxComputationThreadPool-1
Thread:RxNewThreadScheduler-1
Thread:RxSingleScheduler-1
result:abc-hello-def
ここで説明するのはSchdulers.io()に使われているThreadFactoryから入ってきたラベルは「RxCachedThreadScheduler」であるので、CachedThreadをプリントします。