RxJava同時スレッドスケジューリング

26705 ワード

Rxターゲットは非同期システムで使用され、Rxはマルチスレッド処理をサポートするため、多くのRx開発者はデフォルトでRxがマルチスレッドであると考えている.実際にはそうではありませんが、Rxのデフォルトは単一スレッドです.
スレッドを明確に指定しない限り、onNext/onError/onCompletedおよび各操作関数の呼び出しはすべて同じスレッドで完了します.たとえば、次の例があります.
final BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject.subscribe(i -> {
    System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});

int[] i = {1}; // naughty side-effects for examples only ;)
Runnable r = () -> {
    synchronized(i) {
        System.out.println("onNext(" + i[0] + ") on " + Thread.currentThread().getId());
        subject.onNext(i[0]++);
    }
};

r.run(); // Execute on main thread
new Thread(r).start();
new Thread(r).start();

結果:
onNext(1) on 1
Received 1 on 1
onNext(2) on 11
Received 2 on 11
onNext(3) on 12
Received 3 on 12

上の3つのスレッドでsubjectのonNext関数をそれぞれ呼び出します.Runnableのスレッドと同じスレッドです.複数の操作関数を直列に呼び出すことなく、結果は同じスレッドになります.

subscribeOnとobserveOn


subscribeOnおよびobserveOnは、subscriptionの呼び出しスレッドおよびイベント通知(ObserverのonNext/onError/onCompleted関数)を受信するスレッドをそれぞれ制御するために使用される.
public final Observable<T> observeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler)

Rxではスレッドと直接付き合うのではなく、Schedulerでマルチスレッドを処理します.

subscribeOn


subscribeOnはObservableを指定するために使用する.createのコードはそのSchedulerで実行されます.create関数を呼び出さなくても、内部にはcreate実装があります.例:
System.out.println("Main: " + Thread.currentThread().getId());

Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    //.subscribeOn(Schedulers.newThread())
    .subscribe(i -> {
        System.out.println("Received " + i + " on " + Thread.currentThread().getId());
    });

System.out.println("Finished main: " + Thread.currentThread().getId());

結果:
Main: 1
Created on 1
Received 1 on 1
Received 2 on 1
Finished main: 1

