Androidフレームワーク——RxJava非同期開発フレームワーク
111391 ワード
紹介する
RxJavaはReactiveXのJava実装であり,このフレームワークによりデータの非同期取得を容易に行い,データをステップ変換することができる.
RxJava基本使用
導入依存
オブザーバの作成
まず、データが変更されたときに対応するコールバック関数を呼び出す観察者を作成します.
オブザーバの作成
次に、ObservableOnSubscribeタイプのパラメータが必要なオブジェクトを作成します.Observableオブジェクトのsubscribeメソッドが呼び出されると、ObservableOnSubscribe内のsubscribeコールバックメソッドが呼び出され、emitter操作により観察者へのイベントの送信が開始される.
Observableのsubscribeメソッドが呼び出されると、onNext,onNext,onCompleteメソッドが順次送信されます.
関連観察者と被観察者
ここでは変数を保存せずに、被観察者のサブスクリプション関数に直接新規作成し、観察者を追加します.
観察者においてEmitterによって送信されたイベントは、観察者によって順次受信され、コールバックメソッドが呼び出される.
RxJava共通オペレータ
RxJavaは作成オペレータ、変換オペレータ、フィルタオペレータ、組合せオペレータ、エラー処理オペレータ、アシストオペレータ、条件とブールオペレータ、算術集約オペレータ、接続オペレータなど、多くのオペレータを提供しています.以下では、よく使われるオペレータについて説明します.
オペレータの作成
1.intervalはintervalオペレータを使用してタイマとして機能し、一定時間ごとにイベントを送信することができます.
2.range rangeオペレータは、forループの代わりに秩序化された整数シーケンスを送信することができ、最初のパラメータは0未満ではなく、2番目のパラメータは終値であり、左閉右開である.
3.repeatは、指定されたデータを送信するプロセスをN回繰り返すことができる.
この呼び出しは0,1,2回送信され,送信順序は0,1,2,0,1,2である.
[変換](Transform)オペレータ
1.map mapオペレータは、イベントから送信されたデータを他のデータにマッピングすることができます.たとえば、データに追加の情報を追加したり、フィールドを取得したり、パッケージしたりします.匿名の内部クラスで宣言される2つのタイプは、処理前と処理後のデータ型です.
事件のデータは「人」から「いい人」に変わった.2.flatMapというオペレータは、送信配列イベントから個々のイベントに展開されるという意味で、具体的には分からない.
続きを待つ
[フィルタ](Filter)オペレータ
1.filter filterオペレータは、通過したイベントのみが後の観察者に受信されるイベントをフィルタできます.
この例では,1,2イベントともフィルタを通過せず,3,4,5を受信したメッセージのみを出力する.2.elementAt
指定した場所のデータを返します.ここでaccept関数は3のイベントしか受信しません.3.distinctというオペレータは、イベントデータが送信された場合、再送信されません.
重さを除いた結果は1,2,3,4,5であった.4.skip、take skipオペレータは前のn個のイベントをスキップすることができ、takeオペレータは前のn個のイベントのみを取得する.
この例はまず重み付けを行い,この場合は1,2,3,4,5,skipで前の2つをスキップして3,4,5になり,takeで前の2つを取り,最終結果は3,4を出力する.
オペレータが多すぎて無視
RxJavaのスレッド制御
スレッドを指定しない場合、Schedulerはsubscribeメソッドを呼び出すスレッドでデフォルトで実行されます.subscribeOnまたはObserveOnメソッドを呼び出すことで、Schedulerオブジェクトに転送され、操作実行のスレッドに影響を与えることができます.Schedulerのタイプは次のとおりです.
1. Schedulers.immediate():現在のスレッドで直接2.Schedulers.新Thread():常に新しいスレッド実行を有効にする.Schedulers.io():ファイルの読み書き,データベース,ネットワークインタラクションなどの操作に用いる,newThread()より効率が高い.Schedulers.computation():使用するSchedulerを計算し、スレッドプールは固定スレッドプールであり、大きさはcpuコア数5である.Schedulers.trampoline():現在のスレッドがエンキューされ、すぐに実行されず、キュー内のタスク6を順番に実行します.AndroidSchedulers.mainThread():RxAndroidライブラリで提供され、メインスレッドで実行されます.
RxJavaソース解析
RxJavaのソースコードを直接分析し、サブスクリプション、変換、スレッド切り替えの3つの部分に分けます.
RxJavaの購読プロセス
Observableの作成から見て、まずcreateメソッドで何をしたかを見てみましょう.
ここではsourceオブジェクトからObservableCreateを作成し、その内容を見てみましょう.
コンストラクション関数にsourceオブジェクトが格納されているだけであることがわかります.次に、実際にObservableオブジェクトを返すonAssemblyメソッドを見てみましょう.
createメソッドが作成されると、Observableオブジェクトが返されます.実際のサブタイプはObservableCreateです.リスニングを開始すると、このObservableオブジェクトのsubscribeメソッドが呼び出され、実際の操作を確認します.
ここでsubscribeActualメソッドを呼び出し、ObservableCreateクラスで見てみましょう.
購読プロセスはここまでです.次に、イベントの送信プロセスを見てみましょう.
RxJavaのイベント送信プロセス
EmitterがonNextメソッドを呼び出してイベントを送信するか、上の例を使用するか、emitterの実際のオブジェクトタイプはObservableCreateのsubscribeActualで作成されるCreateEmitterです.onNextプロセスを見てみましょう.
ここではobserverのonNextメソッドを直接呼び出し,subscribeメソッドを呼び出したときに渡される内部クラスである.ここでは簡単に見えるが,emitterのonNextメソッドの呼び出しは観察者の呼び出しを直接呼び出し,次にデータの変換過程を見る.
RxJavaの変換プロセス
最も一般的なデータマッピングプロセスはmapオペレータであり、その内容を見てみましょう.
createメソッドの構造とはあまり違わないように見えますが、実は論理も同じで、ここで返すとObservableMapタイプのオブジェクトが返されます.mapを呼び出してからsubscribeメソッドを呼び出すと、ObservableMapのsubscribeActualメソッドが呼び出されます.その内容を見てみましょう.
中は1行で、sourceはObservableMapの作成時に渡された前のObservableオブジェクト、つまりObservableCreateオブジェクトで、ここでそのsubscribeメソッドを呼び出して、パッケージのMapObserverオブジェクトに転送します.
次のプロセスはemitterまで前と同じです.onNextメソッドが呼び出されます.
MapObserverクラスのonNextメソッドが何をしているかを見てみましょう.
ここでactualオブジェクトはパッケージングされたObserverであり,mapperはカスタム変換方法である.onNextメソッドでは、カスタム変換メソッドを呼び出してデータ型変換を行い、パッケージされたobserverのonNextを呼び出します.
mapはObserverのパッケージによってイベントデータの変換プロセスを処理し,パッケージされたobserverを呼び出すと,過去に変換されたデータが伝達されることが分かる.
RxJavaのスレッド切り替えプロセス
もう1つ重要なのは、スレッド切り替えのプロセスです.まず、サンプルコードを使用します.
通常のイベント購読プロセスにsubscribeOn(Schedulers.newThread()が追加され、何をしたかを見てみましょう.
まず、作成されたSubscribeTaskオブジェクトを見てみましょう.
このTaskはRunnableを継承し、スレッドに渡して実行するか、実行するか、sourceを呼び出すかを見ることができる.subscribeメソッド.
schedulerを分析しています.scheduleDirectメソッドの前に、外部でSchedulersを通過する方法を見てみましょう.新Thread()で作成されたschedulerオブジェクトは何ですか.
大量のクラスをぐるぐる回ると、実際に作成されたSchedulerはNewThreadSchedulerタイプのオブジェクトであることがわかります.次にscheduleDirect操作を見てみましょう.
SubscribeOnの大まかな流れは:1.subscribeOnメソッドを呼び出すと、Scheduleのサブクラスオブジェクトが入力されます.2.サブスクリプション中にsubscribeActualが呼び出されると、次のsubscribeを呼び出す操作が1つのRunnableに格納されます.3.次に、ScheduleによってWorkerを作成します.このWorkerオブジェクトにはスレッドプールが維持され、転送されたRunnableがスレッドプールを介して呼び出されます.4.Observableのsubscribeメソッドはターゲットスレッドで呼び出され、上流送信イベントの操作はターゲットスレッドで実行され、上流実行スレッドを切り替える目的を果たす.
次にObserveOnが何をしたか見てみましょう.
subscribeの場合、ObservableObserveOnのsubscribeActualメソッドでは、subscribeが通常どおり呼び出されているのが見えますが、非TrampolineSchedulerが設定されている場合に観察者がObserveOnObserverをアウトソーシングしているだけです.サブスクリプションが完了すると、emitterのonNext呼び出しを待っています.ObserveOnObserverのonNextメソッドが呼び出されたときにどのような操作をしたかを直接見てみましょう.
ObserveOn実装プロセスは、1.購読時に設定したSchedulerがスレッドを切り替える必要があるかどうかを判断し、もしそうであればObserverをパッケージします.2.イベント転送時にscheduleメソッドを呼び出し、自身をWorkerに入れて実行します.3.自身のrunメソッドはターゲットスレッドで実行され、drainNormalメソッドを呼び出し、一連の判断を行った後に下流のonNextメソッドを呼び出し、下流スレッド切替の効果を奏する.
の最後の部分
RxJavaについてはここまで分析しましたが、ざっと見てみると、中にはまだ細部が分析されていないものがたくさんあります.興味のある学生は自分で深く研究することができます.
RxJavaはReactiveXのJava実装であり,このフレームワークによりデータの非同期取得を容易に行い,データをステップ変換することができる.
RxJava基本使用
導入依存
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.3'
オブザーバの作成
まず、データが変更されたときに対応するコールバック関数を呼び出す観察者を作成します.
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
// Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
オブザーバの作成
次に、ObservableOnSubscribeタイプのパラメータが必要なオブジェクトを作成します.Observableオブジェクトのsubscribeメソッドが呼び出されると、ObservableOnSubscribe内のsubscribeコールバックメソッドが呼び出され、emitter操作により観察者へのイベントの送信が開始される.
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext(" ");
emitter.onNext(" ");
emitter.onComplete();
}
});
// ,
Observable<String> observable2 = Observable.just(" "," ");
Observableのsubscribeメソッドが呼び出されると、onNext,onNext,onCompleteメソッドが順次送信されます.
関連観察者と被観察者
ここでは変数を保存せずに、被観察者のサブスクリプション関数に直接新規作成し、観察者を追加します.
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext(" ");
emitter.onNext(" ");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(" :"+s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println(" ");
}
});
観察者においてEmitterによって送信されたイベントは、観察者によって順次受信され、コールバックメソッドが呼び出される.
RxJava共通オペレータ
RxJavaは作成オペレータ、変換オペレータ、フィルタオペレータ、組合せオペレータ、エラー処理オペレータ、アシストオペレータ、条件とブールオペレータ、算術集約オペレータ、接続オペレータなど、多くのオペレータを提供しています.以下では、よく使われるオペレータについて説明します.
オペレータの作成
1.intervalはintervalオペレータを使用してタイマとして機能し、一定時間ごとにイベントを送信することができます.
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong+"");
}
});
2.range rangeオペレータは、forループの代わりに秩序化された整数シーケンスを送信することができ、最初のパラメータは0未満ではなく、2番目のパラメータは終値であり、左閉右開である.
Observable.range(0,5).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
3.repeatは、指定されたデータを送信するプロセスをN回繰り返すことができる.
Observable.range(0,3).repeat(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
この呼び出しは0,1,2回送信され,送信順序は0,1,2,0,1,2である.
[変換](Transform)オペレータ
1.map mapオペレータは、イベントから送信されたデータを他のデータにマッピングすることができます.たとえば、データに追加の情報を追加したり、フィールドを取得したり、パッケージしたりします.匿名の内部クラスで宣言される2つのタイプは、処理前と処理後のデータ型です.
Observable.just(" ").map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return " "+s;
}
});
事件のデータは「人」から「いい人」に変わった.2.flatMapというオペレータは、送信配列イベントから個々のイベントに展開されるという意味で、具体的には分からない.
Observable.fromArray(1,2,3).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer+" ");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
続きを待つ
[フィルタ](Filter)オペレータ
1.filter filterオペレータは、通過したイベントのみが後の観察者に受信されるイベントをフィルタできます.
Observable.just(1,2,3,4,5).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(" "+integer);
}
});
この例では,1,2イベントともフィルタを通過せず,3,4,5を受信したメッセージのみを出力する.2.elementAt
Observable.just(1,2,3,4).elementAt(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
指定した場所のデータを返します.ここでaccept関数は3のイベントしか受信しません.3.distinctというオペレータは、イベントデータが送信された場合、再送信されません.
Observable.just(1,2,2,3,4,3,5).distinct().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer+"");
}
});
重さを除いた結果は1,2,3,4,5であった.4.skip、take skipオペレータは前のn個のイベントをスキップすることができ、takeオペレータは前のn個のイベントのみを取得する.
Observable.just(1,2,2,3,4,3,5).distinct().skip(2).take(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer+"");
}
});
この例はまず重み付けを行い,この場合は1,2,3,4,5,skipで前の2つをスキップして3,4,5になり,takeで前の2つを取り,最終結果は3,4を出力する.
オペレータが多すぎて無視
RxJavaのスレッド制御
スレッドを指定しない場合、Schedulerはsubscribeメソッドを呼び出すスレッドでデフォルトで実行されます.subscribeOnまたはObserveOnメソッドを呼び出すことで、Schedulerオブジェクトに転送され、操作実行のスレッドに影響を与えることができます.Schedulerのタイプは次のとおりです.
1. Schedulers.immediate():現在のスレッドで直接2.Schedulers.新Thread():常に新しいスレッド実行を有効にする.Schedulers.io():ファイルの読み書き,データベース,ネットワークインタラクションなどの操作に用いる,newThread()より効率が高い.Schedulers.computation():使用するSchedulerを計算し、スレッドプールは固定スレッドプールであり、大きさはcpuコア数5である.Schedulers.trampoline():現在のスレッドがエンキューされ、すぐに実行されず、キュー内のタスク6を順番に実行します.AndroidSchedulers.mainThread():RxAndroidライブラリで提供され、メインスレッドで実行されます.
RxJavaソース解析
RxJavaのソースコードを直接分析し、サブスクリプション、変換、スレッド切り替えの3つの部分に分けます.
RxJavaの購読プロセス
Observableの作成から見て、まずcreateメソッドで何をしたかを見てみましょう.
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ここではsourceオブジェクトからObservableCreateを作成し、その内容を見てみましょう.
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
// source
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
}
コンストラクション関数にsourceオブジェクトが格納されているだけであることがわかります.次に、実際にObservableオブジェクトを返すonAssemblyメソッドを見てみましょう.
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
//
return source;
}
createメソッドが作成されると、Observableオブジェクトが返されます.実際のサブタイプはObservableCreateです.リスニングを開始すると、このObservableオブジェクトのsubscribeメソッドが呼び出され、実際の操作を確認します.
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//this Observable ,observer ,
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Observable
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
//
return observer;
}
ここでsubscribeActualメソッドを呼び出し、ObservableCreateクラスで見てみましょう.
@Override
protected void subscribeActual(Observer<? super T> observer) {
// CreateEmitter , ObservableEmitter Disposable
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// ,
observer.onSubscribe(parent);
try {
// create , emitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
購読プロセスはここまでです.次に、イベントの送信プロセスを見てみましょう.
RxJavaのイベント送信プロセス
EmitterがonNextメソッドを呼び出してイベントを送信するか、上の例を使用するか、emitterの実際のオブジェクトタイプはObservableCreateのsubscribeActualで作成されるCreateEmitterです.onNextプロセスを見てみましょう.
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// onNext
observer.onNext(t);
}
}
ここではobserverのonNextメソッドを直接呼び出し,subscribeメソッドを呼び出したときに渡される内部クラスである.ここでは簡単に見えるが,emitterのonNextメソッドの呼び出しは観察者の呼び出しを直接呼び出し,次にデータの変換過程を見る.
RxJavaの変換プロセス
最も一般的なデータマッピングプロセスはmapオペレータであり、その内容を見てみましょう.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
createメソッドの構造とはあまり違わないように見えますが、実は論理も同じで、ここで返すとObservableMapタイプのオブジェクトが返されます.mapを呼び出してからsubscribeメソッドを呼び出すと、ObservableMapのsubscribeActualメソッドが呼び出されます.その内容を見てみましょう.
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
中は1行で、sourceはObservableMapの作成時に渡された前のObservableオブジェクト、つまりObservableCreateオブジェクトで、ここでそのsubscribeメソッドを呼び出して、パッケージのMapObserverオブジェクトに転送します.
次のプロセスはemitterまで前と同じです.onNextメソッドが呼び出されます.
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// observer subscribe , observer MapObserver , onNext
observer.onNext(t);
}
}
MapObserverクラスのonNextメソッドが何をしているかを見てみましょう.
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//map
U v;
try {
//
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
ここでactualオブジェクトはパッケージングされたObserverであり,mapperはカスタム変換方法である.onNextメソッドでは、カスタム変換メソッドを呼び出してデータ型変換を行い、パッケージされたobserverのonNextを呼び出します.
mapはObserverのパッケージによってイベントデータの変換プロセスを処理し,パッケージされたobserverを呼び出すと,過去に変換されたデータが伝達されることが分かる.
RxJavaのスレッド切り替えプロセス
もう1つ重要なのは、スレッド切り替えのプロセスです.まず、サンプルコードを使用します.
Observable.just(1).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("prozac",Thread.currentThread().getName());
}
});
通常のイベント購読プロセスにsubscribeOn(Schedulers.newThread()が追加され、何をしたかを見てみましょう.
// , subscribe ObservableSubscribeOn subscribeActual
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
// Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
// subscribe,
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
まず、作成されたSubscribeTaskオブジェクトを見てみましょう.
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//SubscribeTask ,
source.subscribe(parent);
}
}
このTaskはRunnableを継承し、スレッドに渡して実行するか、実行するか、sourceを呼び出すかを見ることができる.subscribeメソッド.
schedulerを分析しています.scheduleDirectメソッドの前に、外部でSchedulersを通過する方法を見てみましょう.新Thread()で作成されたschedulerオブジェクトは何ですか.
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
@NonNull
static final Scheduler NEW_THREAD;
static {
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
大量のクラスをぐるぐる回ると、実際に作成されたSchedulerはNewThreadSchedulerタイプのオブジェクトであることがわかります.次にscheduleDirect操作を見てみましょう.
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// NewThreadScheduler ,
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
// Worker schedule ,
w.schedule(task, delay, unit);
return task;
}
//NewThreadScheduler
Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
//NewThreadWorker
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
//
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
// Task
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// Runnable ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// ScheduledRunnable
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
SubscribeOnの大まかな流れは:1.subscribeOnメソッドを呼び出すと、Scheduleのサブクラスオブジェクトが入力されます.2.サブスクリプション中にsubscribeActualが呼び出されると、次のsubscribeを呼び出す操作が1つのRunnableに格納されます.3.次に、ScheduleによってWorkerを作成します.このWorkerオブジェクトにはスレッドプールが維持され、転送されたRunnableがスレッドプールを介して呼び出されます.4.Observableのsubscribeメソッドはターゲットスレッドで呼び出され、上流送信イベントの操作はターゲットスレッドで実行され、上流実行スレッドを切り替える目的を果たす.
次にObserveOnが何をしたか見てみましょう.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//
@Override
protected void subscribeActual(Observer<? super T> observer) {
//TrampolineScheduler , subscribe
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// Worker
Scheduler.Worker w = scheduler.createWorker();
// ObserveOnObserver subscribe
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
}
subscribeの場合、ObservableObserveOnのsubscribeActualメソッドでは、subscribeが通常どおり呼び出されているのが見えますが、非TrampolineSchedulerが設定されている場合に観察者がObserveOnObserverをアウトソーシングしているだけです.サブスクリプションが完了すると、emitterのonNext呼び出しを待っています.ObserveOnObserverのonNextメソッドが呼び出されたときにどのような操作をしたかを直接見てみましょう.
@Override
public void onNext(T t) {
if (done) {
return;
}
// ,
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// worker this, Runnable , run
worker.schedule(this);
}
}
@Override
public void run() {
//
if (outputFused) {
drainFused();
} else {
//
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//a , onNext。
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
ObserveOn実装プロセスは、1.購読時に設定したSchedulerがスレッドを切り替える必要があるかどうかを判断し、もしそうであればObserverをパッケージします.2.イベント転送時にscheduleメソッドを呼び出し、自身をWorkerに入れて実行します.3.自身のrunメソッドはターゲットスレッドで実行され、drainNormalメソッドを呼び出し、一連の判断を行った後に下流のonNextメソッドを呼び出し、下流スレッド切替の効果を奏する.
の最後の部分
RxJavaについてはここまで分析しましたが、ざっと見てみると、中にはまだ細部が分析されていないものがたくさんあります.興味のある学生は自分で深く研究することができます.