RxJava 2レコード

19384 ワード

1.まず基本的な呼び出しをする
//         
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(ObservableEmitter e) throws Exception {
        Log.i(TAG, "ObservableEmitter,       :" + " currentThread = " + Thread.currentThread().getName());
        e.onNext(1);//   Observer onNext
        e.onComplete();//   Observer onComplete
    }
});
//        
Observer observer = new Observer() {
    @Override
    public void onSubscribe(Disposable d) {
        // onSubscribe   subscribe      
        // Disposable       , Observer        
        Log.i(TAG, "onSubscribe,       ,           : " + " currentThread = " + Thread.currentThread().getName());
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext,       : " + " currentThread = " + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError,       : " + " currentThread = " + Thread.currentThread().getName());
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete,      : " + " currentThread = " + Thread.currentThread().getName());
    }
};
//   
observable.subscribe(observer);

ログは次のように印刷されます.
I/RxJava2: onSubscribe,       ,           :  currentThread = main
I/RxJava2: ObservableEmitter,       : currentThread = main
I/RxJava2: onNext,       :  currentThread = main
I/RxJava2: onComplete,      :  currentThread = main

次のことがわかります.
  • 下流観察者onSubscribe法が最初に実行される.
  • 、次いで上流の被観察者のsubscribe法が実行される.
  • 最後に下流観察者のonNext、onCompleteが実行される.

  • この過程を分析してcreateはObservableオブジェクト、つまりObservableCreateを作成しました
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static  Observable create(ObservableOnSubscribe source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate(source));//     ,    ObservableCreate
    }
    

    subscribeを呼び出すとObservableCreateのsubscribeメソッドが呼び出され、ObservableCreateはObservableを継承し、親のsubscribeメソッドを呼び出すことに相当します.Observableのsubscribeメソッドを見てみましょう
    public abstract class Observable implements ObservableSource {   
        ...
        
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);//      subscribeActual      
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
        protected abstract void subscribeActual(Observer super T> observer);//     ,    
        ...
    }
    

    こうしてObservableCreateという実装クラスに戻り,そのsubscribeActual実装方法に重点を置いた.
    public final class ObservableCreate extends Observable {
        final ObservableOnSubscribe source;
    
        public ObservableCreate(ObservableOnSubscribe source) {
            this.source = source;//          create        ObservableOnSubscribe
        }
    
        @Override
        protected void subscribeActual(Observer super T> observer) {//      
            //    CreateEmitter,         ,    ObservableEmitter Disposable  ,            
            CreateEmitter parent = new CreateEmitter(observer);
            observer.onSubscribe(parent);//             onSubscribe  ,       Observer onSubscribe    
    
            try {
                source.subscribe(parent);//      ObservableOnSubscribe subscribe  ,      
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
        static final class CreateEmitter extends AtomicReference
        implements ObservableEmitter, Disposable {
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer super T> observer;
    
            CreateEmitter(Observer super T> observer) {//             
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {//      e.onNext ,     
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {//       
                    observer.onNext(t);//    observer onNext  
                }
            }
    
            @Override
            public void onError(Throwable t) {//          
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);//   observer onError  
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {//      e.onComplete ,     
                if (!isDisposed()) {
                    try {
                        observer.onComplete();//   observer onComplete  
                    } finally {
                        dispose();
                    }
                }
            }
            ...
        }
        ...
    }
    

    上に、最も基本的な上流から下流までの中間の呼び出しプロセスを示した.
    2.スレッド切替先上コードの追加
    observable.subscribeOn(Schedulers.io())//     subscribe      ,         (    ,     )
                    .observeOn(AndroidSchedulers.mainThread())//        Observer       ,            。(      )
                    .subscribe(observer);
    

    ログは次のように印刷されます.
    I/RxJava2: onSubscribe,       ,           :  currentThread = main
    I/RxJava2: ObservableEmitter,       : currentThread =  RxCachedThreadScheduler-1
    I/RxJava2: onNext,       :  currentThread = main
    I/RxJava2: onComplete,      :  currentThread = main
    

    次のことがわかります.
  • 下流観察者onSubscribe法は、subscribe時のスレッドを実行する.
  • 上流の観察者のsubscribe法(すなわち、データを送信する際の方法)は、RxCachedThreadScheduler−1スレッドで実行される.
  • 下流観察者のonNext,onCompleteはいずれもメインスレッドで実行される.

  • 2.1スレッド切替の原理を分析し、まずsubscribeOnメソッドを見て、ObservableSubscribeOnを返します.
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // this  observable,scheduler     Schedulers.io
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
    }
    

    ObservableSubscribeOnを見続けます.AbsstractObservableWithUpstreamを継承しています.
    public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
            super(source);// 1.     observable
            this.scheduler = scheduler;// 2.          
        }
    
        @Override
        public void subscribeActual(final Observer super T> s) {
            // 3.   Observer    
            final SubscribeOnObserver parent = new SubscribeOnObserver(s);
            // 4.      observer onSubscribe  
            s.onSubscribe(parent);
            // 5.SubscribeTask   runnable,scheduler          runnable,       Disposable      Disposable
            //             ,run     scheduler   
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
        static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable {
    
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer super T> actual;
    
            final AtomicReference s;
    
            SubscribeOnObserver(Observer super T> actual) {
                this.actual = actual;//      observer
                this.s = new AtomicReference();
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                DisposableHelper.setOnce(this.s, s);
            }
    
            @Override
            public void onNext(T t) {
                actual.onNext(t);//      SubscribeOnObserver,      observer onNext  
            }
    
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }
    
            @Override
            public void onComplete() {
                actual.onComplete();
            }
            ...
        }
    
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver parent;
    
            SubscribeTask(SubscribeOnObserver parent) {
                this.parent = parent;//     observer     
            }
    
            @Override
            public void run() {
                // 6.       scheduler          observer    observable   ,
                //               ,  observer       SubscribeOnObserver,
                //      onSubscribe、onNext    SubscribeOnObserver ,      observer 
                source.subscribe(parent);
            }
        }
    }
    

    ここでは大まかな流れをまとめることができます
  • subscribeOnを呼び出すと、ObserableをObservableSubscribeOn、ObserverをSubscribeOnObserverに装飾する.最初のsubscribeで購読したスレッドでobserverをコールバックするonSubscribeメソッド;
  • parentを呼び出す.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))はrunnable実装スレッド切替を実行し、schedulerスレッドrunメソッドでsourceを呼び出す.subscribe(parent)、ここのparent SubscribeOnObserverは装飾されたobserverで、最終的にobserverを呼び出す方法です.

  • SchedulerのscheduleDirectメソッド
    public abstract class Scheduler {
        ...
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run) {//        SubscribeTask
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            //        IoScheduler,   worker EventLoopWorker
            final Worker w = createWorker();
            // hook  ,    decoratedRun == run
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            // DisposeTask    Runnable, Disposable
            DisposeTask task = new DisposeTask(decoratedRun, w);
            //        ThreadWorker scheduleActual  ,         
            w.schedule(task, delay, unit);
    
            return task;
        }
    
        @NonNull
        public abstract Worker createWorker();//     ,    
        ...
    }
    

    AbstractObservableWithUpstream
    abstract class AbstractObservableWithUpstream extends Observable implements HasUpstreamObservableSource {
    
        /** The source consumable Observable. */
        protected final ObservableSource source;// Observable ObservableSource   ,       Observable
    
        /**
         * Constructs the ObservableSource with the given consumable.
         * @param source the consumable Observable
         */
        AbstractObservableWithUpstream(ObservableSource source) {
            this.source = source;//     ,ObservableSubscribeOn    
        }
    
        @Override
        public final ObservableSource source() {
            return source;
        }
    }
    

    AbsstractObservableWithUpstreamはObservableを継承し、上流のObservableをsourceで保存します.
    2.2 observeOnメソッドを見る
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
    }
    

    ここまで,フローはsubscribeOnとそれほど差がなく,いずれも装飾者モードを採用し,ObservableObserverの装飾に対してそれぞれ中間の被観察者と観察者を生成し,スレッド変換を実現していると推測できる.ObservableObserveOnを見続けます.これもObservableの装飾で、AbstractObservableWithUpstreamも受け継がれています.
    public final class ObservableObserveOn extends AbstractObservableWithUpstream {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);// 1.        observable
            this.scheduler = scheduler;// 2.          
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                // 3.       HandlerScheduler,  HandlerWorker
                Scheduler.Worker w = scheduler.createWorker();
                // 4.    observer,       observable subscribe  
                //   observable  subscribeActual,  ObserveOnObserver onSubscribe、onNext   
                source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
            }
        }
    }
    

    ObserveOnObserverの静的内部クラスObserveOnObserver,上流observableはsubscribeActualメソッドを実行し,ObserveOnObserverのonSubscribe,onNextなどのメソッドを呼び出す.
    static final class ObserveOnObserver extends BasicIntQueueDisposable
    implements Observer, Runnable {
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
        SimpleQueue queue;
        Disposable s;
        Throwable error;
        volatile boolean done;
        volatile boolean cancelled;
        int sourceMode;
        boolean outputFused;
    
        ObserveOnObserver(Observer super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;//   observer
            this.worker = worker;//      
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {//    true
                    @SuppressWarnings("unchecked")
                    QueueDisposable qd = (QueueDisposable) s;
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    if (m == QueueDisposable.SYNC) {//             
                        sourceMode = m;
                        queue = qd;
                        done = true;//    true,   onxx    ,  return
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);//     observer onSubscribe
                        return;
                    }
                }
                queue = new SpscLinkedArrayQueue(bufferSize);
                actual.onSubscribe(this);
            }
        }
    
        @Override
        public void onNext(T t) {
            if (done) {//       
                return;
            }
    
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();//      
        }
    
        @Override
        public void onError(Throwable t) {//      
            if (done) {//       
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;//     
            done = true;//    true
            schedule();//     
        }
    
        @Override
        public void onComplete() {
            if (done) {//       
                return;
            }
            done = true;//    true
            schedule();//     
        }
        ...
    }
    

    最終的にscheduleメソッドが呼び出され、スレッドの切り替えが実現されます.
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    

    ここでは、ObserveOnObserverがRunnableインタフェースを実現したため、runメソッドを参照してください.
    @Override
    public void run() {
        if (outputFused) {//    false
            drainFused();
        } else {
            drainNormal();
        }
    }
    

    drainNormalの方法を見て
    void drainNormal() {
        int missed = 1;
        final SimpleQueue q = queue;
        final Observer super T> a = actual;
    
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
    
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    v = q.poll();//     
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);//     
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {//         
                    return;
                }
    
                if (empty) {//      
                    break;
                }
    
                a.onNext(v);//        observer
            }
    
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    //        
    boolean checkTerminated(boolean d, boolean empty, Observer super T> a) {
        if (cancelled) {//    
            queue.clear();//     
            return true;
        }
        if (d) {//    done
            Throwable e = error;
            if (delayError) {
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                if (e != null) {
                    queue.clear();
                    a.onError(e);
                    worker.dispose();
                    return true;
                } else
                if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        return false;
    }
    

    まとめ
  • タスクチェーンを作成し、各ステップは対応するObservableオブジェクトを返します.
  • は逆に段階的に購読し、各ステップは対応するObserverを生成して前のステップで生成したObservableを購読する.
  • タスクチェーンを実行し、タスクチェーンを実行した後、各ステップは対応するObserverに通知し、調整タスクチェーンを完了する.