ジェネレータを使った非同期処理


はじめに

ES6以降、JavaScriptの非同期処理は、GeneratorやPromiseを使って書きやすくはなりました。しかし、直列と並列を変更したり、混在させたり、何層にも入れ子にしたい場合があるので、より書きやすく読みやすい記述を検討します。

サンプル用非同期処理関数(sleep)の準備

関数内の実行状況がわかるサンプルを用意してテストします。実際のコードで使用する実用的なものとしては、画像の読み込み、AjaxでのAPI呼び出し、タイマーで起動させるものなどがあります。

テスト用サンプル関数の準備
const sleep = (time, callback) => {
  console.log('sleep time : ' + time + ' start');
  const completion = () => {
    console.log('sleep time : ' + time + ' callback');
    callback();
  };
  if (time === 0) { // 関数終了前にコールバックが呼ばれる場合のテスト用
    completion();
  } else {
    setTimeout(completion, time);
  }
};

原始的な非同期処理の例

サンプル
// 直列非同期処理
sleep(300, () => {
  sleep(200, () => {
    sleep(100, () => {
      console.log('completed.');
    });
  });
});

// 並列非同期処理
let n = 3;
const check = () => { if (--n === 0) console.log('completed.'); };
sleep(300, check);
sleep(200, check);
sleep(100, check);
テスト結果
// 直列非同期処理
sleep time : 300 start
sleep time : 300 callback
sleep time : 200 start
sleep time : 200 callback
sleep time : 100 start
sleep time : 100 callback
completed.

// 並列非同期処理
sleep time : 300 start
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback
sleep time : 300 callback
completed.

ジェネレータを使った非同期処理

前述の非同期処理は直列と並列で構造が異なるため、ジェネレータを使って、直列と並列の非同期処理を同じ構造になるように書きます。直列か並列かは、serial/parallelで切り替えます。

(準備)現在実行している処理が完了してから、関数を実行する仕組み(postpone)の用意

基本的な動作としては、setTimeout(callback, 0);等と同じものです。
ここではジェネレータを使用する時に、再帰的呼び出し状態を回避するために使います。

postpone
const postpone = (callback) => {
  // setTimeout(callback, 0); は4msの制限があるため
  // gif 1x1
  const img = new Image();
  img.onload = callback;
  img.onerror = callback;
  img.src = 'data:image/gif;base64,R0lGODlhAQABAIAAAAUEBAAAACwAAAAAAQABAAACAkQBADs=';
};

非同期処理ジェネレータ serial/parallel

serial/parallel
// 直列用
const serial = function (generator, completion) {
  function proceeder() {
    postpone(() => { g.next().done && completion && completion(); });
  }
  var g = generator(proceeder);
  proceeder();
};

// 並列用
const parallel = function (generator, completion) {
  let n = 0;
  function proceeder() {
    postpone(() => {
      if (n-- !== 0) return;
      completion && completion();
    });
  }
  var g = generator(proceeder);
  postpone(() => {
    while (!g.next().done) n++;
    proceeder();
  });
};

このserial/parallelを使って、前述の原始的非同期処理を書いたサンプルです。

サンプル
// 直列
serial(function* (cb) {
  yield sleep(300, cb);
  yield sleep(200, cb);
  yield sleep(100, cb);
}, () => {
  console.log('serial : end');
});

// 並列
parallel(function* (cb) {
  yield sleep(300, cb);
  yield sleep(200, cb);
  yield sleep(100, cb);
}, () => {
  console.log('parallel : end');
});
テスト結果
// 直列
sleep time : 300 start
sleep time : 300 callback
sleep time : 200 start
sleep time : 200 callback
sleep time : 100 start
sleep time : 100 callback
serial : end

// 並列
sleep time : 300 start
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback
sleep time : 300 callback
parallel : end

serial/parallel関数の引数completionは必要がなければ記述を省略できます。

サンプル
serial(function* (cb) {
  yield sleep(200, cb);
  yield sleep(100, cb);
});

parallel(function* (cb) {
  yield sleep(200, cb);
  yield sleep(100, cb);
});

