こわくないReactive Extensions超入門


(この記事は、Reactive Extensionsの素晴らしさを伝えるために書かれたものです)
Linqはふつーに使ってる、Rxは解説記事を読んでみたことはあるけどなるほどわからんだった人あたりを対象にしています。

Reactive Extensionsとかいうやつ

Reactive Extensions!!!(略して、Rx!)
C#のサイキョウライブラリの一つであり、C#に革命をもたらしたLinqをさらに発展させてなんだかめちゃくちゃよく分からないものにしてしまったようなもの。
詳しいことは後で書くので、とりあえずはLinqっぽいすごいやつだと思っておけばいいです。

Rxの解説など

  • Rxは時間軸をシーケンスと見なして関数を適用していく関数指向なライブラリである
  • IEnumerable<T>をベースとするLinqに対して、IObservable<T>をベースとした拡張メソッド群
  • Linqはソースに対してPullする形で値を得るが、RxはソースからのPushで値を送出する
  • Rxは

なにいってるのかぜんぜんわかんねえよ!!

RxとLinqの関係

LinqとRxを対比させる方法は、Linqのことを内部実装までよく把握していて初めて成り立つものです。
具体的には、MoveNext()とCurrentがどうのこうのとか、メソッドチェーンはソースを内包したクラスで表現されて……とかそういう話です。今は忘れてください。

今回は、Linqとは基本的に無関係な方向からのアプローチでRxを解説していきます。

C#のevent

event。いべんと。唐突ですが、C#にはeventというものがあります。
eventというのは、値を処理する関数を値を生成する側に渡して、イベント(何かをトリガーにして起こる事象)が発生すると、その渡した関数が実行される。みたいな処理の仕組みのことです。

[gist] イベントの例とか

eventは、発火時に値を処理するメソッドを先に渡しておいて、イベントを発行する側に実行してもらう仕組みですが、ちょっと見方を変えると、「イベントが発動するとあらかじめ登録した関数に値が飛んできて勝手に実行される」と見ることもできます。
eventというのはそういう形で、値を配送するものです。こういうものをオブザーバーパターンというらしいですよ。
と、ここでRxが突然現れます。

Rx == event

以下の簡単なコードを見てみましょう。

public class Rx01
{
    public void Start()
    {
        var i = 1; // ichi
        Observable.Return(i)
            .Subscribe(イベント受け取るやつ);
    }

    public void イベント受け取るやつ(int value)
    {
        Console.WriteLine($"Eventきたぞ!値は{ value }だぞ!");
    }
}

/*/ 実行結果

Eventきたぞ!値は1だぞ!

/*/

[Observable.Return](値の発行元) -------------> イベント受け取るやつ()

はい。
Observable.Returnってやつは、値を一つ発行するジェネレータです。
Subscribeは、購読です。いわゆるイベントへの値の受け取り登録です。
イベント受け取るやつは、イベント受け取るやつです。

Rxというのは、つまり、イベントです。

シーケンスとかリストとかそういう見方はとりあえず捨てて

Observable.Rangeという、連続した値を生成するジェネレータがあります。
名前からして、リストを作るイメージですね。
Linqで対比して、Enumerable.Rangeがまさにそれです。
でも、Rxはイベントなんです。
こう考えてみましょう。

Observable.Range(0, 10).Subscribe(イベント受け取るやつ);

「イベントに登録したらすぐに0が飛んできたけど、そのまま連続で1が飛んできて、9まで飛んできたところで終わった」

これは、イベントが10回連続で発火しただけだと考えられませんか。
それなら、リストっぽい要素はないです。ただのイベントです。
ここが、LinqとRxの一番の認識の違いです。
RxはLinqと比べるのではなく、eventと比較するべきだったのです。

Rxだからできること

連続で値が来るなら、フィルターしたいよねー

[Range(0, 10)](値の発行元) -------------> イベント受け取るやつ()

たとえば、奇数を発行したイベントは無視して偶数を発行したときだけ処理したかったとします。

[Range(0, 10)](値の発行元) --------[ここで偶数だけ通したい]--> イベント受け取るやつ()

それWhere

Observable.Range(0, 10).Where(x => 0 == x % 2).Subscribe(イベント受け取るやつ);

飛んでくる値を何か変換したいときー

たとえば、最初から100足した値が欲しかった、とか。

[Range(0, 10)](値の発行元) --------[ここで+100する]---> イベント受け取るやつ()

もちろんSelect

Observable.Range(0, 10).Select(x => x + 100).Subscribe(イベント受け取るやつ);

イベントが発火しても、諸事情により10秒待ってから処理したかった

Delay

Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).Subscribe(イベント受け取るやつ);

最初の3回分だけほしい、後はいらない

Take

Observable.Range(0, 10).Take(3).Subscribe(イベント受け取るやつ);

イベントに、Linqのメソッドが使える!!!!

