RxJava 2ソースコード(二)
2446 ワード
Observableのxxxオペレータを読むには ObservableのサブクラスObservableXXX RxJavaPluginsを見つける.onAssembly(new ObservableXXX()); ObservableXXXのsubscribeActual(Observer super T>s)関数を表示し、一般的に次の3つのことをします. は一般的にDisposableインタフェースの実装クラスd を作成する.はs.onSubscribe(d)を呼び出す. 具体的なsubscribe実装コード 具体的なsubscribe実装コードに注目すべきいくつかの点 disposedの実装 observerのonNext,OnComplete,OnErrorはいつ呼び出されますか
Observable.justを例に ObservableFromArray が見つかりました ObservableFromArrayのsubscribeActual関数を見ると、主な論理はFromArrayDisposableのrunメソッドの にあることが分かった. FromArrayDisposableのrun を表示ですので、Justオペレータは配列内のイベントを順次送信し、nullに遭遇すると中断します.手動dispose状態がずっと変わらない限り PS
私のgithub:https://github.com/nppp1990/MyTips
Observable.justを例に
public final class ObservableFromArray extends Observable {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer super T> s) {
FromArrayDisposable d = new FromArrayDisposable(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
// fusion mode , QueueFuseable, false,
return;
}
d.run();
}
static final class FromArrayDisposable extends BasicQueueDisposable {
final Observer super T> actual;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
@Override
public void dispose() {
// boolean
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
// Disposed array
T value = a[i];
if (value == null) {
// value null error, for
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
// onNext
actual.onNext(value);
}
if (!isDisposed()) {
// Disposed, onComplete
actual.onComplete();
}
}
}
私のgithub:https://github.com/nppp1990/MyTips