RxJavaソースの浅い分析--購読プロセス、mapとFilterオペレータの実現原理

11712 ワード

RxJavaソースの浅い分析--購読プロセス、mapとFilterオペレータの実現原理
RxJavaは非常にポピュラーな観察者モードに基づく応答型プログラミングフレームワークであり,Android開発などの分野で広く応用されている.Java開発者として、RxJavaの実現原理を理解し、RxJavaの設計思想を把握する必要があります.そうすれば、RxJavaを使って優れたコードを書くことができます.
購読プロセス
RxJavaを使用するには、まずオブザーバObservableオブジェクトを作成します.通常、Observableのcreateメソッドを使用して、次のようにObservableオブジェクトを作成します.
    Observable observable = Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            e.onNext(new Integer(1));
            e.onNext(new Integer(2));
            e.onComplete();
        }
    });    

createメソッドのソースコードを見てみましょう.
    public static  Observable create(ObservableOnSubscribe source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate(source));
    }

createメソッドでは、まず入力されたパラメータObservableOnSubscribeオブジェクトを空チェックし、次にnewはObservableCreateオブジェクトを作成し、このオブジェクトをパラメータとしてRxJavaPlugins.onAssemblyメソッドに入力します.まずonAssemblyメソッドのソースコードを見てみましょう.
    public static  Observable onAssembly(@NonNull Observable source) {
        Function super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

onAssemblyメソッドでは、onObservableAssemblyオブジェクトがnullであるかどうかを最初に判断し、nullでない場合はapplyメソッドを呼び出して結果を返し、nullである場合は直接入力したパラメータを返します.デフォルトではonObservableAssemblyはnull、つまりデフォルトではonAssemblyメソッドは実際には何もせず、直接入力したObservableオブジェクトを返します.
Observableのcreateメソッドに戻り、new ObservableCreateが何をしたのかを見てみましょう.ObservableCreateクラスの構築方法は次のとおりです.
    public final class ObservableCreate extends Observable {
        final ObservableOnSubscribe source;
    
        public ObservableCreate(ObservableOnSubscribe source) {
            this.source = source;
        }
        
        ...
    }    

ObservableCreateクラスの構築方法は簡単で、入力されたObservableOnSubscribeオブジェクトを直接自分のグローバル変数sourceに保存し、この変数はfinalによって修飾されます.ObservableCreateクラスを解析することにより,ObservableCreateはObservableから継承され,すなわちObservableCreate自体も被観察者オブジェクトであることが分かった.
上記の解析から、Observable.createメソッドは、ObservableCreateオブジェクトを作成し、ObservableOnSubscribeをこのObservableCreateに保存することであることがわかります.
Observableを作成すると、対応するオブザーバーを関連付けるためにsubscribeメソッドを呼び出すことができます.
    observable.subscribe(new Observer() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("RxJava", "onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.e("RxJava", "onSubscribe" + integer.toString());
        }
    
        @Override
        public void onError(Throwable e) {
            Log.e("RxJava", "onError");
        }
    
        @Override
        public void onComplete() {
            Log.e("RxJava", "onComplete");
        }
    });

subscribeメソッドのソースコードを見てみましょう.
    public final void subscribe(Observer super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, 
                observer);//    RxJavaPlugins.onAssembly  ,        observer  
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
         ...
        }
    }

subscribeメソッドでは、observerオブジェクトがnullであるかどうかを確認し、RxJavaPlugins.onSubscribeメソッドを呼び出します.このメソッドは、デフォルトでは何もせずにobserverオブジェクトを直接返し、subscribeActualメソッドを呼び出します.上記の分析では、createメソッドによって生成されるObservableは、実際にはObservableCreateオブジェクトであるため、ObservableCreateクラスのsubscribeActualメソッドを見てみましょう.
    protected void subscribeActual(Observer super T> observer) {
        CreateEmitter parent = new CreateEmitter(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActualメソッドでは、氏がEmitterイベント送信機になり、observerのonSubscribeメソッドが直接呼び出されるため、onSubscribeメソッドはイベント送信前に実行されることがわかります.そしてsource.subscribeメソッドを呼び出し、先に述べたように、このsourceオブジェクトはcreateメソッド呼び出し時に渡されたObservableOnSubscribeオブジェクトであり、このときObservableOnSubscribeのsubscribeメソッドが呼び出され、様々なイベントも成功裏に送信されます.ソースコードから、subscribeメソッドを呼び出すときに異常が発生すると、EmitterにonErrorイベントが直接送信されることがわかります.
これで最も簡単な購読プロセスが完了します.
イベントの送信
上記ではRxjavaのサブスクリプションフローを簡単に解析し,可視イベントは最終的にsubscribeActual法でsource.subscribe(parent)を呼び出して送信される.subscribeActual法でCreateEmitterオブジェクトを生成し,このEmmitterをonSubscribe法に導入した.本明細書の最初の例から分かるように、様々なイベントは実際にこのEmmitterによって送信され、まずCreateEmmitterオブジェクトの構造方法を見てみましょう.
    CreateEmitter(Observer super T> observer) {
        this.observer = observer;
    }

CreateEmmitterにObserverオブジェクトがあることがわかります.CreateEmmitterのonNext、onComplete、onErrorメソッドを使用して、さまざまなイベントを送信できます.ここでは、onNextのソースコードを例に、EmmitterがどのようにObserverにイベントを送信したのかを見てみましょう.
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

CreateEmmitterのonNextメソッドは非常に簡単で、observer.onNextメソッドを直接呼び出すことであり、onCompleteとonErrorの原理もonNextメソッドと大きく異なりますが、onCompleteとonErrorメソッドではdispose()メソッドも呼び出され、onCompleteとonErrorメソッドを呼び出すとobserverはイベントを受信できません.
map操作
map操作によりデータ型の変換を行うことができ、mapの基本的な使用方法は以下の通りである.
    Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            e.onNext(new Integer(0));
            e.onComplete();
        }
    }).map(new Function() {
        @Override
        public String apply(Integer integer) throws Exception {
            return integer.toString();
        }
    }).subscribe(new Observer() {
        public void onSubscribe(Disposable d) { Log.e("RxJava", "onSubscribe");}
        public void onNext(String s) { Log.e("RxJava", "onNext" + s);}
        public void onError(Throwable e) {Log.e("RxJava", "onSubscribe");}
        public void onComplete() {Log.e("RxJava", "onSubscribe");}
    });

