RxJava 2ソースコード(二)

2446 ワード

Observableのxxxオペレータを読むには
  • ObservableのサブクラスObservableXXX RxJavaPluginsを見つける.onAssembly(new ObservableXXX());
  • ObservableXXXのsubscribeActual(Observer super T>s)関数を表示し、一般的に次の3つのことをします.
  • は一般的にDisposableインタフェースの実装クラスd
  • を作成する.
  • はs.onSubscribe(d)を呼び出す.
  • 具体的なsubscribe実装コード
  • 具体的なsubscribe実装コードに注目すべきいくつかの点
  • disposedの実装
  • observerのonNext,OnComplete,OnErrorはいつ呼び出されますか

  • Observable.justを例に
  • ObservableFromArray
  • が見つかりました
  • ObservableFromArrayのsubscribeActual関数を見ると、主な論理はFromArrayDisposableのrunメソッドの
    public final class ObservableFromArray extends Observable {
        final T[] array;
        public ObservableFromArray(T[] array) {
            this.array = array;
        }
        @Override
        public void subscribeActual(Observer super T> s) {
            FromArrayDisposable d = new FromArrayDisposable(s, array);
    
            s.onSubscribe(d);
    
            if (d.fusionMode) {
                //   fusion mode    ,   QueueFuseable,   false,      
                return;
            }
    
            d.run();
        }
    
  • にあることが分かった.
  • FromArrayDisposableのrun
    static final class FromArrayDisposable extends BasicQueueDisposable {
    
        final Observer super T> actual;
    
        final T[] array;
    
        int index;
    
        boolean fusionMode;
    
        volatile boolean disposed;
    
        FromArrayDisposable(Observer super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }
    
        @Override
        public void dispose() {
            //    boolean     
            disposed = true;
        }
    
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    
        void run() {
            T[] a = array;
            int n = a.length;
    
            for (int i = 0; i < n && !isDisposed(); i++) {
                //     Disposed   array  
                T value = a[i];
                if (value == null) {
                    //   value null error,  for  
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                //  onNext
                actual.onNext(value);
            }
            if (!isDisposed()) {
                //     Disposed, onComplete
                actual.onComplete();
            }
        }
    }
    
  • を表示
  • ですので、Justオペレータは配列内のイベントを順次送信し、nullに遭遇すると中断します.手動dispose状態がずっと変わらない限り
  • PS
    私のgithub:https://github.com/nppp1990/MyTips