MediapipeのThreadPoolの実装を読んだ


Mediapipeは、Googleが開発しているオープンソースのコンピュータービジョンパイプラインフレームワークで、有向グラフを記述することで、トラッキングなどの処理を作成できます。

また、いくつかの処理向けのグラフがすでに用意されており、それらを使うだけでも、そこそこ多くの処理を実行できます。

そのグラフは、並列実行されていて、処理の入力に近い順から、スレッドプールにタスクが放り込まれて、ある程度並列に処理されているようです。

まずは、その処理のThreadPool部分の実装を読んでみました。

MediapipeのThreadPoolは下記のように使います。
非常にシンプルなAPIとなっています。実装も200行ちょっとで非常に軽いです。
onnxruntimeのThreadPoolは機能も豊富で、非常に長かったです。
thread実装はLinux系の実装と、それ以外用の実装があるようでLinux用の処理飲み読みます。

  ThreadPool pool("testpool", num_workers);
  pool.StartWorkers();
  for (int i = 0; i < N; ++i) {
    pool.Schedule([i]() { DoWork(i); });
  }

APIで使われているメンバ変数は次の通りです。
tasksは、タスクのキューとして、FIFOとして使っています。

CondVarは条件変数で次のドキュメントの通りの動作をします。

  absl::Mutex mutex_;
  absl::CondVar condition_;
  bool stopped_ ABSL_GUARDED_BY(mutex_) = false;
  std::deque<std::function<void()>> tasks_ ABSL_GUARDED_BY(mutex_);

StartWorkersで、ワーカースレッドを生成します。このワーカースレッドを生成すると、スレッドプールのキューを待つループを実行するスレッドが生成されます。内部的にはpthreadでスレッドの生成や設定が行われているようです。std::threadが使われていないのはなぜなのか不明です。Linux系以外のための実装もあって、そちらでは、std::threadが使われているようです。

スレッドのキューを待つループは、RunWorkerメソッドで、下記のようにmutexをロックして、ループして、関数を手に入れたら、ロックを解除して、タスクを実行します。タスクがない間は、すべてのスレッドはcondition_.Waitでタスクが追加されるのを待ちます。 Scheduleメソッドにより、タスクがPushされると、condition._Signalによって、Waitしているスレッドのうち一つのスレッドのブロックを解きます。そうすると、そのスレッドは、タスクを実行できるので、タスクキューから、タスクを取り出してmutexをUnlockしてタスクを実行します。

void ThreadPool::StartWorkers() {
  for (int i = 0; i < num_threads_; ++i) {
    threads_.push_back(new WorkerThread(this, name_prefix_));
  }
}

void ThreadPool::Schedule(std::function<void()> callback) {
  mutex_.Lock();
  tasks_.push_back(std::move(callback));
  condition_.Signal();
  mutex_.Unlock();
}
...

void ThreadPool::RunWorker() {
  mutex_.Lock();
  while (true) {
    if (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      mutex_.Unlock();
      task();
      mutex_.Lock();
    } else {
      if (stopped_) {
        break;
      } else {
        condition_.Wait(&mutex_);
      }
    }
  }
  mutex_.Unlock();
}

RunWorkerに関して、下記のようにtasksを毎回チェックしてもいいんじゃないかと思ったのですが、
そうすると、次の2点のデメリットがあると感じました。

  • mutex.Lockがスレッドプール内のスレッドで毎サイクル実行されるので、Scheduleメソッドの実行にすこし遅延が生じるのではないか。
  • mutex.Lockの獲得とタスクキューのチェックがいずれかのスレッドで発生するため、すべてのスレッドをcondition.Waitで待たせるよりもCPU占有率が若干あがるのではないか
void ThreadPool::RunWorker() { 
  while (true) {
    mutex_.Lock();
    if (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      mutex_.Unlock();
      
      task();
      
    } else {
      mutex_.Unlock();
      if (stopped_) {
        break;
      }
    }
  }
}