RxのSchedulerの話 / Rxのなかのこととか


Rxをわりとごりごり使ってる人向けの記事です?部分的に初心者にやさしい。かも。
あんまり知ってなくてもいい内容な上に、うまく言葉にできてる気がしないし分かりづらいです。すいませんすいません。

Scheduler!!!

Reactive Extensionsを構成する重要な要素の一つにスケジューラがあります。今日はスケジューラのことについて書いていきます。
最近HOTなReactive ExtensionsのReactiveじゃないとこをあえて解説していくスタイル!

スケジューラとは?

その名の通り、プログラムの進行をスケジューリングしていくための機構です。主には、マルチスレッドで同時多発的に呼び出された複数の操作をキューに突っ込んで同時実行制御などを行って処理する仕組みとか。そんな感じの。

Rxとスケジューラ

実はRxは全ての操作がスケジューラに乗って実行されるように作られています。つまり、Rxは「Observerパターン(イベント) + Linq + スケジューラ」の3つの要素で出来ているフレームワークだと言えます。

Rxで主に使用されるスケジューラ

Rxでは主に3種類のスケジューラが使用されています。さらに派生のスケジューラなども用意されていますが、今回はメインの3種類について書いていきます。

CurrentThreadScheduler

そのまんま、カレントスレッド上で全ての処理を行っていくスケジューラ。コンカレントなキューを内部に持っていて、全ての操作は一度このキューに入れられてから一つずつ取り出して処理されます。また、このスケジューラはSubscribeが使用しているので、どんな処理を書いても必ずこのスケジューラを通ります。

DefaultScheduler

デフォルトってなんだデフォルトって。って思うんですが、これはバックグラウンドスレッドをスケジューラの形に仕上げたものです。内部に一つのスレッドかスレッドプールを用意して、各操作を処理していくものです。連続する操作は一つのバックグラウンドスレッドでループ処理するように作られています。なので、スレッドプールをフル活用して操作を並列処理していくスケジューラじゃないですよ!

ImmediateScheduler

これは、操作のスケジューリングが行われた瞬間に割り込みで処理を走らせるスケジューラです。内部に軽量なローカルスケジューラを持ち、最初のスケジューリングが行われた後は、全ての操作をローカルのキューを通して処理してから制御を戻します。なので、超はやいです。

その他のスケジューラ

ThreadPoolScheduler

スレッドプールで操作を行うスケジューラ。というと、DefaultSchedulerとの違いはなんなんだ?って思うんですが、なんなんでしょうね?ぼくもわかりませんでした。誰か教えてください。

TaskPoolScheduler

操作をTask上で処理していくスケジューラ。操作が完全にTask.Run上で実行されたときのそれになるんですが、いまいち使い方やメリットがわかってないです。

スケジューラの使い方

Observableジェネレータメソッド

ジェネレータ(値を生成するもの。基本的にソースとなるIObservableを持たず、自身がチェーンの始点になれるメソッド)は、それぞれが最も適切なスケジューラを暗黙的に使用します。また、各ジェネレータはユーザーがスケジューラを指定できるオーバーロードを持っています。詳しくはインテリセンス先生に聞いてください。暗黙指定のスケジューラは、時間関係のもの(Timer、Intervalなど)がDefaultScheduler、単一の値を発行するもの(Returnとか)はImmediateScheduler、その他ふつーなもの(Rangeなど)がCurrentThreadSchedulerとなっています。

Delay()

中継メソッドであるDelay()は、一定時間値の送出を遅延させるものですが、これは内部でDefaultSchedulerにリスケジュールしています。スケジューラを受け取るオーバーロードからCurrentThreadSchedulerを明示的に指定することで、実行中スレッドをブロックしながら遅延した値を流すことも可能です。

ObserveOn()

まさにスケジューラを変更するためだけに存在する中継メソッドです。引数で指定したスケジューラにリスケジュールすることができます。Delay()と同様に、バックグラウンドなスレッドにリスケジュールした場合、その時点で制御をジェネレータに返すため、値がSubscribeに到達する前にジェネレータは次の値を発行するようになります。

スケジューラとメソッドチェーン

カレントスレッド以外を使用するスケジューラは、一つのSubscribeに対して一つのスレッドを割り当てます。なので、たとえば中継に.ObserveOn(DefaultScheduler.Instance)を挟んでいるチェーンを二か所からSubscribeした場合、ObserveOnを境に処理スレッドが別れるブランチ形状の経路が形成されます。これを利用することで、チェーンの中で処理を並列化することも可能となります。

スケジューラのことまとめ

スケジューラを使用する意味

