subscribeOnとobserveOnの違い


Rxで、処理の実行スケジューラーを切り替えるオペレーターとして、 subscribeOnobserveOn があります。
この2つは雰囲気がよく似ているので、ドキュメントを読んだ時点では今ひとつ違いが理解できなかったのですが、コードを書いて実行することで、その差が明確になりました。

どちらも使わない場合

次のコードを実行してみます。

const observable1 = Rx.Observable.create(observer => {
  console.log("[A] before onNext(1)");      // (2)
  observer.onNext(1);
  console.log("[A] after onNext(1)");       // (4)

  console.log("[B] before onNext(2)");      // (5)
  observer.onNext(2);
  console.log("[B] after onNext(2)");       // (7)

  console.log("[C] before onCompleted()");  // (8)
  observer.onCompleted();
  console.log("[C] after onCompleted()");   // (10)
})

const observable2 = observable1

console.log("[D] before subscribe()");      // (1)
observable2
  .subscribe(value => {
    console.log("[E] next: " + value);      // (3), (6)
  }, error => {
    console.log("[E] error: " + error);
  }, () => {
    console.log("[E] completed");           // (9)
  })
console.log("[D] after subscribe()");       // (11)

その結果、コンソールにはこう出力されます。

[D] before subscribe()
[A] before onNext(1)
[E] next: 1
[A] after onNext(1)
[B] before onNext(2)
[E] next: 2
[B] after onNext(2)
[C] before onCompleted()
[E] completed
[C] after onCompleted()
[D] after subscribe()

Rx.Observable.create に渡すのはsubscribeされたときに実行する処理です。 observable2.subscribe の呼び出しの中から、この処理が呼ばれていることがわかります。また、その処理でobserverの onNextonCompleted を呼び出すとその中から、 observable2.subscribe に渡した関数が呼ばれていることもわかります。

subscribeOnを使った場合

subscribeする前に subscribeOn を使ってみます。
なお、引数に渡している Rx.Scheduler.async は非同期、要するに「次のタイミング」で実行してくれるスケジューラーです。

const observable1 = Rx.Observable.create(observer => {
  console.log("[A] before onNext(1)");      // (3)
  observer.onNext(1);
  console.log("[A] after onNext(1)");       // (5)

  console.log("[B] before onNext(2)");      // (6)
  observer.onNext(2);
  console.log("[B] after onNext(2)");       // (8)

  console.log("[C] before onCompleted()");  // (9)
  observer.onCompleted();
  console.log("[C] after onCompleted()");   // (11)
})

const observable2 = observable1
  .subscribeOn(Rx.Scheduler.async)

console.log("[D] before subscribe()");      // (1)
observable2
  .subscribe(value => {
    console.log("[E] next: " + value);      // (4), (7)
  }, error => {
    console.log("[E] error: " + error);
  }, () => {
    console.log("[E] completed");           // (10)
  })
console.log("[D] after subscribe()");       // (2)

結果のログはこうなります。

[D] before subscribe()
[D] after subscribe()
[A] before onNext(1)
[E] next: 1
[A] after onNext(1)
[B] before onNext(2)
[E] next: 2
[B] after onNext(2)
[C] before onCompleted()
[E] completed
[C] after onCompleted()

subscribeOn を入れたことで、 subscribe の呼び出し自体はすぐに帰ってきて、その「次のタイミング」で Rx.Observable.create に渡した処理が実行されています。
結果的に、observable2.subscribe に渡した関数も同じタイミングで呼ばれていますが、これは Rx.Observable.create に渡した処理の中で単純に onNext などを呼び出しているからでしょう。 onNext の呼び出しを別のタイミングで行うようにしていれば、そうはならないはずです。

つまり、subscribeOn は、subscribeされたときに実行する処理のスケジューラーを変更するということですね。

observeOnを使った場合

では、 subscribeOn の代わりに observeOn を使ってます。

const observable1 = Rx.Observable.create(observer => {
  console.log("[A] before onNext(1)");      // (2)
  observer.onNext(1);
  console.log("[A] after onNext(1)");       // (3)

  console.log("[B] before onNext(2)");      // (4)
  observer.onNext(2);
  console.log("[B] after onNext(2)");       // (5)

  console.log("[C] before onCompleted()");  // (6)
  observer.onCompleted();
  console.log("[C] after onCompleted()");   // (7)
})

const observable2 = observable1
  .observeOn(Rx.Scheduler.async)

console.log("[D] before subscribe()");      // (1)
observable2
  .subscribe(value => {
    console.log("[E] next: " + value);      // (9), (10)
  }, error => {
    console.log("[E] error: " + error);
  }, () => {
    console.log("[E] completed");           // (11)
  })
console.log("[D] after subscribe()");       // (8)

結果のログはこうです。

[D] before subscribe()
[A] before onNext(1)
[A] after onNext(1)
[B] before onNext(2)
[B] after onNext(2)
[C] before onCompleted()
[C] after onCompleted()
[D] after subscribe()
[E] next: 1
[E] next: 2
[E] completed

今度は、subscribeされたときに実行する処理は observable2.subscribe の中から呼び出されていますが、 onNextonCompleted はすぐに帰ってきていて、 observable2.subscribe に渡した関数が「次のタイミング」で実行されています。

つまり、 observeOnonNext などのイベントが発生したあとの処理(というか、 observeOn を挟んだ以降の処理)を指定したスケジューラーで実行するわけですね。

まとめ

subscribeOn

  • subscribe時の処理、つまり、イベントを発生させる処理の実行スケジューラーを指定する。
  • subscribeOn を使っても、そこで指定したスケジューラーで subscribe に渡した関数が実行されるかどうかはわからない。(それを指定するものではない)

observeOn

  • それ以降のイベントの伝播処理(オペレーターの処理や、 subscribe に渡した関数の処理)を実行するスケジューラーを指定する。
  • observeOn を使っても、イベントを発生させる処理自体の実行スケジューラーを変えることはできない。