RxJava 2レコード
19384 ワード
1.まず基本的な呼び出しをする
ログは次のように印刷されます.
次のことがわかります.下流観察者onSubscribe法が最初に実行される. 、次いで上流の被観察者のsubscribe法が実行される. 最後に下流観察者のonNext、onCompleteが実行される.
この過程を分析してcreateはObservableオブジェクト、つまりObservableCreateを作成しました
subscribeを呼び出すとObservableCreateのsubscribeメソッドが呼び出され、ObservableCreateはObservableを継承し、親のsubscribeメソッドを呼び出すことに相当します.Observableのsubscribeメソッドを見てみましょう
こうしてObservableCreateという実装クラスに戻り,そのsubscribeActual実装方法に重点を置いた.
上に、最も基本的な上流から下流までの中間の呼び出しプロセスを示した.
2.スレッド切替先上コードの追加
ログは次のように印刷されます.
次のことがわかります.下流観察者onSubscribe法は、subscribe時のスレッドを実行する. 上流の観察者のsubscribe法(すなわち、データを送信する際の方法)は、RxCachedThreadScheduler−1スレッドで実行される. 下流観察者のonNext,onCompleteはいずれもメインスレッドで実行される.
2.1スレッド切替の原理を分析し、まずsubscribeOnメソッドを見て、ObservableSubscribeOnを返します.
ObservableSubscribeOnを見続けます.AbsstractObservableWithUpstreamを継承しています.
ここでは大まかな流れをまとめることができます subscribeOnを呼び出すと、ObserableをObservableSubscribeOn、ObserverをSubscribeOnObserverに装飾する.最初のsubscribeで購読したスレッドでobserverをコールバックするonSubscribeメソッド; parentを呼び出す.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))はrunnable実装スレッド切替を実行し、schedulerスレッドrunメソッドでsourceを呼び出す.subscribe(parent)、ここのparent SubscribeOnObserverは装飾されたobserverで、最終的にobserverを呼び出す方法です.
SchedulerのscheduleDirectメソッド
AbstractObservableWithUpstream
AbsstractObservableWithUpstreamはObservableを継承し、上流のObservableをsourceで保存します.
2.2 observeOnメソッドを見る
ここまで,フローは
ObserveOnObserverの静的内部クラスObserveOnObserver,上流observableはsubscribeActualメソッドを実行し,ObserveOnObserverのonSubscribe,onNextなどのメソッドを呼び出す.
最終的にscheduleメソッドが呼び出され、スレッドの切り替えが実現されます.
ここでは、ObserveOnObserverがRunnableインタフェースを実現したため、runメソッドを参照してください.
drainNormalの方法を見て
まとめタスクチェーンを作成し、各ステップは対応するObservableオブジェクトを返します. は逆に段階的に購読し、各ステップは対応するObserverを生成して前のステップで生成したObservableを購読する. タスクチェーンを実行し、タスクチェーンを実行した後、各ステップは対応するObserverに通知し、調整タスクチェーンを完了する.
//
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.i(TAG, "ObservableEmitter, :" + " currentThread = " + Thread.currentThread().getName());
e.onNext(1);// Observer onNext
e.onComplete();// Observer onComplete
}
});
//
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
// onSubscribe subscribe
// Disposable , Observer
Log.i(TAG, "onSubscribe, , : " + " currentThread = " + Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext, : " + " currentThread = " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError, : " + " currentThread = " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete, : " + " currentThread = " + Thread.currentThread().getName());
}
};
//
observable.subscribe(observer);
ログは次のように印刷されます.
I/RxJava2: onSubscribe, , : currentThread = main
I/RxJava2: ObservableEmitter, : currentThread = main
I/RxJava2: onNext, : currentThread = main
I/RxJava2: onComplete, : currentThread = main
次のことがわかります.
この過程を分析してcreateはObservableオブジェクト、つまりObservableCreateを作成しました
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable create(ObservableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate(source));// , ObservableCreate
}
subscribeを呼び出すとObservableCreateのsubscribeメソッドが呼び出され、ObservableCreateはObservableを継承し、親のsubscribeメソッドを呼び出すことに相当します.Observableのsubscribeメソッドを見てみましょう
public abstract class Observable implements ObservableSource {
...
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);// subscribeActual
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
protected abstract void subscribeActual(Observer super T> observer);// ,
...
}
こうしてObservableCreateという実装クラスに戻り,そのsubscribeActual実装方法に重点を置いた.
public final class ObservableCreate extends Observable {
final ObservableOnSubscribe source;
public ObservableCreate(ObservableOnSubscribe source) {
this.source = source;// create ObservableOnSubscribe
}
@Override
protected void subscribeActual(Observer super T> observer) {//
// CreateEmitter, , ObservableEmitter Disposable ,
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);// onSubscribe , Observer onSubscribe
try {
source.subscribe(parent);// ObservableOnSubscribe subscribe ,
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter extends AtomicReference
implements ObservableEmitter, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer super T> observer;
CreateEmitter(Observer super T> observer) {//
this.observer = observer;
}
@Override
public void onNext(T t) {// e.onNext ,
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {//
observer.onNext(t);// observer onNext
}
}
@Override
public void onError(Throwable t) {//
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);// observer onError
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {// e.onComplete ,
if (!isDisposed()) {
try {
observer.onComplete();// observer onComplete
} finally {
dispose();
}
}
}
...
}
...
}
上に、最も基本的な上流から下流までの中間の呼び出しプロセスを示した.
2.スレッド切替先上コードの追加
observable.subscribeOn(Schedulers.io())// subscribe , ( , )
.observeOn(AndroidSchedulers.mainThread())// Observer , 。( )
.subscribe(observer);
ログは次のように印刷されます.
I/RxJava2: onSubscribe, , : currentThread = main
I/RxJava2: ObservableEmitter, : currentThread = RxCachedThreadScheduler-1
I/RxJava2: onNext, : currentThread = main
I/RxJava2: onComplete, : currentThread = main
次のことがわかります.
2.1スレッド切替の原理を分析し、まずsubscribeOnメソッドを見て、ObservableSubscribeOnを返します.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// this observable,scheduler Schedulers.io
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
ObservableSubscribeOnを見続けます.AbsstractObservableWithUpstreamを継承しています.
public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
super(source);// 1. observable
this.scheduler = scheduler;// 2.
}
@Override
public void subscribeActual(final Observer super T> s) {
// 3. Observer
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
// 4. observer onSubscribe
s.onSubscribe(parent);
// 5.SubscribeTask runnable,scheduler runnable, Disposable Disposable
// ,run scheduler
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer super T> actual;
final AtomicReference s;
SubscribeOnObserver(Observer super T> actual) {
this.actual = actual;// observer
this.s = new AtomicReference();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);// SubscribeOnObserver, observer onNext
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
...
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver parent;
SubscribeTask(SubscribeOnObserver parent) {
this.parent = parent;// observer
}
@Override
public void run() {
// 6. scheduler observer observable ,
// , observer SubscribeOnObserver,
// onSubscribe、onNext SubscribeOnObserver , observer
source.subscribe(parent);
}
}
}
ここでは大まかな流れをまとめることができます
SchedulerのscheduleDirectメソッド
public abstract class Scheduler {
...
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {// SubscribeTask
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// IoScheduler, worker EventLoopWorker
final Worker w = createWorker();
// hook , decoratedRun == run
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// DisposeTask Runnable, Disposable
DisposeTask task = new DisposeTask(decoratedRun, w);
// ThreadWorker scheduleActual ,
w.schedule(task, delay, unit);
return task;
}
@NonNull
public abstract Worker createWorker();// ,
...
}
AbstractObservableWithUpstream
abstract class AbstractObservableWithUpstream extends Observable implements HasUpstreamObservableSource {
/** The source consumable Observable. */
protected final ObservableSource source;// Observable ObservableSource , Observable
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource source) {
this.source = source;// ,ObservableSubscribeOn
}
@Override
public final ObservableSource source() {
return source;
}
}
AbsstractObservableWithUpstreamはObservableを継承し、上流のObservableをsourceで保存します.
2.2 observeOnメソッドを見る
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}
ここまで,フローは
subscribeOn
とそれほど差がなく,いずれも装飾者モードを採用し,Observable
とObserver
の装飾に対してそれぞれ中間の被観察者と観察者を生成し,スレッド変換を実現していると推測できる.ObservableObserveOn
を見続けます.これもObservable
の装飾で、AbstractObservableWithUpstream
も受け継がれています.public final class ObservableObserveOn extends AbstractObservableWithUpstream {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);// 1. observable
this.scheduler = scheduler;// 2.
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 3. HandlerScheduler, HandlerWorker
Scheduler.Worker w = scheduler.createWorker();
// 4. observer, observable subscribe
// observable subscribeActual, ObserveOnObserver onSubscribe、onNext
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
}
}
}
ObserveOnObserverの静的内部クラスObserveOnObserver,上流observableはsubscribeActualメソッドを実行し,ObserveOnObserverのonSubscribe,onNextなどのメソッドを呼び出す.
static final class ObserveOnObserver extends BasicIntQueueDisposable
implements Observer, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;// observer
this.worker = worker;//
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {// true
@SuppressWarnings("unchecked")
QueueDisposable qd = (QueueDisposable) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {//
sourceMode = m;
queue = qd;
done = true;// true, onxx , return
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);// observer onSubscribe
return;
}
}
queue = new SpscLinkedArrayQueue(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {//
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();//
}
@Override
public void onError(Throwable t) {//
if (done) {//
RxJavaPlugins.onError(t);
return;
}
error = t;//
done = true;// true
schedule();//
}
@Override
public void onComplete() {
if (done) {//
return;
}
done = true;// true
schedule();//
}
...
}
最終的にscheduleメソッドが呼び出され、スレッドの切り替えが実現されます.
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
ここでは、ObserveOnObserverがRunnableインタフェースを実現したため、runメソッドを参照してください.
@Override
public void run() {
if (outputFused) {// false
drainFused();
} else {
drainNormal();
}
}
drainNormalの方法を見て
void drainNormal() {
int missed = 1;
final SimpleQueue q = queue;
final Observer super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();//
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);//
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {//
return;
}
if (empty) {//
break;
}
a.onNext(v);// observer
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//
boolean checkTerminated(boolean d, boolean empty, Observer super T> a) {
if (cancelled) {//
queue.clear();//
return true;
}
if (d) {// done
Throwable e = error;
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
まとめ