RxJS の Observable / Observer の概要


この記事は bouzuya's RxJS Advent Calendar 2015 の 2 日目かつ RxJS Advent Calendar 2015 の 2 日目です。

はじめに

今日の内容は公式リポジトリにあるドキュメント Exploring The Major Concepts in RxJS とほとんど同じものです。

また RxJS 4.0.7 を対象にしています。

Obervable と Observer

RxJS における最も基本的なクラス Observable / Observer の概要を確認します。

Observable はデータソースを表すクラスで、名前どおり observe できる何かです。Observable からデータを受け取るには subscribe(observer) メソッドを呼び出します (メソッド名は observe ではありません) 。subscribe メソッドは引数に Observer のインスタンスを取ります。

ObserverObservable からのデータを受け取るためのクラスです。名前どおり流れてくるデータを observe するものです。onNext / onError / onCompletedObservable により呼び出され (値が Push され) ます。それぞれ Stream の値・エラー・完了を通知します。

class Observable
  subscribe: (observer) -> # ... 

class Observer
  onNext: (value) -> # ...
  onError: (error) -> # ...
  onCompleted: -> # ...

Disposable

もうひとつクラスを紹介します。Disposable クラスです。

Observable.prototype.subscribeDisposable のインスタンスを返します。

Disposable クラスは dispose メソッドを持ちます。 connection の close などの後処理をできます。

class Disposable
  dispose: ->

class Observable
  subscribe: (observer) ->
    # ... (returns Disposable)

実用上の注意

実際に RxJS を使う場合には DisposableObserver を見ることはほとんどありません。

後処理が要らないケースも多いですし、subscribe Observer のコンストラクターと同様の引数を取り、内部で Observer を生成するので直接操作する必要はないからです。

{ Observable } = require 'rx'

Observable
  .range(1, 3)
  .subscribe (value) -> # onNext
    console.log value

また ObservableObserverDispose を継承したクラスをつくることも、まずありません。独自の動きをする Observable については、そのためのメソッドが用意されています。これはまた別の機会に書きます。

ソースコードを眺める

最後にすこしだけソースコードを眺めてみましょう。

RxJS のモジュールは ES6 や Node.js のような仕組みではなく独自の concat を前提とした形になっています。ES6 class も使われていませんし、オレオレな util もあるので慣れるまでは読みづらいかもしれません。とりあえず今回は軽く眺めましょう。

Observable のソースコード

まずは Observable を眺めましょう。

この記事で紹介した subscribe や利便性のためなのか subscribeOnNextsubscribeOnErrorsubscribeOnCompleted などが並んでいます。

mapfilter などの Operator については別ファイルで Observable.prototype (変数名 observableProto) に追加されます。このファイル分割は .NET Framework における拡張メソッドを定義しているイメージなのかもしれませんね。

次のコードは Observable.prototype.subscribe です。

observable.js
    observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) {
      return this._subscribe(typeof oOrOnNext === 'object' ?
        oOrOnNext :
        observerCreate(oOrOnNext, onError, onCompleted));
    };

Observable.prototype.subscribethis._subscribe を呼び出すようになっています。そして this._subscribeObservable を継承した各 operator やその共通実装である ObservableBase が実装しています。詳細は別の機会にしましょう。

Observer のソースコード

次に Observer を眺めましょう。

残念ながら onNext などは見当たりません。代わりに気になるコードがあります。次のコードは Observer.create です。これは Observer をつくるメソッドでしょう。

observer.js
  var observerCreate = Observer.create = function (onNext, onError, onCompleted) {
    onNext || (onNext = noop);
    onError || (onError = defaultError);
    onCompleted || (onCompleted = noop);
    return new AnonymousObserver(onNext, onError, onCompleted);
  };

ここで代入されている変数 observerCreate は上記の Observable.prototype.subscribe で使われています。 Observable.prototype.subscribeObserver 以外を渡して呼び出したときに Observer の実装として AnonymousObserver が使われるようです。

ちなみに、上記の理由から AnonymousObserver はエラー時のスタックトレースでよく見かけます (RxJS あるある) 。

では、次は AnonymousObserver を眺めましょう。

ここには next / error / completed はありますが onNext などはありません。AnonymousObserver.prototype.next が呼び出されたとき、コンストラクターで受け取った onNext を呼び出します。

anonymousobserver.js
var AnonymousObserver = Rx.AnonymousObserver = (function (__super__) {
  // ...
}(AbstractObserver));

AnonymousObserverAbstractObserver を継承 (?) しているようです。

次は、AbstractObserver を眺めましょう。

ここには onNext などの期待したメソッドが提供されています。次のコードは onNext です。onNext が呼び出されたときに this.next を呼び出しています。

abstractobserver.js
    AbstractObserver.prototype.onNext = function (value) {
      !this.isStopped && this.next(value);
    };

さきほどの AnonymousObserver.prototype.next はこの実装です。onNext はテンプレートメソッドパターンで、自身を継承したサブクラスに実装 (this.next) を任せます。これは Observable.prototype.subscribethis._subscribe とも同じ構造ですね。

また AbstractObserverObserver を継承 (?) しているみたいですね。型の階層としては Observer > AbstractObserver > AnonymousObserver のようです。おそらくインターフェース > 抽象クラス > 実装クラスといったところでしょう。抽象クラスの AbstractObserveronNext を持っているのは効率など実装上の都合でしょう。

おわりに

今日は Observable / Observer のインタフェースや、その実装を軽く眺めました。