redux-observable Epicがどう動くかを理解する


何についての記事か

ざっくりまとめるとredux-observableについて公式Docを読むだけではわからない下記の疑問を解決する記事

  • Epicに引数として渡されるaction\$,state$とはなんなのか どこから来るのか?
  • ActionがDispatchされてからどのようにEpicの処理が実行されるのか?
  • Epicが返すpiped ObservableのObserverはどこで定義され、どんなロジックが動いているのか?

前談 : Docを読んで使い方を理解する

RxJSベースでReduxの非同期処理を記述できるredux-observable
公式Docを読んでみると、以下のようなサンプルがあります。

import { ajax } from 'rxjs/ajax';

// action creators
const fetchUser = username => ({ type: FETCH_USER, payload: username });
const fetchUserFulfilled = payload => ({ type: FETCH_USER_FULFILLED, payload });

/// epic
const fetchUserEpic = action$ => action$.pipe(
  ofType(FETCH_USER),
  mergeMap(action =>
    ajax.getJSON(`https://api.github.com/users/${action.payload}`).pipe(
      map(response => fetchUserFulfilled(response))
    )
  )
);

上記のexampleは要約すると以下

  1. FETCH_USERアクションがdispatchされた時に処理を行う
  2. ajaxでapiにgetリクエストを投げる
  3. apiからの返却値をpayloadとしてFETCH_USER_FULFILLEDアクションをdispatchする

RxJSのoperatorの使い方がわかっていれば真似して実装することは難しくありません。
ドキュメントを読むことでredux-observableの基本的な使い方は理解できるし、実装に入ることができます。

ドキュメントを読んで理解できたこと

  • redux-observableで非同期処理を行う = Epicを作成する
  • action\$.pipeにoperatorを組み合わせて値の加工、APIへのリクエストを行う
  • epic(action\$, state\$).subscribe(store.dispatch)となるらしいので、epicはActionを値として出力するObservableが返せばいい

ドキュメントの文量は多くなく、exampleを見ればapi fetchとその結果を含んだAcitionをdispatchする書き方はすぐにわかります。
とりあえず真似して実装を進めることはできるでしょう。

ドキュメントを読んでもわからなかったこと

  • Epicに引数として渡されるaction\$,state$とはなんなのか どこから来るのか?
  • ActionがDispatchされてからどのようにEpicの処理が実行されるのか?
  • Epicが返すpiped ObservableのObserverはどこで定義され、どんなロジックが動いているのか?

これらは知る必要がないのだから、ドキュメントに書かれないのは当然。
とはいえ、せっかくObservableに触れるのだからReactive Programmingであることを意識しながら実装を進めた方が学びが多いでしょう。
Observableの流れを理解した上で非同期処理が実装できるようになることを目標に掘り下げていきます。

Epicについて

掘り下げるにあたって、Epicについてもう少しだけ確認しておきましょう。
Epicは以下のような形式の関数。redux-observableを使用して非同期処理を書くということはEpicを作成するということになります。

/// epic
const fetchUserEpic = (action$, state$, dependancies) => action$.pipe(
  ofType(TARGET_ACTION_TYPE),
  // some operators
);

第一引数のactions\$はObservable。何らかのActionがdispatchされると、そのActionが値としてストリームを流れます。
actions\$のpipeメソッドに処理を登録していくことで、トリガーとなるAction(この例ではTARGET_ACTION_TYPE)がdispatchされたあとのロジックを作成することができます。

第二引数のstate\$もobservable。こちらはredux-observableによって拡張されたクラスで、valueというプロパティを持っています。
処理をに使うというよりはstoreを参照するために利用されるもので、Epicの中でstateの値を参照したいときは\$state.valueを参照すればいいわけです。

第三引数のdependanciesはredux-observableの全体の動作を把握する上では一旦重要でないのでこちらのドキュメントを参照してください
【公式】Injecting Dependencies Into Epics

ドキュメントのexampleだけ見てもaction\$, state$が何処から来て何処へいくものなのかがよくわからず戸惑いがちだと思います。
Observarも登場しないので不慣れな人にとってはObservableパターンであることもわかりにくいかもしれません。
その辺りについて理解するために下記でEpicMiddlewareについて解説しています。

ちなみに、公式ドキュメント曰くaction\$についている\$は便宜上ついている識別子であって特に深い意味はないらしいのでそういう名前だと割り切りましょう。

EpicMiddlewareについて

redux-observableを利用する場合はepicMiddlewareを使用することになります。
下記のように適用します。

import { createEpicMiddleware } from 'redux-observable';
import { reducer } from '../path/to/reducer';
import { rootEpic } form '../path/to/rootEpic';

const epicMiddleware = createEpicMiddleware();
const store = createStore(reducer, applyMiddleware(epicMiddleware));

epicMiddleware.run(rootEpic);

EpicMiddlewareの動作やEpicとの関係を図で表してみました

私たちが作成するEpicはEpicMiddlewareの処理をきっかけに動作します。
Epicの引数となるaction\$, state\$はcreateEpicMiddleware()実行時に作成されます。
つまり、createEpicMiddleware.tsを読み解くことができれば、redux-observableの全体動作を掴むことができます。

createEpicMiddleware.tsを読み解く

ActionをDispatchされた時の動作

EpicMiddlewareのコードの内、Actionがdispatchされたときに処理を開始するトリガーの部分から見ていきます。非常にシンプルです。
上図の中央のEpicMiddlewareの部分です。

    // createEpicMiddleware.ts  Line 86
    return next => {
      return action => {
        const result = next(action);

        stateSubject$.next(store.getState());
        actionSubject$.next(action);

        return result;
      };
    };
