RxJava:スレッド切り替え

11881 ワード

下一篇:RxJava:基本購読プロセス
Rxjavaで最も一般的な2つの方法:
  • subscribeOn(Scheduler scheduler)
  • observeOn(Scheduler scheduler)

  • 通常、我々はsubscribeOn(Schedulers.io())メソッドを使用してIOスレッドでの購読を指定---データ処理を行い、observeOn(AndroidSchedulers.mainThread())メソッドはメインスレッドでの観察を指定---UI操作を行う.
    so,Rxjavaはどのようにスレッド切替を行いますか?
    Observable.subscribeOn(Scheduler scheduler)
        public final Observable subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));//1
        }
    

    subscribeOn()メソッドは、Observable.create()と同様に、Observableオブジェクトが返され、その内部には主にObservableSubscribeOn(ObservableSource source, Scheduler scheduler)メソッドが呼び出され、ObservableSubscribeOnオブジェクトが構築されていることがわかります.ObservableSubscribeOnクラスは長くありません.次のようにすべてのコードを貼ります.
    public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {//2
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer super T> s) {
            final SubscribeOnObserver parent = new SubscribeOnObserver(s);
    
            s.onSubscribe(parent);
            //Disposable       ,            
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {//3
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    
        static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable {
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer super T> actual;
    
            final AtomicReference s;
    
            SubscribeOnObserver(Observer super T> actual) {
                this.actual = actual;
                this.s = new AtomicReference();
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                DisposableHelper.setOnce(this.s, s);
            }
    
            @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }
    
            @Override
            public void onComplete() {
                actual.onComplete();
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(s);
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            void setDisposable(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
        }
    }
    

    ObservableSubscribeOnクラス継承関係は次のとおりです.
    ObservableSubscribeOn extends AbstractObservableWithUpstream
    
    bstractObservableWithUpstream extends Observable
    

    ObservableSubscribeOnクラスもObservableであり、名前からも分かるように、SubscribeOnが行われたObservableである.では、彼と私たちが前に述べたObservableCreate類の最大の違いはどこですか.
    Observable実装クラスのキーメソッドを貼り付けます:ObservableCreate.subscribeActual(Observer super T> observer)、比較します:
        protected void subscribeActual(Observer super T> observer) {
            CreateEmitter parent = new CreateEmitter(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
  • ObservableCreate内部にエミッタクラスが定義されている---CreateEmitter,observer.onSubscribe()メソッドはCreateEmitterエミッタオブジェクトに渡される.
  • ObservableSubscribeOn内部にはSubscribeOnObserverオブジェクトが定義されており、subscribeActualメソッドでは、s.onSubscribe()がこのSubscribeOnObserverオブジェクトに渡される.つまり、Observerを再包装し、Runnableに入れて実行します.
    SubscribeOnObserverもObserverで、私たちが伝えたObserveableを包装しました

  • ObservableCreate.subscribeActualで、実行したsource.subscribe(parent)送信機を使用して我々が定義したsubscribe(ObservableEmitter e)メソッドを実行する.
  • ObservableSubscribeOn.subscribeActualで、呼び出されたのはscheduler.scheduleDirect(Ruunable run)で、Runnableが1つ入ってきて、実行source.subscribe(parent).
    送信機によって送信されたデータは、subscribe()によって呼び出され、subscribe()が実際に呼び出されたのは、Observable実装クラスObservableSubscribeOn.subscribeActual()である.


  • 主に重要な論理ステップ:scheduler.scheduleDirect()メソッドを見て、Runnableが指定したスレッドでどのように実行されているかを理解します.
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
    
            return w;
        }
    

    内部的にWorker.schedule()メソッドを呼び出して、私たちが入力したRunnableを実際に実行していることがわかります.createWorker()は、我々が入力したSchedulerの実装クラスによって決定される、すなわち、我々が呼び出したsubscribeOn(Scheduler scheduler)に入力したSchedulerを具体的に実装する抽象的な方法である.上記のコードでは、この参照チェーンを注釈//1//2//3で表記しました.
    私たちはSchedulers.io()を例に、その実現を具体的に見てみましょう.まず見てみましょうSchedulers.io()代表のScheduler:
        public static Scheduler io() {
            return RxJavaPlugins.onIoScheduler(IO);
        }
        static {
        ......
            IO = RxJavaPlugins.initIoScheduler(new Callable() {
                    @Override
                    public Scheduler call() throws Exception {
                        return IoHolder.DEFAULT;
                    }
                });
        ......
        }
        static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
    
    Schedulers.io()は、Schedulerオブジェクトを返します.IoSchedulerクラスでは、createWorker()がEventLoopWorkerオブジェクトを返します.EventLoopWorker.scheduleメソッドは次のとおりです.
            public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
    

    scheduleActualメソッドでは、Runnableを一連のパッケージ化し、最終的にはAndroidのExecutorService実装クラスである-S cheduledExecutorServiceを使用して実行します.
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
    
        Future> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable)sr);
            } else {
                f = executor.schedule((Callable)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    

    このexecutorは次のように構成されています.
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    
        public static ScheduledExecutorService create(ThreadFactory factory) {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
            if (exec instanceof ScheduledThreadPoolExecutor) {
                ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
                POOLS.put(e, exec);
            }
            return exec;
        }
    

    スレッドプールからExecutorServiceを取得し、このExecutorServiceを使用して、私たちが転送したRunnableを実行します.以上をまとめると、Observable.observeOn(Schedulers.io())サブスレッドを切り替える原理は、ObservableOnSubscribeでカスタマイズされたsubscribe()と、実際に読み込まれたScheduler実装クラスのWorker.schedule()メソッド指定スレッドで実行することにある.
    Observable. observeOn(Scheduler scheduler)
        public final Observable observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
    
    public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
    }
    

    observeOn()はfinalメソッドであるため、すべてのObserveableは呼び出されたメソッドであり、実際にはObservableObserveOnオブジェクトインスタンスを返します.スレッドを切り替えた後にsubscribe()メソッドを使用してサブスクリプションオブザーバを定義してui操作を行うが、前編ではObserveable.subscribeが実際に実行しているのはObservable実装クラスのsubscribeActualメソッドであるため、注目すべきはObservbleObserveOn.subscribeActual()である.ObservableObserveOn.subscribeActual()コードは以下の通り:
    protected void subscribeActual(Observer super T> observer) {
        //     scheduler      ,       ,     observer subscribe()  ,         //source    ObservableSource(Observable)
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
    
            source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
        }
    }
    

    ここで注目するのはelseにおけるスレッド切替プロセスであり,subscribeOn()メソッドによるスレッド切替と同様に,入力されたSchedulerによってWorkerが決定されることである.ここではObserveOnObserverクラスを導入し,受信したobserverをカプセル化し,すなわちobserverの実行スレッドを決定する.
    ObserveOnObserverクラスはRunnableインタフェースを実現し、runメソッドは以下の通りである.
    //ObserveOnObserver   run  ,drainFused drainNormal    Observer     。
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    

    ObserveOnObserverのworkerは主にschedule()メソッドに作用し、schedule()次のようにworker.schedule()メソッドを呼び出し、現在のObserveOnObserverを実行する.
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    

    So,observeOn法もSchedulerによりスレッド切替を実現している.AndroidSchedulers.mainThread()代表的なSchedulerは以下の通り.
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    

    schedle()メソッドでは,handlerを介して受信したRunnableをプライマリスレッドに送信して実行することを示した.
    HandlerWorker(Handler handler) {
        this.handler = handler;
    }
    
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
    
        if (disposed) {
            return Disposables.disposed();
        }
    
        run = RxJavaPlugins.onSchedule(run);
        //    Runnable handler     ScheduledRunnable,  Message Callback
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        //  Handler     MainLooper
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }
    
        return scheduled;
    }