redux-sagaで排他制御をするサンプル


はじめに

redux-saga で排他制御がやりたくて,
await-semaphoreを使ったらうまくいきました.

サンプルは こちら にアップロードしています.
何かの参考になれば幸いです.

やりたいこと

やりたいことは, 複数のsagaがstore上のデータを同時並行に取り合う処理です.
より具体的には, 次のようなことがやりたいです.

  • storeにデータのリストがある ['a', 'b', 'c']
  • 2つのsagaが同時並行で以下をやる
    • storeからリストを取得する
    • リストから最初のデータを選択する
    • 取り出したデータをstoreから削除する

sagaが同じデータを取り出してしまったらNGです.
2個のsagaが順に実行されればOKなのですが,
sagaは並行に実行されるので, NGになるかもしれません.

排他制御が必要になるケース

例えば, 以下のケースでNGになります.

  • saga1: リストを取得 ['a', 'b', 'c']
  • saga2: リストを取得 ['a', 'b', 'c']
  • saga1: 'a'を選択
  • saga1: リストから'a'を削除 ['b', 'c']
  • saga2: 'a'を選択 (saga1によるリストの更新に気が付かず'a'を選択してしまう!)
  • saga2: リストから'a'を削除 ['b', 'c'] ('a'はすでにsaga1により削除されている!)

saga1, saga2どちらも'a'を取得してしまいました...

セマフォを使う

NGの原因は, saga1, saga2 が同時並行で実行されることです.
saga1が実行を終えるまで, saga2を待たせることができれば, うまくいきそうです.
本記事では, await-semaphore の セマフォを使ってこの排他制御を実現します.

排他制御なしでやる

コード

reducer.js
const initialState = {
  // このデータを複数のsagaが取り合う
  items: ["a", "b", "c"],
}

export default function reducer(state = initialState, action) {
  switch (action.type) {
    // 指定されたデータをstoreから削除
    case "REMOVE": {
      const { item } = action
      return {
        items: state.items.filter(x => x !== item),
      }
    }
    default:
      return state
  }
}
saga.js
import { fork, select, put, join } from "redux-saga/effects"

function* popItem() {
  // リストを取得
  const items = yield select(state => state.items)
  if (items.length <= 0) {
    throw new Error("insufficient items")
  }

  // リストから先頭のデータを選択
  const item = items[0]

  // 選択したデータをリストから削除
  yield put({ type: "REMOVE", item })

  // 呼び出し元に選択したデータを返す
  return item
}

export default function* mainSaga() {
  // 3個のsagaを起動
  const tasks = [yield fork(popItem), yield fork(popItem), yield fork(popItem)]

  // sagaの終了を待つ
  yield join(tasks)

  // 各sagaが取得したデータを表示
  console.info(
    "fetched items =",
    tasks.map(x => x.result())
  )
}

実行結果

実行すると, 3個のsagaすべてが 'a' を取得してしまいます...


% yarn start
yarn run v1.21.1
$ babel-node src/main.js
store changed = { items: [ 'b', 'c' ] }
store changed = { items: [ 'b', 'c' ] }
store changed = { items: [ 'b', 'c' ] }
fetched items = [ 'a', 'a', 'a' ] # すべてのsagaが 'a' を取得してしまった!
Done in 0.55s.

排他制御を加える

排他制御なしのコードにセマフォの記述を加えるだけです.

コード

sagaWithLock.js
import { call, fork, select, put, join } from "redux-saga/effects"
import { Semaphore } from "await-semaphore"

// セマフォ作成
// 引数が 1 ならば同時にロックを獲得できる saga は1つ
// 引数 1 を指定するなら, new Mutex() と等価
const sem = new Semaphore(1)

function* popItemWithLock() {
  // sem.acquire を呼び出して, ロックの獲得をする
  // もし, 他の saga がロックを獲得していれば, その saga が release() を呼び出すまで待つ
  // 他の saga がロックを獲得していなければ, すぐにロックを獲得できる
  //
  // call([sem, sem.acquire]) は sem.acquire() の呼び出しを意味する
  // https://redux-saga.js.org/docs/api/#callcontext-fn-args
  //
  // sem.acquire() は Promise を返すため, yield call でロックの獲得を待つ
  // https://www.npmjs.com/package/await-semaphore#semaphoreacquire-promise--void
  const release = yield call([sem, sem.acquire])
  try {
    // このブロックを実行できるsagaは高々1個
    const items = yield select(state => state.items)
    if (items.length <= 0) {
      throw new Error("insufficient items")
    }
    const item = items[0]
    yield put({ type: "REMOVE", item })
    return item
  } finally {
    // 忘れずにロックを解放する
    release()
  }
}

export default function* mainSagaWithLock() {
  const tasks = [
    yield fork(popItemWithLock),
    yield fork(popItemWithLock),
    yield fork(popItemWithLock),
  ]
  yield join(tasks)

  console.info(
    "fetched items =",
    tasks.map(x => x.result())
  )
}

実行結果

3個のsagaがそれぞれ'a', 'b', 'c'を取得できました!

% yarn start --enable-lock # オプションをつけると mainSagaWithLock が起動する
yarn run v1.21.1
$ babel-node src/main.js --enable-lock
store changed = { items: [ 'b', 'c' ] } # 'a' が取り出される
store changed = { items: [ 'c' ] }      # 'b' が取り出される
store changed = { items: [] }           # 'c' が取り出される
fetched items = [ 'a', 'b', 'c' ] # 相異なるデータを取得できた
Done in 0.53s.

さいごに

ありがとうございました.