NODEJSのバッチでの非同期タスクを実行する


あなたが項目の大きなリストを持っていると言うと、それらのすべての非同期タスクを実行します.これらのタスクを一度に実行することができます並列またはバッチで.このブログ記事では、バッチでの非同期タスクの実行方法を説明します.

タスク
バッチでタスクを実行するには、タスクが必要です.それで、約束をつくって、それを解決するために0と5秒の間を待つタスクを作成しましょう.
function task() {
  return new Promise((resolve) => {
    setTimeout(resolve, Math.floor(Math.random() * 5000 + 1));
  });
}

並列に走る
ここで、アイテムのリストをフィードし、すべてのアイテムに対して作成したタスクを実行できる機能が必要です.
function executeTasksConcurrently(list) {
  for (const item of list) {
    task();
  }
}
この関数をフィードするときに、それはtask() 並列に各項目の関数.
どの順序のタスクが実行され、完了するかを示すにはconsole.log 文.次のコードとコンソール出力を確認します.
コード
function task(item) {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log(`End task: ${item}`);
      resolve();
    }, Math.floor(Math.random() * 5000 + 1));
  });
}

const list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

async function executeTasksConcurrently(list) {
  for (const item of list) {
    console.log(`Start task: ${item}`);
    task(item);
  }
}

executeTasksConcurrently(list);
コンソール出力:
$ yarn start
Start task: 1
Start task: 2
Start task: 3
Start task: 4
Start task: 5
Start task: 6
Start task: 7
Start task: 8
Start task: 9
Start task: 10
End task: 7
End task: 9
End task: 6
End task: 2
End task: 3
End task: 8
End task: 5
End task: 10
End task: 1
End task: 4
✨  Done in 5.12s.

バッチで走る
バッチでタスクを実行するには、まずアクティブなタスクを追跡する必要があります.このリストは、タスクの開始時とタスク終了時に更新する必要があります.
async function executeTasksConcurrently(list) {
  let activeTasks = [];

  for (const item of list) {
    console.log(`Start task: ${item}`);
    const activeTask = task()
      .then(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      })
      .catch(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      });
    activeTasks.push(activeTask);
  }
}
今、我々は同時に実行するタスクを決定する必要があります.この例では、3つのタスクを同時に実行できるようにします.次に、現在のアクティブなタスクの量がこの制限に一致するときに、新しいタスクを開始する前にタスクが終了するまで、forループを待機させる必要があります.以来activeTasks リストは、我々が使うことができる約束の配列です Promise.race どのタスクが最初に終了するかチェックする.
async function executeTasksConcurrently(
  list,
  concurrencyLimit = 3
) {
  let activeTasks = [];

  for (const item of list) {
    if (activeTasks.length >= concurrencyLimit) {
      await Promise.race(activeTasks);
    }

    console.log(`Start task: ${item}`);
    const activeTask = task()
      .then(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      })
      .catch(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      });
    activeTasks.push(activeTask);
  }
}
完全なコードとコンソール出力をチェックします.
コード
function task() {
  return new Promise((resolve) => {
    setTimeout(resolve, Math.floor(Math.random() * 5000 + 1));
  });
}

const list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

async function executeTasksConcurrently(
  list,
  concurrencyLimit: number = 3
) {
  let activeTasks = [];

  for (const item of list) {
    if (activeTasks.length >= concurrencyLimit) {
      await Promise.race(activeTasks);
    }

    console.log(`Start task: ${item}`);
    const activeTask = task()
      .then(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      })
      .catch(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      });
    activeTasks.push(activeTask);
  }
}

executeTasksConcurrently(list);
コンソール出力:
$ yarn start
Start task: 1
Start task: 2
Start task: 3
End task: 3
Start task: 4
End task: 2
Start task: 5
End task: 1
Start task: 6
End task: 4
Start task: 7
End task: 5
Start task: 8
End task: 8
Start task: 9
End task: 6
Start task: 10
End task: 10
End task: 7
End task: 9
✨  Done in 11.27s.

結論
バッチでタスクを実行すると、リソースの過負荷を防ぐのに役立つことができます実装するのはかなり簡単です.このコードを自分で維持したり書いたりする必要がない場合は、このパターンを実装したサードパーティライブラリを使用できます.例えばSupercharge's Promise Pool .
このスクリプトを実行したい場合は、GitHub .
あなたが質問やフィードバックを持っている場合コメントや無料で私に連絡してください!