イベントで値が [ソース-------->イベント受け取るやつ()] と配送されてくるとして、その間の経路部分にLinqのメソッドを挟み込むイメージが適切です。

上に上げたのはRxメソッド群のほんの一部ですが、それらもいくらでも組み合わせ、いくらでも連結することが可能です。
さらに、Linqと比べて便利なメソッドが山ほど増えてたり、そもそもLinqでは実現できない時間に関わるメソッドもあったりして、Rxサイキョウ!

もうちょっとくわしいこと

Subscribeの3種類の引数

RxでSubscribeに送られてくる値には3つの種類があります。

一つは、イベントの値そのものが流れてくる OnNext()
一つは、途中で例外が発生したとき、その例外が流れてくる OnError()
一つは、イベントが完了したときに流れてくる OnCompleted()

これらの一部、または全てに、発動したときに処理するメソッドを用意しておくことで、普通のeventよりもとても柔軟なフローを書くことができます。
Subscribeにはたくさんのオーバーロードがあり、これらの好きな組み合わせでイベントを監視できます。

並列

イベントなので、いつ発火するか分からないソースがあったとします。
そんなイベントを複数購読したかったりしますね。
並列です。
内部ではスレッドプールやタスクがきっちり管理されているので、
何となく使ってれば勝手に並列に動作していたりします。
慣れるまでは意図した挙動にならなかったときの対応が難しいんですが。
大事なのは、Subscribeさえしておけば、値が飛んできたとき勝手にそのOnNextが発動するということ。

非同期

event自体がC#の中でも非同期を扱うのに適していたりします。
完了待ちなども、完了したタイミングで登録されているメソッドに値を送出するだけです。
Rxももちろん同じ理由で非同期フレンドリィです。
async/awaitが登場する前から、非同期を自然に、強力に扱うことが出来たらしいです。
今では非同期の処理自体はTaskにお任せですが、その結果をRxに持ち込むように連携すると強烈なことになります。

時間

Rxはイベントなので、時間との相性が非常に良いです。
一定時間だけイベントを購読する、値を一つ受け取ったら一定時間は無視したい、一定時間値の送出を遅らせたい、など。
これはLinqには絶対に出来ないRxの特権でもあります。

合成

通常のeventではできないこと。
複数のeventから発行される値を合成してまるで別のイベントソースのように変換してしまうことが可能です。
Rxの例としてよく見かけるマウスのドラッグ処理とかは、1回だけ発生したボタン押下イベントを連続で発生するマウス移動イベントに差し替えて変換してしまってるんですね。

分配

一つのソースから値が送出されたとき、途中から二つ以上の監視者に同じ値を分配することができます。
これは地味にLinqでは実現できないことなんですよね。
途中で分配して処理してからまた一つに合成、なんていうアクロバティックなことも普通にできてしまいます。

キャンセル

時間が関わるということは、長い時間を掛けて監視するイベントソースももちろんあるわけです。
途中でキャンセルしたくなったら?
Subscribeの戻り値がIDisposableなので、これのDispose()を呼び出すことで即座に監視を解除することができます。
監視のキャンセルを実行すれば、長いメソッドチェーン中で実行中の処理があったとしても全てキャンセルしてくれます。

例外

たとえ処理がマルチスレッドで並列実行していたとしても、発生した例外はちゃんとOnError()に流れてくるので、例外処理が非常にやりやすいのがメリット。
また、OnError()がSubscribeに流れ着いたとき、自身のチェーンに対してきっちりDispose()を呼ぶ。後始末もちゃんとやるRx、えらい。

連携

async/await(Task)との連携がしっかり組まれていて、非同期処理をチェーンに組み込んだり、実行に時間がかかる変換処理をスレッドプールに放り込んで並列処理とかがすっごいナチュラルに書けます。
特に、Observable.Create()(自分で値の生成処理を書くことができるジェネレータ)とasyncラムダの組み合わせは、ヤバいです。
また、C#ネイティブのeventも、何も考えずにRxのソースにすることができます。わずらわしいeventの登録解除処理とかも全部Rx側でやってくれるので、使う側は好きなようにメソッドチェーンを繋いで、Subscribeするだけ!

やっぱりまとまらない

考えをまとめる前に文章に吐き出しすぎてカオス!
とりあえずまとめてみる

  • Rxは超スゴイイベント機構
  • イベントだから並列非同期時間管理もオテノモノ
  • そしてLinqだから好き勝手メソッドチェーンポコポコ
  • チェーンを引き回しても例外はきっちり取れるから安心DA
  • 値の差し替え、合成、分配、なんでもできる
  • async/awaitと組み合わせればもはやムテキ!カオス!

いかがでしたか。これでもやっぱりよく分からないでしょうから、とりあえず触ってみましょう!大丈夫、こわくない、こわくない。
たとえ複数の値が流れていても、そのうちの一つの値にのみ目を向けて流れを確認してみるのがコツです。
Rxの魅力とそのパワーを知る人が一人でも増えますように!