mapメソッドのソースコードは次のとおりです.
    public final  Observable map(Function super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
    }

mapメソッドはcreateメソッドに似ていることがわかりますが、ここではObservableMapオブジェクトを返し、元のObservableオブジェクトとパラメータmapperをObservableMapの構造メソッドに転送します.ObservableMapの構築方法は以下の通りです.
    public final class ObservableMap extends AbstractObservableWithUpstream {
        final Function super T, ? extends U> function;
    
        public ObservableMap(ObservableSource source, Function super T, ? extends U> function) {
            super(source);//       source           
            this.function = function;
        }
        
        ...
    }    

ObsevableMapの構築方法も、元のObservableオブジェクトとFunctionオブジェクトをそれぞれObservableMapオブジェクトのメンバー変数sourceとfunctionに保存するだけです.
このことから,map法は実際にObservableMapオブジェクトを生成し,元のObservableとパラメータFunctionをこのObservableMapオブジェクトに保存することである.このとき我々がsubscribeメソッドで購読する場合,実際に呼び出されたObservableMapオブジェクトのsubscribeメソッドである.以前,subscribeメソッドの内部でsubscribeActualメソッドが呼び出されたことを解析したが,ObservableMapのsubscribeActualメソッドは以下のようになった.
    public void subscribeActual(Observer super U> t) {
        source.subscribe(new MapObserver(t, function));
    }

subscribeActualメソッドで呼び出されたのは元のObservableのsubscribeメソッドであることがわかるが,subscribeメソッドのパラメータは我々が最初に生成したObserverオブジェクトではなく,Observerを変換し,MapObserverオブジェクトになった.各イベントが発行されると、受信イベントはこのMapObserverオブジェクトとなり、MapObserverオブジェクトはonNextメソッドを書き換え、MapObserverのソースコードは以下の通りである.
    static final class MapObserver extends BasicFuseableObserver {
        final Function super T, ? extends U> mapper;
    
        MapObserver(Observer super U> actual, Function super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
    
        @Override
        public void onNext(T t) {
            ...
            U v;
    
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), 
                    "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }
    
        ...
    }

onNextメソッドでは、入力パラメータTは元のデータ型であり、Uは変換後のデータ型であり、その後mapper.apply(t)が呼び出されてデータ型の変換が実現される.このmapperは、mapメソッドを呼び出すときに入力されるFunctionオブジェクトである.最後に元ObserverのonNextメソッドを直接呼び出し,変換後のデータ型を転送し,最終的にmapプロセスを完了した.
ろ過操作
送信されたイベントをfilterメソッドでフィルタリングすることもできます.filterメソッドの基本的な使用は次のとおりです.
    Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            e.onNext(new Integer(0));
            e.onComplete();
        }
    }).filter(new Predicate() {
        @Override
        public boolean test(Integer s) throws Exception {
            return integer.intValue() > 0 ? true : false;
        }
    }).subscribe(observer);

フィルタメソッドのソースコードを見てみましょう.
    public final Observable filter(Predicate super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter(this, predicate));
    }

filterメソッドのソースコードはcreateメソッド,mapメソッドとよく似ているが,今回の戻りデータはObservableFilterになっている.filterメソッドとcreate、mapメソッドのループがほぼ一致していることがわかります.ObservableFilterのsubscribeActualメソッドを直接見てみましょう.
    public void subscribeActual(Observer super T> s) {
        source.subscribe(new FilterObserver(s, predicate));
    }

ObservableMapクラスのsubscribeActualメソッドと同様に、ObservableFilterクラスのsubscribeActualメソッドも元のObserverのsubscribeメソッドを直接呼び出し、FilterObserverオブジェクトを転送します.前にmap操作を分析したときの方法を参照して、FilterObserverオブジェクトのonNextメソッドを直接見ます.
    public void onNext(T t) {
        if (sourceMode == NONE) {
            boolean b;
            try {
                b = filter.test(t);
            } catch (Throwable e) {
                fail(e);
                return;
            }
            if (b) {
                actual.onNext(t);
            }
        } else {
            actual.onNext(null);
        }
    }

FilterObserverのonNextメソッドは簡単で、filter.testメソッドを直接呼び出し、trueを返すとactual.onNextメソッドを呼び出します.そうしないと何もしません.これによりイベントのフィルタリングが実現される.
RxJavaの購読プロセスとmap、filterオペレータのソースコードを分析することによって、私たちはRxJavaの核心思想に対して大まかな理解を持つことができて、RxJavaの中の多くのその他の操作はmap、filterと似ていて、核心思想を掌握すれば理解しにくくありません.本文はただRxJavaの原理に対して1つの简単な说明を行って、本人の能力が限られているため、もし読者が文の中でどんな间违いあるいは不足なところがあることを発见するならば、指摘を助けることができることを望んで、みんなはいっしょに共に进歩します.