C+-スレッドプール


1.Boost.Asioスレッドプール
ダウンロード:https://sourceforge.net/proje...
VSで使用:プロジェクト-プロパティ-VCディレクトリ-ディレクトリを含む、YourPathasio-1.18.2includeを追加
公式ドキュメント:https://www.boost.org/doc/lib...
#include 
#include 
#include 

using namespace std::literals;

static std::atomic_uint32_t count = 0;

int main()
{
    //     
    asio::thread_pool pool(2);

    auto work = []()
    {
        std::this_thread::sleep_for(1ns);
        count++;
    };

    int n = 1000;
    for (int i = 0; i < n; i++)
    {
        //     
        asio::post(pool, work);
    }
    
    //           
    pool.join();

    std::cout << "count = " << count << '
'; }
count = 1000

その他の操作:
void stop();

可能であれば、直ちにスレッドを終了する、まだ実行されていないタスクは実行されない可能性がある.
2.カスタムスレッドプール実装
実現構想:
  • は、ユーザが発行したタスクを保存するタスクキューを維持する.
  • スレッドセットを作成し、各スレッドにタスクキューをポーリングさせ、タスクキューからタスクを抽出して実行する.
  • ThreadPool.h
    #ifndef THREADPOOL_H
    #define THREADPOOL_H
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    using Task = std::function;
    
    class ThreadPool
    {
    public:
        ThreadPool(int nThreads = 1);
        ~ThreadPool();
    
        //     
        bool submitTask(const Task& task);
        bool submitTask(Task&& task);
    
        //         
        void join();
    
    private:
        void runTasks();
    
    private:
        std::vector<:thread> m_threads;             //        
        std::list m_taskQueue;                    //     
        std::atomic m_exit;                       //      
        std::mutex m_taskQueueMutex;
        std::condition_variable m_taskQueueNotEmpty;
    };
    
    #endif

    ThreadPool.cpp:
    ThreadPool::ThreadPool(int nThreads) : m_exit(false)
    {
        m_threads.reserve(nThreads);
    
        for (int i = 0; i < nThreads; i++)
        {
            m_threads.emplace_back(std::move(std::thread(&ThreadPool::runTasks, this)));
        }
    }
    
    ThreadPool::~ThreadPool()
    {
        join();
    }
    
    bool ThreadPool::submitTask(const Task& task)
    {
        std::lock_guard<:mutex> qLock(m_taskQueueMutex);
    
        if (m_taskQueue.size() == m_taskQueue.max_size())
        {
            return false;
        }
    
        m_taskQueue.push_back(task);
        m_taskQueueNotEmpty.notify_one();
        return true;
    }
    
    bool ThreadPool::submitTask(Task&& task)
    {
        std::lock_guard<:mutex> qLock(m_taskQueueMutex);
    
        if (m_taskQueue.size() == m_taskQueue.max_size())
        {
            return false;
        }
    
        m_taskQueue.emplace_back(std::move(task));
        m_taskQueueNotEmpty.notify_one();
        return true;
    }
    
    void ThreadPool::join()
    {
        m_exit.store(true);
        m_taskQueueNotEmpty.notify_all();
    
        for (auto&& t : m_threads)
        {
            t.join();
        }
    
        m_threads.clear();
    }
    
    void ThreadPool::runTasks()
    {
        Task task;
    
        while (true)
        {
            std::unique_lock<:mutex> qLock(m_taskQueueMutex);
    
            while (m_taskQueue.empty() && !m_exit)
            {
                m_taskQueueNotEmpty.wait(qLock);
            }
    
            if (m_taskQueue.empty())
            {
                return;
            }
    
            task = m_taskQueue.front();
            m_taskQueue.pop_front();
    
            qLock.unlock();
    
            task();
        }
    }

    メインプログラム:
    #include 
    #include 
    #include "ThreadPool.h"
    
    using namespace std::literals;
    
    static std::atomic_uint32_t count = 0;
    
    int main()
    {
        ThreadPool pool(2);
    
        int n = 1000;
        for (int i = 0; i < n; i++)
        {
            pool.submitTask([]()
                {
                    std::this_thread::sleep_for(1ns);
                    count++;
                });
        }
    
        pool.join();
    
        std::cout << "count = " << count << '
    '; }
    count = 1000