処理1. next(action)

なんらかActionがDispatchされると、まずは後続のmiddlewareの処理とreducerの処理を先に完了させます。
Epicの処理が走るのはreducerによってstoreが更新された後になります。

処理2. stateSubject\$.next(store.getState())

storeからstateを取得し、stateSubject\$.next()を実行してサブジェクトにstateを値として注入します。
stateSubject\$のObserverによってstate\$.valueが注入された値に置き換わります。
つまり、なんらかのActionがdispatchされた度にstate\$.valueは最新化されます。

処理3. actionSubject$.next(action)

actionをactionSubject\$に注入しています。
あとで詳しく書きますが、action\$はactionSubject\$をソースとしたObservableなのでactionSubject\$に値が注入されるとaction\$のストリーム処理が実行されることになります。
つまり、この行がEpicを動作させるためのトリガーになっています。

stateSubject$とactionSubject$の生成

なんらかのActionがdispatchされた時にstateSubject\$とactionSubject\$に値が注入されることはわかりました。
この2つのSubjectがどのように生成され、どう使われているかを見ていきます。

    // createEpicMiddleware.ts  Line 53
    const actionSubject$ = new Subject<T>();
    const stateSubject$ = new Subject<S>();
    const action$ = actionSubject$
      .asObservable()
      .pipe(observeOn(uniqueQueueScheduler));
    const state$ = new StateObservable(
      stateSubject$.pipe(observeOn(uniqueQueueScheduler)),
      store.getState()
    );

actionSubject\$, stateSubject\$はRxJSの純粋なSubjectクラスのインスタンスです。

state\$はStateObservableクラスのインスタンスです。このクラスはObservableを継承したものでvalueプロパティが追加されています。
constractor第一引数のstateSubject\$を値の入り口としてvalueを更新します。 例: stateSubject\$.next(value)
StateObservable自身はObserverを持たず、valueプロパティの更新以外は行いません。

action\$はactionSubject\$をObservableに変換したインスタンスです。

ここで作成されたaction\$とstate\$がEpicの引数として渡されます。
次は、Epicが呼び出される部分のコードを読んでみましょう。

Epicの呼び出し

上の概略図でいうと左下あたりのEpicとそのObservableとObserverのあたりです。

const epic$ = new Subject<Epic<T, O, S, D>>();
// ...

// createEpicMiddleware.ts  Line 63
const result$ = epic$.pipe(
      map(epic => {
        const output$ = epic(action$, state$, options.dependencies!);

        if (!output$) {
          throw new TypeError(
            `Your root Epic "${epic.name ||
              '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`
          );
        }

        return output$;
      }),
      mergeMap(output$ =>
        from(output$).pipe(
          subscribeOn(uniqueQueueScheduler),
          observeOn(uniqueQueueScheduler)
        )
      )
    );

    result$.subscribe(store.dispatch);

epic\$はepicが注入されることを期待しているSubjectです。epicMiddleware.run(rootEpic)を動かした時にrootEpicを受け取る部分です。
epic\$.pipe()は2つのオペレーターを含んでいます。
1つ目のmapはシンプルにepicを実行して返り値を返しています。返り値が不正の場合はエラーを出力します。
2つ目のmergeMapでhigh order Observableとしてepicの返却値であるoutput$がmergeされています。

以上からresult\$はepicの返り値であるoutput\$にSchedulerが追加されたObservableと言えます。
最後にresult\$のObservableとしてstore.dispatchを登録しています。
私たちがoperatorを追記して作成したEpicが出力する値(Action)がそのままstore.dispatchに渡されることがわかります。

色々とwrapされて名前が変わっているのがわかりづらいですが、result\$のソースはactionSubject\$です。
「ActionをDispatchされたときの動作」のところで説明したactionSubject\$.next(action)がトリガーとなり、私たちが作成したEpicが返すObservableが動いて別のActionが出力され、Observerによってdispatchされていることがわかります。

epicMiddleware.run()

// createEpicMiddleware.ts  Line 103
  epicMiddleware.run = rootEpic => {
    if (process.env.NODE_ENV !== 'production' && !store) {
      warn(
        'epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.'
      );
    }
    epic$.next(rootEpic);
  };

最後にstoreにmiddlewareを登録した後に実行するepicMiddleware.run()について
これは先ほどepicをwrapしていたepic\$にepicを渡す役割を持っています。

最初の疑問への回答

冒頭、公式ドキュメントを読んだけどわからなかった疑問を挙げました。
EpicMiddlewareについて理解した今なら回答できるはずなので、答えを記して終わろうと思います。
 

Epicに渡されるaction$,state$とはなんなのか どこから来るのか?

→ createEpicMiddleware実行時に生成されるSubjectをソースとしたObservable
 

ActionがDispatchされてからどのようにEpicの処理が実行されるのか?

→ なんらかのActionがdispatchされるとEpicMiddlewareが$actionSubjectにActionを渡す それをトリガーにEpicの処理が動作する
 

Epicが返すpiped ObservableのObserverは何処で定義され、何をしているのか?

→ createEpicMiddleware実行時にEpicの返り値に参照を持つObservableの.subscribe()メソッドが起動され、store.dispatchがObserverのnextとして登録される。
 

以上。誤り、改善ポイントなどあればコメントいただければ幸いです。