MediapipeのThreadPoolの実装を読んだ
Mediapipeは、Googleが開発しているオープンソースのコンピュータービジョンパイプラインフレームワークで、有向グラフを記述することで、トラッキングなどの処理を作成できます。
また、いくつかの処理向けのグラフがすでに用意されており、それらを使うだけでも、そこそこ多くの処理を実行できます。
そのグラフは、並列実行されていて、処理の入力に近い順から、スレッドプールにタスクが放り込まれて、ある程度並列に処理されているようです。
まずは、その処理のThreadPool部分の実装を読んでみました。
MediapipeのThreadPoolは下記のように使います。
非常にシンプルなAPIとなっています。実装も200行ちょっとで非常に軽いです。
onnxruntimeのThreadPoolは機能も豊富で、非常に長かったです。
thread実装はLinux系の実装と、それ以外用の実装があるようでLinux用の処理飲み読みます。
ThreadPool pool("testpool", num_workers);
pool.StartWorkers();
for (int i = 0; i < N; ++i) {
pool.Schedule([i]() { DoWork(i); });
}
APIで使われているメンバ変数は次の通りです。
tasksは、タスクのキューとして、FIFOとして使っています。
CondVarは条件変数で次のドキュメントの通りの動作をします。
absl::Mutex mutex_;
absl::CondVar condition_;
bool stopped_ ABSL_GUARDED_BY(mutex_) = false;
std::deque<std::function<void()>> tasks_ ABSL_GUARDED_BY(mutex_);
StartWorkersで、ワーカースレッドを生成します。このワーカースレッドを生成すると、スレッドプールのキューを待つループを実行するスレッドが生成されます。内部的にはpthreadでスレッドの生成や設定が行われているようです。std::thread
が使われていないのはなぜなのか不明です。Linux系以外のための実装もあって、そちらでは、std::thread
が使われているようです。
スレッドのキューを待つループは、RunWorker
メソッドで、下記のようにmutexをロックして、ループして、関数を手に入れたら、ロックを解除して、タスクを実行します。タスクがない間は、すべてのスレッドはcondition_.Waitでタスクが追加されるのを待ちます。 Scheduleメソッドにより、タスクがPushされると、condition._Signalによって、Waitしているスレッドのうち一つのスレッドのブロックを解きます。そうすると、そのスレッドは、タスクを実行できるので、タスクキューから、タスクを取り出してmutexをUnlockしてタスクを実行します。
void ThreadPool::StartWorkers() {
for (int i = 0; i < num_threads_; ++i) {
threads_.push_back(new WorkerThread(this, name_prefix_));
}
}
void ThreadPool::Schedule(std::function<void()> callback) {
mutex_.Lock();
tasks_.push_back(std::move(callback));
condition_.Signal();
mutex_.Unlock();
}
...
void ThreadPool::RunWorker() {
mutex_.Lock();
while (true) {
if (!tasks_.empty()) {
std::function<void()> task = std::move(tasks_.front());
tasks_.pop_front();
mutex_.Unlock();
task();
mutex_.Lock();
} else {
if (stopped_) {
break;
} else {
condition_.Wait(&mutex_);
}
}
}
mutex_.Unlock();
}
RunWorkerに関して、下記のようにtasksを毎回チェックしてもいいんじゃないかと思ったのですが、
そうすると、次の2点のデメリットがあると感じました。
- mutex.Lockがスレッドプール内のスレッドで毎サイクル実行されるので、Scheduleメソッドの実行にすこし遅延が生じるのではないか。
- mutex.Lockの獲得とタスクキューのチェックがいずれかのスレッドで発生するため、すべてのスレッドをcondition.Waitで待たせるよりもCPU占有率が若干あがるのではないか
void ThreadPool::RunWorker() {
while (true) {
mutex_.Lock();
if (!tasks_.empty()) {
std::function<void()> task = std::move(tasks_.front());
tasks_.pop_front();
mutex_.Unlock();
task();
} else {
mutex_.Unlock();
if (stopped_) {
break;
}
}
}
}
Author And Source
この問題について(MediapipeのThreadPoolの実装を読んだ), 我々は、より多くの情報をここで見つけました https://zenn.dev/xiongjie/articles/5bb43ee93a2dc4著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Collection and Share based on the CC protocol