上記のコードは同じスレッドで実行され、シーケンスで実行されていることがわかります.subscribeの実行が完了すると(create関数のLambda式のコードを含む)後のコードが実行されます.
上の注釈を消したコードをsubscribeOn(Schedulers.newThread()をオンにします.この結果は次のようになります.
Main: 1
Finished main: 1
Created on 11
Received 1 on 11
Received 2 on 11

これでLambda式のコードはSchedulersになります.newThread()が返すスレッドで実行します.subscribeはもうブロックではありません.次のコードは、subscribeの戻りを待つことなく、すぐに実行できます.
一部のObservable内部では、独自に作成したスレッドが使用されます.例えばObservable.intervalは非同期です.この場合、新しいスレッドを指定する必要はありません.
System.out.println("Main: " + Thread.currentThread().getId());

Observable.interval(100, TimeUnit.MILLISECONDS)
    .subscribe(i -> {
        System.out.println("Received " + i + " on " + Thread.currentThread().getId());
    });

System.out.println("Finished main: " + Thread.currentThread().getId());

結果:
Main: 1
Finished main: 1
Received 0 on 11
Received 1 on 11
Received 2 on 11

observeOn


ObserveOnは、データストリームの他端を制御します.あなたのobserverはどのようにイベントを受け取りますか.すなわち,そのスレッドでobserverのonNext/onError/onCompleted関数をコールバックする.
Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    .observeOn(Schedulers.newThread())
    .subscribe(i ->
        System.out.println("Received " + i + " on " + Thread.currentThread().getId()));

結果:
Created on 1
Received 1 on 13
Received 2 on 13

observeOnは、この関数を呼び出した後の操作関数にのみ影響します.observeOnはデータストリームをブロックし、後続の操作に役立つと考えられます.例:
Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    .doOnNext(i -> 
        System.out.println("Before " + i + " on " + Thread.currentThread().getId()))
    .observeOn(Schedulers.newThread())
    .doOnNext(i -> 
        System.out.println("After " + i + " on " + Thread.currentThread().getId()))
    .subscribe();

結果:
Created on 1
Before 1 on 1
Before 2 on 1
After 1 on 13
After 2 on 13

observeOnに遭遇する前に、すべての操作が1つのスレッドで発生し、その後別のスレッドで発生することがわかります.これにより、Rxデータストリーム内の異なる場所に異なるスレッドを設定することができる.
データ・ストリームの処理に時間がかかることを知っている場合は、この操作により、生産者スレッドのブロックを回避できます.例えばAndroid開発中のUIスレッドでは、そのスレッドでファイルを読み取るとUIカード死(ANR)に応答しない可能性があり、この関数で別のスレッドで実行するように指定できます.

unsubscribeOn


一部のObservableは、Observableが完了すると、これらのリソースを解放するリソースに依存します.リソースを解放するのに時間がかかる場合は、unsubscribeOnでリソースコードを解放して実行するスレッドを指定できます.
Observable<Object> source = Observable.using(
    () -> {
        System.out.println("Subscribed on " + Thread.currentThread().getId());
        return Arrays.asList(1,2);
    },
    (ints) -> {
        System.out.println("Producing on " + Thread.currentThread().getId());
        return Observable.from(ints);
    },
    (ints) -> {
        System.out.println("Unubscribed on " + Thread.currentThread().getId());
    }
);

source
    .unsubscribeOn(Schedulers.newThread())
    .subscribe(System.out::println);

結果:
Subscribed on 1
Producing on 1
1
2
Unubscribed on 11

Schedulers


observeOnとsubscribeOnのパラメータはSchedulerオブジェクトです.Schedulerは、タスクの実行を調整するために使用されます.RxJavaには一般的なSchedulerが含まれており、Schedulerをカスタマイズすることもできます.Schedulersのファクトリ関数を呼び出すことで標準の事前定義されたSchedulerを取得する.
RxJavaに内蔵されているSchedulerには、次のものがあります.
  • immediate同期実行
  • trampolineタスクを現在のスレッドのキューに配置し、現在のタスクの実行が完了したら、キュー内のタスク
  • を実行し続けます.
  • newThreadは、タスクごとに
  • を実行する新しいスレッドを作成する.
  • computation計算スレッド、大量のCPU計算を必要とするタスク
  • io ioオペレーションを実行するためのタスク
  • testテストおよびデバッグ用
  • 現在のcomputationとioの実装は似ています.彼ら2つは主に呼び出しのシーンを確保するために使用されています.ドキュメントの説明に相当し、現在のタスクがどのようなタイプであるかを示します.
    ほとんどのRx操作関数の内部にはschedulersが使用されている.また,Observable操作関数の大部分にもSchedulerパラメータを用いたリロード関数がある.この操作関数が実行するスレッドは、リロード関数によって指定できます.

    schedulerの高度な特性


    Rx schedulerの使用シーンはRxに限定されず、通常のJavaコードでも使用可能です.

    タスクの実行


    Schedulerには、実行可能なタスク(Scheduler.Worker)を作成するcreateWorker関数があります.次に、タスクをスケジュールできます.
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(
        () -> System.out.println("Action"));

    上記のタスクは、指定したスレッドに割り当てられます.
    タスクを繰り返したり、1回だけ実行したり、タスクの実行を遅らせたりすることもできます.
    Subscription schedule(
        Action0 action,
        long delayTime,
        java.util.concurrent.TimeUnit unit)
    Subscription schedulePeriodically(
        Action0 action,
        long initialDelay,
        long period,
        java.util.concurrent.TimeUnit unit)
    Scheduler scheduler = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(
        () -> System.out.println(System.currentTimeMillis()-start),
        5, TimeUnit.SECONDS);
    worker.schedule(
        () -> System.out.println(System.currentTimeMillis()-start),
        5, TimeUnit.SECONDS);

    結果:
    5033
    5035

    上記の例では、実行を遅らせるのはスケジューリング開始時から時間を計算することを示しています.

    タスクのキャンセル


    Scheduler.WorkerはSubscriptionに継承されます.unsubscribe関数を呼び出すと、キュー内のタスクをキャンセルできます.
    Scheduler scheduler = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(
        () -> {
            System.out.println(System.currentTimeMillis()-start);
            worker.unsubscribe();
        },
        5, TimeUnit.SECONDS);
    worker.schedule(
        () -> System.out.println(System.currentTimeMillis()-start),
        5, TimeUnit.SECONDS);

    結果:
    5032

    最初のタスクでunsubscribeが呼び出され、2番目のタスクがキャンセルされます.次の例では、タスクが完了せずにキャンセルされた場合、InterruptedException例外が放出されます.
    Scheduler scheduler = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> { try { Thread.sleep(2000); System.out.println("Action completed"); } catch (InterruptedException e) { System.out.println("Action interrupted"); } }); Thread.sleep(500); worker.unsubscribe();

    結果:
    Action interrupted

    scheduleはSubscriptionオブジェクトを返します.このオブジェクトでキャンセル操作を呼び出すことができます.これにより、すべてのタスクをキャンセルするのではなく、このタスクのみをキャンセルすることができます.

    RxJavaに既存のscheduler


    ImmediateScheduler


    ImmediateSchedulerはスレッドスケジューリングをしていません.同期した実行タスクのみです.ネストされた呼び出しにより、タスクが再帰的に実行されます.
    Scheduler scheduler = Schedulers.immediate();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> {
        System.out.println("Start");
        worker.schedule(() -> System.out.println("Inner"));
        System.out.println("End");
    });

    結果:
    Start Inner End

    TrampolineScheduler


    TrampolineSchedulerも同期して実行されますが、タスクはネストされません.後のタスクをタスクキューに追加し、前のタスクが完了してから後のタスクを実行します.
    Scheduler scheduler = Schedulers.trampoline();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> {
        System.out.println("Start");
        worker.schedule(() -> System.out.println("Inner"));
        System.out.println("End");
    });

    結果:
    Start End Inner

    TrampolineSchedulerは、タスクを最初のタスクを実行するスレッドに配置して実行します.これにより、キューが実行されるまでscheduleを最初に呼び出す操作はブロックされます.後続のタスクは、このスレッドで1つずつ実行され、後続の呼び出しがブロックされません.

    NewThreadScheduler


    NewThreadSchedulerは、各タスクに新しいスレッドを作成します.スレッド情報を印刷する補助関数を定義します.
    public static void printThread(String message) {
        System.out.println(message + " on " + Thread.currentThread().getId());
    }

    例:
    printThread("Main");
    Scheduler scheduler = Schedulers.newThread();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> { printThread("Start"); worker.schedule(() -> printThread("Inner")); printThread("End"); }); Thread.sleep(500); worker.schedule(() -> printThread("Again"));

    結果:
    Main on 1
    Start on 11
    End on 11
    Inner on 11
    Again on 11

    本文は雲が千峰にあるからだ.http://blog.chengyunfeng.com/?p=978