RxJava:スレッド切り替え
11881 ワード
下一篇:RxJava:基本購読プロセス
Rxjavaで最も一般的な2つの方法: subscribeOn(Scheduler scheduler) observeOn(Scheduler scheduler)
通常、我々は
so,Rxjavaはどのようにスレッド切替を行いますか?
Observable.subscribeOn(Scheduler scheduler)
subscribeOn()メソッドは、Observable.create()と同様に、Observableオブジェクトが返され、その内部には主に
ObservableSubscribeOnクラス継承関係は次のとおりです.
ObservableSubscribeOnクラスもObservableであり、名前からも分かるように、SubscribeOnが行われたObservableである.では、彼と私たちが前に述べたObservableCreate類の最大の違いはどこですか.
Observable実装クラスのキーメソッドを貼り付けます: ObservableCreate内部にエミッタクラスが定義されている---CreateEmitter,observer.onSubscribe()メソッドはCreateEmitterエミッタオブジェクトに渡される. ObservableSubscribeOn内部にはSubscribeOnObserverオブジェクトが定義されており、subscribeActualメソッドでは、s.onSubscribe()がこのSubscribeOnObserverオブジェクトに渡される.つまり、Observerを再包装し、Runnableに入れて実行します.
SubscribeOnObserverもObserverで、私たちが伝えたObserveableを包装しました
送信機によって送信されたデータは、subscribe()によって呼び出され、subscribe()が実際に呼び出されたのは、Observable実装クラスObservableSubscribeOn.subscribeActual()である.
主に重要な論理ステップ:scheduler.scheduleDirect()メソッドを見て、Runnableが指定したスレッドでどのように実行されているかを理解します.
内部的にWorker.schedule()メソッドを呼び出して、私たちが入力したRunnableを実際に実行していることがわかります.
私たちは
scheduleActualメソッドでは、Runnableを一連のパッケージ化し、最終的にはAndroidのExecutorService実装クラスである-S cheduledExecutorServiceを使用して実行します.
このexecutorは次のように構成されています.
スレッドプールからExecutorServiceを取得し、このExecutorServiceを使用して、私たちが転送したRunnableを実行します.以上をまとめると、
Observable. observeOn(Scheduler scheduler)
observeOn()はfinalメソッドであるため、すべてのObserveableは呼び出されたメソッドであり、実際にはObservableObserveOnオブジェクトインスタンスを返します.スレッドを切り替えた後にsubscribe()メソッドを使用してサブスクリプションオブザーバを定義してui操作を行うが、前編ではObserveable.subscribeが実際に実行しているのはObservable実装クラスのsubscribeActualメソッドであるため、注目すべきは
ここで注目するのはelseにおけるスレッド切替プロセスであり,subscribeOn()メソッドによるスレッド切替と同様に,入力されたSchedulerによってWorkerが決定されることである.ここではObserveOnObserverクラスを導入し,受信したobserverをカプセル化し,すなわちobserverの実行スレッドを決定する.
ObserveOnObserverクラスはRunnableインタフェースを実現し、runメソッドは以下の通りである.
ObserveOnObserverのworkerは主に
So,observeOn法もSchedulerによりスレッド切替を実現している.
schedle()メソッドでは,handlerを介して受信したRunnableをプライマリスレッドに送信して実行することを示した.
Rxjavaで最も一般的な2つの方法:
通常、我々は
subscribeOn(Schedulers.io())
メソッドを使用してIOスレッドでの購読を指定---データ処理を行い、observeOn(AndroidSchedulers.mainThread())
メソッドはメインスレッドでの観察を指定---UI操作を行う.so,Rxjavaはどのようにスレッド切替を行いますか?
Observable.subscribeOn(Scheduler scheduler)
public final Observable subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));//1
}
subscribeOn()メソッドは、Observable.create()と同様に、Observableオブジェクトが返され、その内部には主に
ObservableSubscribeOn(ObservableSource source, Scheduler scheduler)
メソッドが呼び出され、ObservableSubscribeOnオブジェクトが構築されていることがわかります.ObservableSubscribeOnクラスは長くありません.次のようにすべてのコードを貼ります.public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {//2
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer super T> s) {
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
s.onSubscribe(parent);
//Disposable ,
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {//3
@Override
public void run() {
source.subscribe(parent);
}
}));
}
static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer super T> actual;
final AtomicReference s;
SubscribeOnObserver(Observer super T> actual) {
this.actual = actual;
this.s = new AtomicReference();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
}
ObservableSubscribeOnクラス継承関係は次のとおりです.
ObservableSubscribeOn extends AbstractObservableWithUpstream
bstractObservableWithUpstream extends Observable
ObservableSubscribeOnクラスもObservableであり、名前からも分かるように、SubscribeOnが行われたObservableである.では、彼と私たちが前に述べたObservableCreate類の最大の違いはどこですか.
Observable実装クラスのキーメソッドを貼り付けます:
ObservableCreate.subscribeActual(Observer super T> observer)
、比較します: protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
SubscribeOnObserverもObserverで、私たちが伝えたObserveableを包装しました
ObservableCreate.subscribeActual
で、実行したsource.subscribe(parent)
送信機を使用して我々が定義したsubscribe(ObservableEmitter e)
メソッドを実行する.ObservableSubscribeOn.subscribeActual
で、呼び出されたのはscheduler.scheduleDirect(Ruunable run)
で、Runnableが1つ入ってきて、実行source.subscribe(parent)
.送信機によって送信されたデータは、subscribe()によって呼び出され、subscribe()が実際に呼び出されたのは、Observable実装クラスObservableSubscribeOn.subscribeActual()である.
主に重要な論理ステップ:scheduler.scheduleDirect()メソッドを見て、Runnableが指定したスレッドでどのように実行されているかを理解します.
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
内部的にWorker.schedule()メソッドを呼び出して、私たちが入力したRunnableを実際に実行していることがわかります.
createWorker()
は、我々が入力したSchedulerの実装クラスによって決定される、すなわち、我々が呼び出したsubscribeOn(Scheduler scheduler)
に入力したSchedulerを具体的に実装する抽象的な方法である.上記のコードでは、この参照チェーンを注釈//1//2//3で表記しました.私たちは
Schedulers.io()
を例に、その実現を具体的に見てみましょう.まず見てみましょうSchedulers.io()
代表のScheduler: public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
......
IO = RxJavaPlugins.initIoScheduler(new Callable() {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
});
......
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
Schedulers.io()
は、Schedulerオブジェクトを返します.IoSchedulerクラスでは、createWorker()がEventLoopWorkerオブジェクトを返します.EventLoopWorker.scheduleメソッドは次のとおりです. public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
scheduleActualメソッドでは、Runnableを一連のパッケージ化し、最終的にはAndroidのExecutorService実装クラスである-S cheduledExecutorServiceを使用して実行します.
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable
このexecutorは次のように構成されています.
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
スレッドプールからExecutorServiceを取得し、このExecutorServiceを使用して、私たちが転送したRunnableを実行します.以上をまとめると、
Observable.observeOn(Schedulers.io())
サブスレッドを切り替える原理は、ObservableOnSubscribeでカスタマイズされたsubscribe()と、実際に読み込まれたScheduler実装クラスのWorker.schedule()メソッド指定スレッドで実行することにある.Observable. observeOn(Scheduler scheduler)
public final Observable observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}
observeOn()はfinalメソッドであるため、すべてのObserveableは呼び出されたメソッドであり、実際にはObservableObserveOnオブジェクトインスタンスを返します.スレッドを切り替えた後にsubscribe()メソッドを使用してサブスクリプションオブザーバを定義してui操作を行うが、前編ではObserveable.subscribeが実際に実行しているのはObservable実装クラスのsubscribeActualメソッドであるため、注目すべきは
ObservbleObserveOn.subscribeActual()
である.ObservableObserveOn.subscribeActual()
コードは以下の通り:protected void subscribeActual(Observer super T> observer) {
// scheduler , , observer subscribe() , //source ObservableSource(Observable)
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
}
}
ここで注目するのはelseにおけるスレッド切替プロセスであり,subscribeOn()メソッドによるスレッド切替と同様に,入力されたSchedulerによってWorkerが決定されることである.ここではObserveOnObserverクラスを導入し,受信したobserverをカプセル化し,すなわちobserverの実行スレッドを決定する.
ObserveOnObserverクラスはRunnableインタフェースを実現し、runメソッドは以下の通りである.
//ObserveOnObserver run ,drainFused drainNormal Observer 。
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
ObserveOnObserverのworkerは主に
schedule()
メソッドに作用し、schedule()
次のようにworker.schedule()メソッドを呼び出し、現在のObserveOnObserverを実行する.void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
So,observeOn法もSchedulerによりスレッド切替を実現している.
AndroidSchedulers.mainThread()
代表的なSchedulerは以下の通り.static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
schedle()メソッドでは,handlerを介して受信したRunnableをプライマリスレッドに送信して実行することを示した.
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
// Runnable handler ScheduledRunnable, Message Callback
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// Handler MainLooper
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}