serial/parallelは混在させることができます。入れ子にする時は、completionで上位階層のコールバックを呼びます。

サンプル
serial(function* (cb) {
  yield sleep(700, cb);
  yield parallel(function* (cb) {
    yield sleep(600, cb);
    yield serial(function* (cb) {
      yield sleep(500, cb);
      yield sleep(400, cb);
    }, cb);
    yield parallel(function* (cb) {
      yield sleep(300, cb);
      yield sleep(200, cb);
    }, cb);
  }, cb);
  yield sleep(100, cb);
}, () => {
  console.log('serial + parallel: end');
});
テスト結果
sleep time : 700 start
sleep time : 700 callback
sleep time : 600 start
sleep time : 500 start
sleep time : 300 start
sleep time : 200 start
sleep time : 200 callback
sleep time : 300 callback
sleep time : 500 callback
sleep time : 400 start
sleep time : 600 callback
sleep time : 400 callback
sleep time : 100 start
sleep time : 100 callback
serial + parallel: end

ジェネレータを使った非同期処理の改良その1

前述のserial/parallelを入れ子にする場合、completionで上位階層のコールバックを呼びますが、

  • どの階層のジェネレータ関数の引数で与えられているコールバックが呼ばれているのかわかりにくい。
  • completion部分の記述を忘れた場合に正しく動かない。
  • そもそもcompletion部分でコールバックを指定するのが面倒である。

という問題があります。
そのため、completion部分でコールバックを呼ばなくて済むように改良します。具体的には、serial/parallelが外からコールバックを指定できる関数を返すことで、それを経由してコールバック関数を受け取り、自動で呼ばれるようにします。

非同期処理ジェネレータ serial/parallel 改良版その1

serial/parallel
const serial = (generator, completion) => {
  let callback = () => { callback = void 0; };
  const set_callback = cb => {
    callback || cb();
    callback = cb;
  };
  const proceed = () => {
    postpone(() => {
      const r = g.next();
      if (typeof r.value === 'function' && r.value.name === 'set_callback') r.value(proceed);
      if (r.done) {
        completion && completion();
        callback();
      }
    });
  };
  const g = generator(proceed);
  proceed();
  return set_callback;
};

const parallel = (generator, completion) => {
  let n = 0;
  let callback = () => { callback = void 0; };
  const set_callback = cb => {
    callback || cb();
    callback = cb;
  };
  const proceed = () => {
    postpone(() => {
      if (n-- !== 0) return;
      completion && completion();
      callback();
    });
  };
  const g = generator(proceed);
  postpone(() => {
    for (let r = g.next(); !r.done; r = g.next(), n++) {
      if (typeof r.value === 'function' && r.value.name === 'set_callback') r.value(proceed);
    }
    proceed();
  });
  return set_callback;
};
サンプル
serial(function* (cb) {
  yield sleep(700, cb);
  yield parallel(function* (cb) {
    yield sleep(600, cb);
    yield serial(function* (cb) {
      yield sleep(500, cb);
      yield sleep(400, cb);
    });
    yield parallel(function* (cb) {
      yield sleep(300, cb);
      yield sleep(200, cb);
    });
  });
  yield sleep(100, cb);
}, () => {
  console.log('serial + parallel: end');
});
テスト結果
sleep time : 700 start
sleep time : 700 callback
sleep time : 600 start
sleep time : 500 start
sleep time : 300 start
sleep time : 200 start
sleep time : 200 callback
sleep time : 300 callback
sleep time : 500 callback
sleep time : 400 start
sleep time : 600 callback
sleep time : 400 callback
sleep time : 100 start
sleep time : 100 callback
serial + parallel: end

ジェネレータを使った非同期処理の改良その2

さらに改良を加えて、ジェネレータ関数の引数及び、非同期処理関数のコールバック関数を省略できるようにします。コールバック関数を遅延設定、実行するために自前クラスTransmitterを用意しました。自前関数transmitはそれを使いやすくするヘルパ関数になります。

非同期処理ジェネレータ serial/parallel 最終形