もちろん、非同期並列処理の安全かつ簡単な管理!これに尽きる!Reactive ExtensionsがReactiveであるために必要なので実装された。ただそれだけなのです。Rxのスケジューラは実はIObservableを通さなくてもそれ単体で使用できるので、触ってみると面白いですよ。

スケジューラのデメリット

遅い。単純なループ処理をLinqと比較しても、何十倍の速度差があります。こればっかりはどうしようもないです。ジェネレータにImmediateSchedulerを割り当てることで大分速度を改善することができますが、オススメしません。というか、絶対にやらないでくださいね。

Rxの実装 --- Rxのチェーン構築の仕組み

今度は、Rxのチェーン構築の仕組みについて書いていきます。これはRxがLinqであるという意味を解説するものであり、Linqとも大部分が共通する内容となります。知らないとわりと黒魔術的に見えるRxのチェーン構築ギミックですが、知ってみるとわりと面白いものです。

Step1: Rxメソッドの連結

Rxの動作は大きく分けて3ステップで構成されています。まずは、IObservableのメソッドチェーンを構築する段階です。この段階で何が起きているのかというと、各メソッドはそれぞれIObservableを実装したクラスのインスタンスを生成します。そして、チェーンの親(source)への参照を内部に持ちます。Linqと同じ仕組みですね。これをクラス内包と表現している解説もありますが、実際は、親へとさかのぼる片方向連結リストという表現が正しいでしょう。

[Range(0, 10)]  <------(source)[Where()]  <------(source)[Select()]

Step2: Observerの実体化と連結、そしてIDisposableの集約

Subscribeが実行される時に発生します。Subscribeからジェネレータに向かって、順番に各ObservableクラスがObserverを生成します。そして、各Observableクラスはsourceに自身のObserverを渡し、再帰的にObserverのチェーンが生成されます。

[Range(0, 10)] <---- [Where()] <--------- [Select()] <----- [Subscribe()]
     ↓                  ↓                    ↓                   ↓
  Generator       [WhereObserver]      [SelectObserver]      (observer)

Observerを生成していく再帰的なActivationは、戻り値でIDisposableを返します。このIDisposableはOnCompletedやOnErrorが発生したときに実行される、自身の生成したObserverを破棄するものとなります。これらは自身のIDisposableとCombineしながらスタックを解放してSubscribeまで戻っていきます。このIDisposableがユーザーが受け取るSubscribeの戻り値になります。なので、長いチェーンを購読したときのIDisposableの中身は実は非常に巨大なナニカになっています。

(IDisposable) --> (+ IDisposable) --> (+ IDisposable) --> (+ IDisposable) --> 戻り値
[Range(0, 10)]        [Where()]          [Select()]        [Subscribe()]

  Generator        [WhereObserver]     [SelectObserver]     (observer)

Step3: 値の送出

Step2でObserverのチェーンが生成されたので、これらのOnNextに対して値をPushしていきます。このOnNext処理中に各Observerが変換処理を行ったり値をフィルターしたりします。また、ここでOnNext処理中に例外が発生した場合、Observerは値の送出を止めてOnErrorに例外オブジェクトを送ります。

[Range(0, 10)]         [Where()]            [Select()]        [Subscribe()]

  Generator   ----> [WhereObserver] ---> [SelectObserver] ---> (observer)
    Value              Filtering              Select             OnNext!

Rxは自身の持つObserverのOnNextに値を渡していくので、「RxはPush型の値送出機構」という表現は、つまりそういうことです。

RxのIDisposable

上でも少し触れましたが、Subscribeの返すIDisposableはユーザー視点で見ると「監視のキャンセル」ですが、実装的には「チェーンを構成するObserverの再帰的な破棄」を行っているものです。値の送出が完了(OnCompleted)したとき、例外が発生した(OnError)とき、そして、ユーザー自身が購読のキャンセルを実施した(Dispose)ときのどれかで1度だけ実行されます。なので、たとえばCurrentThreadSchedulerなどでOnCompleted()まで一気に値が流れるIObservable(Rangeなどがそれ)をSubscribeしたときの戻り値は、「既にOnCompleted()によってDisposeされているIDisposable」となるので、これを変数に保持する意味はまるでありません。

まとめ

今回は別に知ってなくてもいいことしかしゃべってません。
一応まとめ

  • Rxは「Observerパターン + Linq + スケジューラ」
  • 全てのIObservableはスケジューラを使用している
  • スケジューラはマルチスレッドをRx的に管理するために必要な仕組み
  • RxとLinqは実装的にはほとんどいっしょ
  • RxのIDisposableは自身のObserverチェーンを破壊するもの
  • Subscribeの戻り値のDisposeはだいたいは勝手に実行されるので明示的に購読をキャンセルしたいとき以外はほっといてもいい

以上です。ありがとうございました。