C+-スレッドプール
4465 ワード
1.Boost.Asioスレッドプール
ダウンロード:https://sourceforge.net/proje...
VSで使用:プロジェクト-プロパティ-VCディレクトリ-ディレクトリを含む、YourPathasio-1.18.2includeを追加
公式ドキュメント:https://www.boost.org/doc/lib...
その他の操作:
可能であれば、直ちにスレッドを終了する、まだ実行されていないタスクは実行されない可能性がある.
2.カスタムスレッドプール実装
実現構想:は、ユーザが発行したタスクを保存するタスクキューを維持する. スレッドセットを作成し、各スレッドにタスクキューをポーリングさせ、タスクキューからタスクを抽出して実行する. ThreadPool.h
ThreadPool.cpp:
メインプログラム:
ダウンロード:https://sourceforge.net/proje...
VSで使用:プロジェクト-プロパティ-VCディレクトリ-ディレクトリを含む、YourPathasio-1.18.2includeを追加
公式ドキュメント:https://www.boost.org/doc/lib...
#include
#include
#include
using namespace std::literals;
static std::atomic_uint32_t count = 0;
int main()
{
//
asio::thread_pool pool(2);
auto work = []()
{
std::this_thread::sleep_for(1ns);
count++;
};
int n = 1000;
for (int i = 0; i < n; i++)
{
//
asio::post(pool, work);
}
//
pool.join();
std::cout << "count = " << count << '
';
}
count = 1000
その他の操作:
void stop();
可能であれば、直ちにスレッドを終了する、まだ実行されていないタスクは実行されない可能性がある.
2.カスタムスレッドプール実装
実現構想:
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include
#include
#include
#include
#include
#include
#include
using Task = std::function;
class ThreadPool
{
public:
ThreadPool(int nThreads = 1);
~ThreadPool();
//
bool submitTask(const Task& task);
bool submitTask(Task&& task);
//
void join();
private:
void runTasks();
private:
std::vector<:thread> m_threads; //
std::list m_taskQueue; //
std::atomic m_exit; //
std::mutex m_taskQueueMutex;
std::condition_variable m_taskQueueNotEmpty;
};
#endif
ThreadPool.cpp:
ThreadPool::ThreadPool(int nThreads) : m_exit(false)
{
m_threads.reserve(nThreads);
for (int i = 0; i < nThreads; i++)
{
m_threads.emplace_back(std::move(std::thread(&ThreadPool::runTasks, this)));
}
}
ThreadPool::~ThreadPool()
{
join();
}
bool ThreadPool::submitTask(const Task& task)
{
std::lock_guard<:mutex> qLock(m_taskQueueMutex);
if (m_taskQueue.size() == m_taskQueue.max_size())
{
return false;
}
m_taskQueue.push_back(task);
m_taskQueueNotEmpty.notify_one();
return true;
}
bool ThreadPool::submitTask(Task&& task)
{
std::lock_guard<:mutex> qLock(m_taskQueueMutex);
if (m_taskQueue.size() == m_taskQueue.max_size())
{
return false;
}
m_taskQueue.emplace_back(std::move(task));
m_taskQueueNotEmpty.notify_one();
return true;
}
void ThreadPool::join()
{
m_exit.store(true);
m_taskQueueNotEmpty.notify_all();
for (auto&& t : m_threads)
{
t.join();
}
m_threads.clear();
}
void ThreadPool::runTasks()
{
Task task;
while (true)
{
std::unique_lock<:mutex> qLock(m_taskQueueMutex);
while (m_taskQueue.empty() && !m_exit)
{
m_taskQueueNotEmpty.wait(qLock);
}
if (m_taskQueue.empty())
{
return;
}
task = m_taskQueue.front();
m_taskQueue.pop_front();
qLock.unlock();
task();
}
}
メインプログラム:
#include
#include
#include "ThreadPool.h"
using namespace std::literals;
static std::atomic_uint32_t count = 0;
int main()
{
ThreadPool pool(2);
int n = 1000;
for (int i = 0; i < n; i++)
{
pool.submitTask([]()
{
std::this_thread::sleep_for(1ns);
count++;
});
}
pool.join();
std::cout << "count = " << count << '
';
}
count = 1000