serial/parallel
const Transmitter = class {
  constructor(executor) { // executor = callback => {...};
    this.value = void 0;  // 同期部分のreturnで返す値
    this.result = void 0; // 非同期部分のcallbackで返す値
    this._callback = result => {
      this.result = result;
      this._callback = void 0;
    };
    this.value = executor(result => { this._callback(result); });
  }
  connect(callback) {
    this._callback || callback(this.result);
    this._callback = callback;
    return this.value;
  }
};

// 最後の引数がコールバック関数となる関数のみ使用可能
// 仮のコールバック関数を強制設定する
const transmit = (func, ...args) => {
  return new Transmitter(callback => {
    if (func.length === 0) postpone(callback);
    if (func.length > 0) {
      const org = args[func.length - 1];
      if (typeof org === 'function') {
        args[func.length - 1] = () => { org(); callback(); };
      } else if (org === void 0 || org === null) {
        args[func.length - 1] = callback;
      } else {
        postpone(callback);
      }
    }
    return func(...args);
  });
};

const serial = (generator, completion) => {
  return new Transmitter(callback => {
    let y = void 0;
    const proceed = () => {
      postpone(() => {
        const r = g.next(y);
        if (!r.done) {
          y = (r.value instanceof Transmitter) ? r.value.connect(proceed) : r.value;
        } else {
          completion && completion();
          callback();
        }
      });
    };
    const g = generator(proceed);
    proceed();
  });
};

const parallel = (generator, completion) => {
  return new Transmitter(callback => {
    let n = 0;
    const proceed = () => {
      postpone(() => {
        if (n-- !== 0) return;
        completion && completion();
        callback();
      });
    };
    const g = generator(proceed);
    postpone(() => {
      let y = void 0;
      for (let r = g.next(y); !r.done; r = g.next(y), n++) {
        y = (r.value instanceof Transmitter) ? r.value.connect(proceed) : r.value;
      }
      proceed();
    });
  });
};
サンプル
serial(function* () {
  yield transmit(sleep, 700);
  yield parallel(function* () {
    yield transmit(sleep, 600);
    yield serial(function* () {
      yield transmit(sleep, 500);
      yield transmit(sleep, 400);
    });
    yield parallel(function* () {
      yield transmit(sleep, 300);
      yield transmit(sleep, 200);
    });
  });
  yield transmit(sleep, 100);
}, () => {
  console.log('serial + parallel: end');
});
テスト結果
sleep time : 700 start
sleep time : 700 callback
sleep time : 600 start
sleep time : 500 start
sleep time : 300 start
sleep time : 200 start
sleep time : 200 callback
sleep time : 300 callback
sleep time : 500 callback
sleep time : 400 start
sleep time : 600 callback
sleep time : 400 callback
sleep time : 100 start
sleep time : 100 callback
serial + parallel: end

対象となる非同期処理関数で、Transmitterを使用したり、transmitでラップしておけば、transmitの記述をする必要も無くなります。

サンプル
const t1Sleep = (time) => { return new Transmitter(callback => { sleep(time, callback); }); };
const t2Sleep = (...args) => { return transmit(sleep, ...args); };

serial(function* () {
  yield t1Sleep(400);
  yield t2Sleep(300);
  yield parallel(function* () {
    yield t1Sleep(200);
    yield t2Sleep(100);
  });
});
テスト結果
sleep time : 400 start
sleep time : 400 callback
sleep time : 300 start
sleep time : 300 callback
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback

transmitを使用せずに明示的にコールバック呼ぶ形式(改良版その1の書き方)も混在できます。

サンプル
serial(function* (cb) {
  yield t1Sleep(600);
  yield t2Sleep(500);
  yield sleep(400, cb);
  yield parallel(function* (cb) {
    yield t1Sleep(300);
    yield t2Sleep(200);
    yield sleep(100, cb);
  });
});
テスト結果
sleep time : 600 start
sleep time : 600 callback
sleep time : 500 start
sleep time : 500 callback
sleep time : 400 start
sleep time : 400 callback
sleep time : 300 start
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback
sleep time : 300 callback

以上です。