C++九陰真経のスレッドプール

4178 ワード

スレッドプールの役割:
1、短時間タスクの処理時にスレッドの作成と破棄の代価を回避する.
2、大量の同時任務の中で、システム資源をより合理的に使用し、ピークを削って谷を平らにし、システム運行の安定性を保証することができる.
C++11標準に基づいてスレッドプールを構築し、以下の利点を有する.
1、タスクはインタフェースを継承する必要がない
2、lambada式をサポートする
3、グローバル関数、静的メンバー関数をサポートする;
4、bindを使用してメンバー関数をサポートする.
 
class ThreadObject : public QueueObject
{
public:
    using QueueObject::QueueObject;
    ~ThreadObject()
    {
        Stop();
        for (std::thread& thread : m_pThreadPool) {
			if (thread.joinable())
				thread.join(); //       ,   :        
		}
    }
    int ThreadCount()
	{
		return m_pThreadPool.size();
	}
    //      
	int IdlCount() { return m_idlThrNum; }
protected:
    //      
	std::atomic  m_idlThrNum ;
    std::vector<:thread> m_pThreadPool;
};

//   
class CThreadPool : public ThreadObject
{
public:
	using ThreadObject::ThreadObject;
	//       
	void Start(int nThreadNum = 1)
	{
		m_idlThrNum = nThreadNum < 1 ? 1 : nThreadNum;
		for (auto size = 0; size < nThreadNum; ++size)
		{   //       
			m_pThreadPool.emplace_back(
				[this]
			{ //       
				while (!this->m_bStop)
				{
					std::function task;
					{   //          task
						// unique_lock    lock_guard     :     unlock()   lock()
						std::unique_lock<:mutex> lock(m_mu);

						this->m_condPop.wait(lock,
							[this] {
							return this->m_bStop.load() || !this->m_taskQueue.empty();
						}
						); // wait     task
						if (this->m_bStop && this->m_taskQueue.empty())
							return;
						task = std::move(this->m_taskQueue.front()); //     task
						this->m_taskQueue.pop();
					}
					//     
					m_condPush.notify_one();
					m_idlThrNum--;
					task();
					m_idlThrNum++;
				}
			}
			);
		}
	}
   
   //        
	template
	auto Commit(F&& f, Args&&... args) -> std::future
	{
		using return_type = decltype(f(args...));

		auto task = std::make_shared< std::packaged_task >(
			std::bind(std::forward(f), std::forward(args)...)
			);

		std::future res = task->get_future();
		{
			std::unique_lock<:mutex> lock(m_mu);

			//               
			if (m_bStop)
				throw std::runtime_error("            ");

			while (m_taskQueue.size() == m_nCapacity)         //    
			{
				m_condPush.wait(m_mu);                         //  ,      
			}

			m_taskQueue.emplace([task]() { (*task)(); });
		}
		m_condPop.notify_one();
		return res;
	}

	virtual size_t GetTaskNum()
	{
		return m_taskQueue.size();
	}
   
private:
	std::queue<:function>> m_taskQueue;                    //  
};

テストコード:
void Func1(int i, const std::string& msg)
{
	std::cout << i << "-->" << msg<< std::endl;
}

int Func2(int i, const std::string& msg)
{
	std::cout << i << "-->" << msg << std::endl;
	std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	return i * 100;
}

class PrintTest {
public:
	void Func1(int i, const std::string& msg)
	{
		std::cout << i << "-->" << msg << std::endl;
	}

	void Func2(int i)
	{
		std::cout << i << "-->" << "      " << std::endl;
	}
};

int main()
{
    CThreadPool pool;
	pool.Start(2);
	//  lambda   
	pool.Commit([] {std::cout << "  lambda   " << std::endl; });
	//      lambda   
	pool.Commit([](int val){std::cout << "      lambda   " << "-->" << val << std::endl; }, 999);
	//      
	pool.Commit(Func1, 100, "      ");
	//      
	PrintTest p;
	pool.Commit(std::mem_fn(&PrintTest::Func1),&p , 200, "      ");

	//        
	auto res = pool.Commit(Func2, 300, "        ");
	auto val = res.get();
	std::cout << "     :" << val << std::endl;

	std::this_thread::sleep_for(std::chrono::milliseconds(5000));

    return 0;
}