Rxjava-基本的な流れ


Rxjava       ,   AsyncTask,               ,        。    ,    。
       Rxjava  ,           Rxjava,       Rxjava        。
簡単な例を見てみます。
//     
Observable observable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber super String> subscriber) {
        Log.e(getClass().getName(), "subscriber call begin");
        subscriber.onNext("value is 1");
        subscriber.onNext("value is 2");
        subscriber.onNext("value is 3");
        subscriber.onCompleted();
    }
});
//    /   
Observer observer = new Observer() {
    @Override
    public void onCompleted() {
        Log.e(getClass().getName(), "observer is completed");
    }
    @Override
    public void onError(Throwable e) {
        Log.e(getClass().getName(), "observer is error");
    }
    @Override
    public void onNext(String s) {
        Log.e(getClass().getName(), "observer next value : " + s);
    }
};
//       
observable.subscribe(observer);
印刷結果:
subscriber call begin
observer next value : value is 1
observer next value : value is 2
observer next value : value is 3
observer is completed
私たちは観察者からソース解析を開始しました。
// Observable 
...
//        ,OnSubscribe     
final OnSubscribe onSubscribe;
//     
protected Observable(OnSubscribe f) {
   this.onSubscribe = f;
}
//       ,     1.2.7       ,      ,            
@Deprecated
public static  Observable create(OnSubscribe f) {
    return new Observable(RxJavaHooks.onCreate(f));
}
//   Action1   
public interface OnSubscribe<T> extends Action1<Subscriber super T>> {
    // cover for generics insanity
}
// Action1  
public interface Action1<T> extends Action {
    void call(T t);
}
...
create方法はnewだけです。Observableの対象が出てきます。コードはRxJavaHook類に言及しています。この種類のonCreate方法は何をしていますか?
// RxJavaHooks 
...
static volatile Func1 onObservableCreate;
static void initCreate() {
    onObservableCreate = new Func1() {
        @Override
        public Observable.OnSubscribe call(Observable.OnSubscribe f) {
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
        }
    };
}
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1 f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}
...
Observable Createのcall方法はObservable.OnSubscribeの例を返して、引き続き見ていきます。
// RxJavaPlugins ,       ,    
public RxJavaObservableExecutionHook getObservableExecutionHook() {
    if (observableExecutionHook.get() == null) {
        ...
        observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
        ...
    }
    return observableExecutionHook.get();
}
RxJavaObservable Execution HookDefaultはRxJavaObservable Execution Hook類を直接継承しています。ただ単例で他のものを作っただけです。だから直接RxJavable Execution Hookという種類を見ます。
// RxJavaObservableExecutionHook
@Deprecated
public  OnSubscribe onCreate(OnSubscribe f) {
    return f;
}
自分に帰るということです。
// Observable 
public static  Observable create(OnSubscribe f) {
    return new Observable(RxJavaHooks.onCreate(f));
}
しばらく見てもいいです。
// Observable 
public static  Observable create(OnSubscribe f) {
    return new Observable(f);
}
これは簡単です。また観察者のコードを見てみましょう。
// Observer 
public interface Observer {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}
実はインターフェースです。ObservableとObserverの購読関係のコードを見続けます。
// Observable 
public final Subscription subscribe(final Observer super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber super T>)observer);
    }
    if (observer == null) {
        throw new NullPointerException("observer is null");
    }
    return subscribe(new ObserverSubscriber(observer));
}
public final Subscription subscribe(Subscriber super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}
SubscriberはObserverの抽象的な実現類です。
//    Subscriber
public abstract class Subscriber<T> implements Observer<T>, Subscription {
    ...
    //         
    public void onStart() {
        // do nothing by default
    }
    ...
}
Subscriberについては、まずこれだけ知っています。私たちが伝えたのはObserverです。subscribeの方法はSubscriberです。ですから、Observer Subscriberを使ってObserverを代理します。
// ObserverSubscriber  Observer
public final class ObserverSubscriber<T> extends Subscriber<T> {
    final Observer super T> observer;
    public ObserverSubscriber(Observer super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }
    @Override
    public void onError(Throwable e) {
        observer.onError(e);
    }
    @Override
    public void onCompleted() {
        observer.onCompleted();
    }
}
最後にObservableを呼び出した静的方法subscribeは購読関係を開始した:
// Observable 
static  Subscription subscribe(Subscriber super T> subscriber, Observable observable) {
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
    //    start
    subscriber.onStart();
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber(subscriber);
    }
    try {
        //      onSubscribe.call(subscriber)
        // onSubscribe     Observable    ,subscriber     /   
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
    }
}
例のObservableを見に来ます。
Observable observable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber super String> subscriber) {
        Log.e(getClass().getName(), "subscriber call begin");
        //        /      
        subscriber.onNext("value is 1");
        subscriber.onNext("value is 2");
        subscriber.onNext("value is 3");
        subscriber.onCompleted();
    }
});
Rxjavaの基本的な流れはここまで分析します。