RxJava 1.xソース分析の基本要素

17079 ワード

使用するバージョン:implementation'io.reactivex:rxjava:1.3.8'implementation'io.reactivex:rxandroid:1.2.1'
単純な例
まず簡単な例を見てみましょう.
Subscription subscription =  Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber super String> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("123");
                    subscriber.onNext("345");
                    subscriber.onCompleted();
                }
            }
        }).subscribe(new Observer() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:" + s);
            }
        });

この簡単な例からRxJava 1の基本要素は
  • Observable
  • Observer
  • Subscription
  • OnSubscribe
  • Subscriber

  • Observable
  • 被観察者
  • Observabにより観察可能なシーケンスを作成する
  • subscribeを通じて観察者コードを登録し、長すぎると
  • を貼らない.
    Observerインタフェース
  • 観察者
  • Observableのsubscribeメソッドパラメータとして
  • public interface Observer {
    
        /**
         * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
         * 

    * The {@link Observable} will not call this method if it calls {@link #onError}. */ void onCompleted(); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. *

    * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or * {@link #onCompleted}. * * @param e * the exception encountered by the Observable */ void onError(Throwable e); /** * Provides the Observer with a new item to observe. *

    * The {@link Observable} may call this method 0 or more times. *

    * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or * {@link #onError}. * * @param t * the item emitted by the Observable */ void onNext(T t); }


    Subscriptionインタフェース
  • 購読、被観察者と観察者との関係を記述するための
  • .
  • は、現在のサブスクリプションステータス
  • を購読解除および取得するために使用される.
    public interface Subscription {
    
        /**
         * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
         * was received.
         * 

    * This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before * onCompleted is called). */ void unsubscribe(); /** * Indicates whether this {@code Subscription} is currently unsubscribed. * * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise */ boolean isUnsubscribed(); }


    OnSubscribeインタフェース
  • このインタフェース呼び出しは、購読時に出発する
  • である.
  • はObservableの内部にあり、実際の役割はサブスクライバにデータ
  • を送信することである.
     public interface OnSubscribe extends Action1> {
            // cover for generics insanity
        }
    
    public interface Action1 extends Action {
        void call(T t);
    }
    

    Subscriberクラス
  • ObserverとSubscription
  • を実現
    メソッド解析
    コードの例では、主に2つの方法が見られます.
  • create()メソッド
  • subscribe()メソッド
  • create()メソッド
    まずcreateメソッドを分析し、ソースコードを参照すると、ここでObservableオブジェクトを作成していることがわかります.
    /**
         * Constructs an Observable in an unsafe manner, that is, unsubscription and backpressure handling
         * is the responsibility of the OnSubscribe implementation.
         * @param  the value type emitted
         * @param f the callback to execute for each individual Subscriber that subscribes to the
         *          returned Observable
         * @return the new Observable instance
         * @deprecated 1.2.7 - inherently unsafe, use the other create() methods for basic cases or
         * see {@link #unsafeCreate(OnSubscribe)} for advanced cases (such as custom operators)
         * @see #create(SyncOnSubscribe)
         * @see #create(AsyncOnSubscribe)
         * @see #create(Action1, rx.Emitter.BackpressureMode)
         */
        @Deprecated
        public static  Observable create(OnSubscribe f) {
           //    Observable  ,  onSubscribe       f
            return new Observable(RxJavaHooks.onCreate(f));
        }
    
      /**
         * Creates an Observable with a Function to execute when it is subscribed to.
         * 

    * Note: Use {@link #unsafeCreate(OnSubscribe)} to create an Observable, instead of this constructor, * unless you specifically have a need for inheritance. * * @param f * {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called */ protected Observable(OnSubscribe f) { this.onSubscribe = f; } /** * Hook to call when an Observable is created. * @param the value type * @param onSubscribe the original OnSubscribe logic * @return the original or replacement OnSubscribe instance */ @SuppressWarnings({ "rawtypes", "unchecked" }) public static Observable.OnSubscribe onCreate(Observable.OnSubscribe onSubscribe) { Func1 f = onObservableCreate; //onObservableCreate RxJavaHooks , , 。 onSubscribe if (f != null) { return f.call(onSubscribe); } return onSubscribe; } static void initCreate() { onObservableCreate = new Func1() { @Override public Observable.OnSubscribe call(Observable.OnSubscribe f) { // , f return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f); } }; ... } /** * Invoked during the construction by {@link Observable#unsafeCreate(OnSubscribe)} *

    * This can be used to decorate or replace the onSubscribe function or just perform extra * logging, metrics and other such things and pass through the function. * * @param the value type * @param f * original {@link OnSubscribe} to be executed * @return {@link OnSubscribe} function that can be modified, decorated, replaced or just * returned as a pass through */ @Deprecated public OnSubscribe onCreate(OnSubscribe f) { return f; }


    では、上記のコードに基づいてcreate()メソッドは、Observableオブジェクトを作成し、OnSubscribeオブジェクトを渡すことであることがよくわかります.
    subscribe()メソッド
    ここではオブザーバモードでのサブスクリプションと同様にObserverサブスクリプションObservableの変化をObserverに通知し,OnSubscribeのcallメソッドでObserverの変化を通知する.具体的なプロセスはコードを見る:
    /**
         * Subscribes to an Observable and provides an Observer that implements functions to handle the items the
         * Observable emits and any error or completion notification it issues.
         * 
    *
    Backpressure:
    *
    The operator consumes the source {@code Observable} in an unbounded manner (i.e., no * backpressure is applied to it).
    *
    Scheduler:
    *
    {@code subscribe} does not operate by default on a particular {@link Scheduler}.
    *
    * * @param observer * the Observer that will handle emissions and notifications from the Observable * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before * the Observable has completed * @see ReactiveX operators documentation: Subscribe */ public final Subscription subscribe(final Observer super T> observer) { if (observer instanceof Subscriber) { // , Observer Subscriber return subscribe((Subscriber super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } // Subscriber , subscribe , ObserverSubscriber 。 return subscribe(new ObserverSubscriber(observer)); } /** * Subscribes to an Observable and provides a Subscriber that implements functions to handle the items the * Observable emits and any error or completion notification it issues. *

    * A typical implementation of {@code subscribe} does the following: *

      *
    1. It stores a reference to the Subscriber in a collection object, such as a {@code List} object.
    2. *
    3. It returns a reference to the {@link Subscription} interface. This enables Subscribers to * unsubscribe, that is, to stop receiving items and notifications before the Observable completes, which * also invokes the Subscriber's {@link Subscriber#onCompleted onCompleted} method.
    4. *

    * An {@code Observable} instance is responsible for accepting all subscriptions and notifying all * Subscribers. Unless the documentation for a particular {@code Observable} implementation indicates * otherwise, Subscriber should make no assumptions about the order in which multiple Subscribers will * receive their notifications. *

    * For more information see the * ReactiveX documentation. *

    *
    Backpressure:
    *
    The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure * behavior.
    *
    Scheduler:
    *
    {@code subscribe} does not operate by default on a particular {@link Scheduler}.
    *
    * * @param subscriber * the {@link Subscriber} that will handle emissions and notifications from the Observable * @return a {@link Subscription} reference with which Subscribers that are {@link Observer}s can * unsubscribe from the Observable * @throws IllegalStateException * if {@code subscribe} is unable to obtain an {@code OnSubscribe<>} function * @throws IllegalArgumentException * if the {@link Subscriber} provided as the argument to {@code subscribe} is {@code null} * @throws OnErrorNotImplementedException * if the {@link Subscriber}'s {@code onError} method is null * @throws RuntimeException * if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable} * @see ReactiveX operators documentation: Subscribe */ public final Subscription subscribe(Subscriber super T> subscriber) { return Observable.subscribe(subscriber, this); } // , , Observable , subscribe 。 static Subscription subscribe(Subscriber super T> subscriber, Observable observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("subscriber can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate //!!!!!!! , observable.onSubscribe.call(subscriber) RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaHooks.onError(RxJavaHooks.onObservableError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } } return Subscriptions.unsubscribed(); } } /** * Hook to call before the child subscriber is subscribed to the OnSubscribe action. * @param the value type * @param instance the parent Observable instance * @param onSubscribe the original OnSubscribe action * @return the original or alternative action that will be subscribed to */ @SuppressWarnings({ "rawtypes", "unchecked" }) public static Observable.OnSubscribe onObservableStart(Observable instance, Observable.OnSubscribe onSubscribe) { Func2 f = onObservableStart; if (f != null) { return f.call(instance, onSubscribe); } return onSubscribe; } static void init() { onError = new Action1() { @Override public void call(Throwable e) { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); } }; onObservableStart = new Func2() { @Override public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) { return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2); } }; } @Deprecated public OnSubscribe onSubscribeStart(Observable extends T> observableInstance, final OnSubscribe onSubscribe) { // pass through by default // , onSubScribe。 create() 。 return onSubscribe; }


    ObserverSubscriberクラスのソースコードを見てみましょう.Observerをパッケージして実際に呼び出すか、それともObserverを使用するかです.彼のonNext,onError,onCompletedメソッドの両方が呼び出されたObserverのメソッドであることを確認できます.
    /**
     * Wraps an Observer and forwards the onXXX method calls to it.
     * @param  the value type
     */
    public final class ObserverSubscriber extends Subscriber {
        final Observer super T> observer;
    
        public ObserverSubscriber(Observer super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }
    
        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }
    
        @Override
        public void onCompleted() {
            observer.onCompleted();
        }
    }