RxのHotとColdについて


UniRxについての記事のまとめはこちら


RxのIObservable<T>にはHot/Coldという大きな2つの特徴があります。
これら性質を理解しないままストリームを設計すると、意図した動作をしてくれない場合があります。

今回はこのHot/Coldの性質について簡単にまとめたいと思います。

概要

一言で言うと?

  • Cold : ストリームの前後をつなぐだけのパイプ。単体では意味が無い。だいたいのオペレータはこっち。
  • Hot : ストリームから値を発行し続ける蛇口。常に垂れ流し。後ろにパイプがたくさん接続できる。

細かく説明すると

Cold Observable

  • 自発的に何もしない受動的なObservable
  • Observerが登録されて(Subscribeされて)初めて仕事を始める
  • ストリームの前後をただつなぐだけ。ストリームを枝分かれさせる機能は無い。

Hot Observable

  • 自分から値を発行する能動的なObservable
  • 後続のObserverの存在に関係なしにメッセージを発行する
  • 自分より上流のCold Observableを起動し、値の発行を要求する機能を持つ
  • 下流のObserverを全て束ね、まとめて同じ値を発行する(ストリームを枝分かれさせる)

HotとColdの見分け方

殆どのオペレータはColdな性質であり、自分で明示的にストリームをHot変換しない限りはColdのままとなります。

Hot変換用オペレータは、いわゆるPublish系のオペレータが該当します。

Hotについて

Hot Observableの性質

ストリームを稼働させる性質

Rxのストリームは基本的にSubscribeされた瞬間に各オペレータの動作が始まるようになっています。
ですがHot Observableをストリームの途中に挟むことで、Subscribeを実行するより前にストリームを稼働させることができます。

ストリームを分岐する性質

Hot Observableはストリームを分岐することができます。

Coldについて

Subscribeされるまで動作しない性質

Cold Observableは、Subscribeされる(またはHot変換される)まで動作しないやる気のないObservableです。
稼働していないCold Observableに渡されたメッセージは全て処理すらされることなく消滅してしまいます。

特に値の発行タイミングや前後関係が重要なオペレータを使用する場合は、どのタイミングから処理が始まるのかを十分に意識して使わないといけません。同じストリーム定義であっても、Subscribeしたタイミングによっては挙動が変わってしまいます。

以下がその例です。

Coldの挙動
var subject = new Subject<string>();

//subjectから生成されたObservableは【Hot】
var sourceObservable = subject.AsObservable();

//ストリームに流れてきた文字列を連結して新しい文字列にするストリーム
//Scan()は【Cold】
var stringObservable = sourceObservable.Scan((p, c) => p + c);

//ストリームに値を流す
subject.OnNext("A");
subject.OnNext("B");

//ストリームに値を流した後にSubscribe
stringObservable.Subscribe(Console.WriteLine);

//Subscribe後にストリームに値を流す
subject.OnNext("C");

//完了
subject.OnCompleted();
実行結果
C

上のコードを実行した結果はCが出力されることになります。
これはScanオペレータがColdであるため、Subscribe前に発行されたABが処理されていないためです。

もしもここで「Subscribeするより前に発行された値も処理して欲しい」という場合はどうしたらよいでしょうか。その場合はHot変換オペレータを挟み、Subscribeするより前にストリームを起動しておけば良いわけです。

Hot変換を挟んだ場合の挙動
var subject = new Subject<string>();

//subjectから生成されたObservableは【Hot】
var sourceObservable = subject.AsObservable();

//ストリームに流れてきた文字列を連結するストリーム
//Scan()は【Cold】
var stringObservable = sourceObservable
    .Scan((p, c) => p + c)
    .Publish(); //Hot変換オペレータ

stringObservable.Connect(); //ストリーム稼働開始

//ストリームに値を流す
subject.OnNext("A");
subject.OnNext("B");

//ストリームに値を流した後にSubscribe
StringObservable.Subscribe(Console.WriteLine);

//Subscribe後にストリームに値を流す
subject.OnNext("C");

//完了
subject.OnCompleted();
実行結果
ABC

PublishというHot変換オペレータを間に挟むことで、Subscribeするより前にストリームを強制起動させることができます。


それぞれのObserverに対して別々の処理をする(ストリームの分岐点にならない

Cold Observableはストリームを分岐させる性質をもっていません。
そのため、Cold Observableを複数Subscribeした場合、それぞれに別々のストリームが生成されて割り振られることになります。


ただしストリームにHot Observableが存在した場合、最も末端に近いHot Observableでストリームが分岐して、さらに別々のストリームが生成されます。

まとめ

ストリームがどこで分岐するか?を常に意識して設計することが大事です。
「クラス外にObservableを公開する時はColdなまま公開せず、必ず末端でHotに変換してから公開する」など、意図しない所でストリームが分岐してしまわない様にしっかり制御してあげる必要があります。

なお、Cold→Hot変換には専用のオペレータが用意されているので、そちらを利用すると良いでしょう。
(こちらのページ下部Publish,PublishLast,Multicast等)

続き:【Reactive Extensions】 Hot変換はどういう時に必要なのか?