Rxjava , AsyncTask, , 。 , 。
Rxjava , Rxjava, Rxjava 。
簡単な例を見てみます。
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
Log.e(getClass().getName(), "subscriber call begin");
subscriber.onNext("value is 1");
subscriber.onNext("value is 2");
subscriber.onNext("value is 3");
subscriber.onCompleted();
}
});
Observer observer = new Observer() {
@Override
public void onCompleted() {
Log.e(getClass().getName(), "observer is completed");
}
@Override
public void onError(Throwable e) {
Log.e(getClass().getName(), "observer is error");
}
@Override
public void onNext(String s) {
Log.e(getClass().getName(), "observer next value : " + s);
}
};
observable.subscribe(observer);
印刷結果:
subscriber call begin
observer next value : value is 1
observer next value : value is 2
observer next value : value is 3
observer is completed
私たちは観察者からソース解析を開始しました。
...
final OnSubscribe onSubscribe;
protected Observable(OnSubscribe f) {
this.onSubscribe = f;
}
@Deprecated
public static Observable create(OnSubscribe f) {
return new Observable(RxJavaHooks.onCreate(f));
}
public interface OnSubscribe<T> extends Action1<Subscriber super T>> {
}
public interface Action1<T> extends Action {
void call(T t);
}
...
create方法はnewだけです。Observableの対象が出てきます。コードはRxJavaHook類に言及しています。この種類のonCreate方法は何をしていますか?
// RxJavaHooks
...
static volatile Func1 onObservableCreate;
static void initCreate() {
onObservableCreate = new Func1() {
@Override
public Observable.OnSubscribe call(Observable.OnSubscribe f) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
}
};
}
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1 f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
...
Observable Createのcall方法はObservable.OnSubscribeの例を返して、引き続き見ていきます。
// RxJavaPlugins , ,
public RxJavaObservableExecutionHook getObservableExecutionHook() {
if (observableExecutionHook.get() == null) {
...
observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
...
}
return observableExecutionHook.get();
}
RxJavaObservable Execution HookDefaultはRxJavaObservable Execution Hook類を直接継承しています。ただ単例で他のものを作っただけです。だから直接RxJavable Execution Hookという種類を見ます。
@Deprecated
public OnSubscribe onCreate(OnSubscribe f) {
return f;
}
自分に帰るということです。
public static Observable create(OnSubscribe f) {
return new Observable(RxJavaHooks.onCreate(f));
}
しばらく見てもいいです。
public static Observable create(OnSubscribe f) {
return new Observable(f);
}
これは簡単です。また観察者のコードを見てみましょう。
public interface Observer {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
実はインターフェースです。ObservableとObserverの購読関係のコードを見続けます。
public final Subscription subscribe(final Observer super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber(observer));
}
public final Subscription subscribe(Subscriber super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
SubscriberはObserverの抽象的な実現類です。
// Subscriber
public abstract class Subscriber<T> implements Observer<T>, Subscription {
...
//
public void onStart() {
// do nothing by default
}
...
}
Subscriberについては、まずこれだけ知っています。私たちが伝えたのはObserverです。subscribeの方法はSubscriberです。ですから、Observer Subscriberを使ってObserverを代理します。
public final class ObserverSubscriber<T> extends Subscriber<T> {
final Observer super T> observer;
public ObserverSubscriber(Observer super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
}
最後にObservableを呼び出した静的方法subscribeは購読関係を開始した:
static Subscription subscribe(Subscriber super T> subscriber, Observable observable) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber(subscriber);
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
}
}
例のObservableを見に来ます。
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
Log.e(getClass().getName(), "subscriber call begin")
// /
subscriber.onNext("value is 1")
subscriber.onNext("value is 2")
subscriber.onNext("value is 3")
subscriber.onCompleted()
}
})
Rxjavaの基本的な流れはここまで分析します。