C++11学習ノート---非同期操作実行結果を取得

12490 ワード

マルチスレッド環境では、lambdaを渡しても関数ポインタを渡しても、std::threadに関数オブジェクトを渡しても、実行関数の戻り値を取得するのは難しい.以前は、結果をスレッド関数パラメータの一部として参照することで戻り値を保存するしかなかったが、あまり美しくない限界があった.C++11が導入したstd::futureはこの問題を効果的に解決できる.
std::futureはヘッダファイルに定義され、非同期操作の戻り値を取得するメカニズムを提供しますが、通常は以下の3つと組み合わせて使用されます.
  • std::promise
  • std::packaged_task
  • std::async

  • この3つの操作はそれぞれ異なりますが、get_が提供されているという共通点があります.futureインタフェースは、それに関連付けられたfutureを得るために使用され、ユーザ(プライマリスレッド)は、返されたfutureによって非同期動作結果を得ることができる.
    std::promise
    簡単に言えばpromiseはメッセージングのためのメカニズムであり、あるいは記憶値と異常を提供する施設である.スレッドを作成するとpromise参照をスレッド関数に渡すことができ、スレッド関数(非同期操作)でメインスレッドが望む結果を計算した後promise::set_value*などのインタフェース設定値(異常が発生した場合も異常を設定できます).プライマリスレッドはpromiseから取得したfutureで結果を取得できます
    例:std::futureとstd::promiseによる同時std::find関数の実装
    同時std::accumulateの実装と同様に、まず適切なスレッド数を計算し、所与の区間をいくつかのセル間に分割し、並列に検索操作を実行し、結果が見つかったらstd::promiseで検索結果を設定し、メインスレッドはstd::futureで結果を取得する
    #include 
    #include 
    #include 
    #include 
    #include 
    
    namespace parallel
    {
        template <class InputIt, class T>
        InputIt find(InputIt first, InputIt last, const T& value)
        {
            /* 
             *         
             * std::thread::hardware_concurrency()              
             */
            auto count = std::distance(first, last);
            auto avaThreadNums = std::thread::hardware_concurrency();
            auto perThreadMinNums = 20;
            auto maxThreadNums = ((count + (perThreadMinNums - 1)) & (~(perThreadMinNums - 1))) / perThreadMinNums;
            auto threadNums = 
                avaThreadNums == 0 ? 
                    maxThreadNums : 
                    std::min(static_cast<int>(maxThreadNums), static_cast<int>(avaThreadNums));
            auto blockSize = count / threadNums;
    
            /*      std::promise  ,           */
            std::promise result;
            /*            ,                    ,    done         */
            std::atomic<bool> done(false);
            {
                std::vector<std::thread> threads;
                auto front = first;
                for(int i = 0; i < threadNums; ++i)
                {
                    auto back = front;
                    if(i != threadNums - 1)
                        std::advance(back, blockSize);
                    else
                        back = last;
                    threads.emplace_back(
                                    [front, back, &value, &result, &done]
                                    {
                                        /*                 ,  done     */
                                        for(auto it = front; !done && it != back; ++it)
                                        {
                                            if(*it == value)
                                            {
                                                done.store(true);
                                                /*     ,       */
                                                result.set_value(it);
                                                return;
                                            }
                                        }
                                    }
                                );
                }
                /*        */
                for(auto &th : threads)
                    th.join();
            }
            /*   std::promise::get_future  std::future  ,    get     */
            return done ? result.get_future().get() : last;
        }
    }
    
    int main()
    {
        std::vector<int> v(100000000);
        int n = 0;
        std::generate(v.begin(), v.end(), [&n] { return ++n; }); 
        auto value = std::random_device()() % 65536;
        auto it1 = parallel::find(v.begin(), v.end(), value); 
        auto it2 = std::find(v.begin(), v.end(), value);
        assert(it1 == it2);
        return 0;
    }

    この例では複数のスレッドが同時にfind操作を実行し、最後に結果を見つけたスレッドが返す値を取得するだけで、どのスレッドが結果を見つけてもstd::promiseインスタンスに記録され、最終的にstd::futureで返されます.
    もちろんstd::promiseを使うのはスレッド関数に参照記録結果を伝えるのとほぼ同じですが、std::promiseの機能はこれに限らず、使いやすく、構造がより明確になります
    std::packaged_task
    std::packaged_taskは呼び出し可能なオブジェクトをパッケージするために使用され、関数ポインタ、関数オブジェクト、lambdaなどにほかならない.std::functionに似ているがpackaged_taskは、返されたfutureによって非同期動作の結果を取得することができる.
    例えば、ある関数が存在し、この関数が通常他のスレッドによって実行される場合、この関数の戻り値を取得することは困難であり、std::functionを例にとると、1つのスレッドプールでプライマリスレッドがstd::functionによって1つの関数をパッケージし、タスクキューに追加すると仮定する.その後、スレッドプール内の他のスレッドがこのタスク関数を取り出して実行を開始します.この場合、メインスレッドはこの関数の戻り値を取得することが困難です.代わりにstd::packaged_taskは違いますget_futureインタフェースstd::futureインスタンス、前述したようにstd::futureは非同期操作の結果を取得するために使用されるため、関数が誰によって実行されてもstd::future::getインタフェースによって戻り値を取得できます.
    例:stdの使用::packaged_taskはスレッドプールへのタスクの追加を実現
    紹介std::threadの1編では、スレッドプールの実装に関し、std::packaged_taskの理解では、タスクキューにタスクを追加する関数を再実装するとともに、呼び出し者がタスク関数の戻り結果を取得できることを確認する必要があります.ここでは、呼び出し者にstd::futureインスタンスを返すことができます.また、std::futureインスタンスを取得するには、関数パッケージに関連するstd::packaged_の3つの方法があります.taskなので、タスクを追加するときにpackaged_にタスク関数をパッケージします.taskでfutureを返します
    template <class F, class... Args>
    auto ThreadPool::enqueue(F&& f, Args... args)
            -> std::future<typename std::result_of::type>
    {
        /*     f     ,  std::future             */
        using return_type = typename std::result_of::type;
        /* std::packaged_task     ,        */
        /* std::bind()       ,   packaged_task  */
        auto task = std::make_shared<std::packaged_task>(
                        std::bind(std::forward(f), std::forward(args)...)
                    );
        /*   future,         */
        std::future result = task->get_future();
        {
            std::unique_lock<std::mutex> lock(mutex_);
            tasks_.push([task] { (*task)(); });
            cond_.notify_one();
        }
        /*   future */
        return result;
    }
    
    
    int main()
    {
        ThreadPool pool(4);
        std::vector<std::future<int>> results;
        for(int i = 0; i < 10; ++i)
        {
            results.emplace_back(
                        pool.enqueue(
                                [i]
                                {
                                    return i * i;
                                }
                            )
                        );
        }
        for(auto&& result : results)
          std::cout << result.get() << std::endl;
        return 0;
    }

    std::async
    std::asyncの場合、抽象はより深いと感じられます.std::asyncは、所与の関数を非同期で実行し、関数の戻り値を取得するためのstd::futureインスタンスを返します.したがってstd::asyncは本質的にスレッドを開いて所定の関数を実行するべきで、内部はstd::packaged_を採用する.taskは関数をパッケージし、std::futureに戻ります.
    std::asyncコンストラクション関数には非同期属性があります.それぞれ
  • std::launch::async、直ちに非同期評価
  • を開くことを示す
  • std::launch::deferred、遅延オープン、戻りfutureインスタンスがget関数を呼び出す場合にのみ非同期評価
  • をオープン
    デフォルトの非同期プロパティはstd::launch::async|std::launch::deferredなので、すぐに開くか遅延するかはコンパイラによって異なります.非同期が重要である場合は、コンストラクション関数でstd::launch::asyncを指定してください.
    例:std::asyncによるパラレルstd::for_each関数
    std::for_eachは、指定した区間の各要素に対して所定の関数を実行するので、完全に並列化できます.
    template <class InputIt, class UnaryFunction>
    UnaryFunction for_each(InputIt first, InputIt last, UnaryFunction f)
    {
        auto count = tinystl::distance(first, last);
        if(!count)  return f;
        if(count <= 100)
        {
            tinystl::for_each(first, last, f);
        }
        else
        {
            auto middle = first;
            tinystl::advance(middle, count / 2);
            /*              for_each */
            std::async(std::launch::async, tinystl::parallel::for_each, middle, last, f);
            /*            */
            tinystl::for_each(first, middle, f);
        }
        return f;
    }

    小結
    std::futureは非同期操作の実行結果を取得するメカニズムを提供し、std::promiseは値と異常を保存するために使用され、メッセージングの一種と見なすことができる.std::packaged_taskは呼び出し可能なオブジェクトの保証に使用されます.std::asyncは非同期操作を開きます.効果は、新しいスレッドの作成(または実行関数をスレッドプールに追加)と同じです.スレッド関数をパッケージし、futureインスタンスに戻ります.