浅析Rxjava(一)

3862 ワード

使用するライブラリは
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
の異なるバージョンで、ソースコードが異なる可能性があります.
1.解析する例を見てみる
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1111");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.v("next", s);
            }
        });

簡単な例は、印刷文字列1111である.createの方法を見てみましょう.
    public final static <T> Observable<T> create(Observable.OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

    protected Observable(Observable.OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

これonCreate(f)メソッドは、中から直接fを返して、ソースコードを見ることができます.ここではくっつきません.だからcreateは、OnSubscribeオブジェクトインスタンスをObservableに格納するonSubscribeです.
次にsubscribe()法を見てみましょう.
 public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        return subscribe(new Subscriber<T>() {

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onNext(T t) {
                observer.onNext(t);
            }

        });
    }
observerオブジェクトをSubscriberオブジェクトに変換することを発見し、subscribe()メソッドを見てみましょう.
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        subscriber.onStart();

        try {
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (OnErrorNotImplementedException e2) {
                throw e2;
            } catch (Throwable e2) {
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                hook.onSubscribeError(r);
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }
の間にいくつかのコードが削除されました.コアコードを見てみましょう.開始するとsubscriberが実行されます.onStart()メソッドは,subscribeメソッドにおいて新しいnew Subscriberである.
次の論理は少し迂回しているかもしれません.
そしてonSubscribeStart(observable, observable.onSubscribe).call(subscriber)という方法のhook.onSubscribeStart(observable,observable.onSubscribe)この方法では、observableを直接onSubscribeは戻ってきます.つまり、この方法はobservableです.onSubscribe.call(subscriber)、このonSubscribeはcreate()の時に伝わってきたので、この方法は上記の例のsubscriberに行きます.onNext("1111")では、このsubscriberの例は、subscribe()メソッドでnew subscribe()が出たばかりで、observerに実行する.onNext(t)ここでobserverの例は例中のsubscribe(new Observer()であり、このnew observer()であり、その後Log.v(「next」,s)ここで、プロセス全体が終わり、logログが出てきました.
少し回るかもしれませんが、debugモードでキーでポイントを折り、1つずつ観察することができます.