Boost.MapReduce

11639 ワード

「Boost.MapReduce」という名前を見て、MapReduceがBoostと関係していると思いましたか?実際には、以前Boostのコードリポジトリに置かれていたオープンソースソフトウェアにすぎません.Google MapReduceやHadoopとは、次のような点で異なります.
1.Googleが使用しているMapReduceと同様にC++で実装されており、コア部分はテンプレートで実装されています.
2.共有メモリベースのMapReduceフレームワークであり、さまざまなマルチコアプロセッサや共有メモリプロセッサ上で実行できます.ネットワークベースのMapReduceフレームワークではないため、複数のマシンにまたがって実行することはできません.マルチコアマシン向けのデータ処理フレームワークとして使用できます.著者らは、分散型のMapReduceとDFSの実装を計画しています.
3.Phoenixとも呼ばれます.BSDライセンスを採用しており、著作権はスタンフォード大学に属しています.
4.クロスプラットフォームであり、VC(Visual C++)とGCCのどちらでもコンパイルできます.Boostライブラリに依存していますが、著者らはこの依存をなくすことを計画しています.
興味があれば http://www.craighenderson.co.uk/mapreduce/ を参照してください.
単語カウント(wordcount)のサンプルコード:
// Boost.MapReduce library
//
//  Copyright (C) 2009 Craig Henderson.
//  [email protected]
//
//  Use, modification and distribution is subject to the
//  Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// For more information, see http://www.boost.org/libs/mapreduce/
//

// BOOST_DISABLE_ASSERTS is defined unconditionally here, so the check that
// follows can never be true and the warning pragma is effectively dead.
#define BOOST_DISABLE_ASSERTS 
#if !defined(_DEBUG) &&  !defined(BOOST_DISABLE_ASSERTS)
#   pragma message("Warning: BOOST_DISABLE_ASSERTS not defined")
#endif

// turn off checked iterators to avoid performance hit
// NOTE(review): BOOST_MSVC is presumably provided by <boost/config.hpp>, which
// is only included further down in this file -- confirm that this condition
// can actually be true at this point, otherwise _SECURE_SCL is never set.
#if defined(BOOST_MSVC)  &&  !defined(__SGI_STL_PORT)  &&  !defined(_DEBUG)
#define _SECURE_SCL 0
#endif

#include <boost/config.hpp>
#if defined(BOOST_MSVC)
#   pragma warning(disable: 4100 4127 4244 4512 4267 4996)
#endif

#include <cctype>               // toupper, tolower
#include <numeric>              // accumulate
#include <boost/mapreduce.hpp>
#include <boost/algorithm/string.hpp>

#if defined(BOOST_MSVC)  &&  defined(_DEBUG)
#include <crtdbg.h>
#endif

namespace wordcount {

// Map functor for the word-count job.
//
// The map value is a [first, second) range of characters. The functor scans
// that range and emits one (word, 1) intermediate pair per word found, where
// a word
//   - starts at a letter (A-Z after upper-casing),
//   - continues through letters and apostrophes, and
//   - ends at the first other character or at the end of the range.
// The emitted key is (pointer to the first character of the word, length);
// it refers to the input range and does not copy the characters.
struct map_task : public boost::mapreduce::map_task<
                             std::string,                               // MapKey (filename)
                             std::pair<char const *, char const *> >    // MapValue (memory mapped file contents)
{
    // runtime : receives the intermediate pairs via emit_intermediate()
    // key     : unused
    // value   : character range to scan
    template<typename Runtime>
    void operator()(Runtime &runtime, key_type const &/*key*/, value_type &value) const
    {
        bool in_word = false;
        char const *ptr = value.first;
        char const *end = value.second;
        char const *word = ptr;
        for (; ptr != end; ++ptr)
        {
            // The <cctype> functions require an argument representable as
            // unsigned char (or EOF). Passing a plain char that holds a
            // negative value (any non-ASCII byte where char is signed) is
            // undefined behaviour, so convert through unsigned char first.
            char const ch = static_cast<char>(std::toupper(static_cast<unsigned char>(*ptr)));
            if (in_word)
            {
                // anything that is neither a letter nor an apostrophe ends the word
                if ((ch < 'A' || ch > 'Z') && ch != '\'')
                {
                    runtime.emit_intermediate(std::make_pair(word,ptr-word), 1);
                    in_word = false;
                }
            }
            else if (ch >= 'A'  &&  ch <= 'Z')
            {
                word = ptr;
                in_word = true;
            }
        }

        // flush a word that runs up to the very end of the range
        if (in_word)
            runtime.emit_intermediate(std::make_pair(word,ptr-word), 1);
    }
};

// Reduce key: (pointer to the first character of a word, length of the word).
// The pointer is non-owning; map_task builds these keys from positions inside
// its input character range.
typedef std::pair<char const *, std::ptrdiff_t> reduce_key_t;

// Reduce functor for the word-count job: sums all counts collected for one
// word and emits (word, total).
struct reduce_task : public boost::mapreduce::reduce_task<
                                reduce_key_t,
                                unsigned>
{
    // runtime : receives the final result via emit()
    // key     : the word being reduced
    // it, ite : range [it, ite) of counts for that word
    template<typename Runtime, typename It>
    void operator()(Runtime &runtime, key_type const &key, It it, It const ite) const
    {
        // Seed the sum with value_type (unsigned) rather than the int literal
        // 0: std::accumulate works in the type of its initial value, and an
        // int accumulator would add unsigned counts in signed arithmetic.
        runtime.emit(key, std::accumulate(it, ite, value_type()));
    }
};

// Combiner for the word-count job: adds up the counts seen for one key and
// stores the total back as a single entry.
//
// Instances are only created by run(); the constructor is private.
// NOTE(review): start(), operator() and finish() are presumably invoked by
// IntermediateStore::combine() once per key / per value -- confirm against the
// library.
class combiner
{
  public:
    // Creates a combiner and hands it to intermediate_store.combine().
    template<typename IntermediateStore>
    static void run(IntermediateStore &intermediate_store)
    {
        combiner instance;
        intermediate_store.combine(instance);
    }

    // Resets the running total for a new key.
    void start(reduce_task::key_type const &)
    {
        total_ = 0;
    }

    // Inserts (key, total) into the store, unless nothing was counted.
    template<typename IntermediateStore>
    void finish(reduce_task::key_type const &key, IntermediateStore &intermediate_store)
    {
        if (total_ > 0)
            intermediate_store.insert(key, total_);
    }

    // Adds one value to the running total.
    void operator()(reduce_task::value_type const &value)
    {
        total_ += value;
    }

  private:
    // Start from a defined total so that operator() or finish() never read an
    // indeterminate value, even if start() has not been called yet.
    combiner() : total_(0) { }

    unsigned total_;    // running total for the key currently being combined
};

// The word-count job type: boost::mapreduce::job instantiated with this
// namespace's map task, reduce task and combiner.
typedef
boost::mapreduce::job<
    wordcount::map_task
  , wordcount::reduce_task
  , wordcount::combiner
> job;

}   // namespace wordcount


namespace std {

// Strict weak ordering for word-count keys.
//
// Compares the two words case-insensitively over their common length; if that
// prefix is equal, the shorter word orders first.
template<>
bool less<wordcount::reduce_key_t>::operator()(wordcount::reduce_key_t const &first, wordcount::reduce_key_t const &second) const
{
    std::ptrdiff_t const len = std::min(first.second, second.second);

    // With nothing to compare, skip the C string call altogether: an empty key
    // may carry a null pointer (operator== for this type allows for that), and
    // a null pointer must not be handed to strncasecmp/strnicmp.
    if (len > 0)
    {
#if defined(BOOST_MSVC)
        int const cmp = strnicmp(first.first, second.first, len);
#else
        int const cmp = strncasecmp(first.first, second.first, len);
#endif
        if (cmp != 0)
            return (cmp < 0);
    }

    // common prefix is equal: order by length
    return (first.second < second.second);
}

// Case-insensitive equality for word-count keys.
//
// Two keys are equal when they have the same length and their characters
// match ignoring case. Two zero-length keys that both carry a null pointer
// are equal without looking at any characters.
template<>
bool operator==(wordcount::reduce_key_t const &first, wordcount::reduce_key_t const &second)
{
    // different lengths can never match
    if (first.second != second.second)
        return false;

    bool const both_empty_and_null = (first.second == 0  &&  first.first == 0  &&  second.first == 0);
    if (both_empty_and_null)
        return true;

#if defined(BOOST_MSVC)
    int const cmp = strnicmp(first.first, second.first, first.second);
#else
    int const cmp = strncasecmp(first.first, second.first, first.second);
#endif
    return (cmp == 0);
}

}   // namespace std

namespace boost {
namespace mapreduce {
// Chooses the partition for a word-count key.
//
// The std::less and operator== specialisations for wordcount::reduce_key_t
// compare words case-insensitively, so the hash must ignore case as well.
// Hashing the raw bytes would let "The" and "the" fall into different
// partitions, where they would be counted as two separate words. Each
// character is therefore lower-cased before it is mixed into the hash.
//
// key        : (pointer to first character, length) of the word
// partitions : number of partitions; the result is in [0, partitions)
template<>
unsigned hash_partitioner::operator()(wordcount::reduce_key_t const &key, unsigned partitions) const
{
    std::size_t seed = 0;
    char const * const end = key.first + key.second;
    for (char const *ptr = key.first; ptr != end; ++ptr)
    {
        // convert through unsigned char: <cctype> functions are undefined for
        // negative char values
        boost::hash_combine(seed, std::tolower(static_cast<unsigned char>(*ptr)));
    }
    return static_cast<unsigned>(seed % partitions);
}
}   // namespace mapreduce {
}   // namespace boost {



int main(int argc, char **argv)
{
    std::cout << "MapReduce Wordcount Application";
    if (argc < 2)
    {
        std::cerr << "Usage: wordcount directory [num_map_tasks]
"; return 1; } boost::mapreduce::specification spec; spec.input_directory = argv[1]; std::cout << "
" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores"; std::cout << "
" << typeid(wordcount::job).name() << "
"; boost::mapreduce::results result; wordcount::job::datasource_type datasource(spec); try { if (argc > 2) spec.map_tasks = atoi(argv[2]); if (argc > 3) spec.reduce_tasks = atoi(argv[3]); else spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency()); std::cout << "
Running Parallel WordCount MapReduce..."; wordcount::job job(datasource, spec); job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result); std::cout << "
MapReduce Finished."; std::cout << std::endl << "
MapReduce statistics:"; std::cout << "
MapReduce job runtime : " << result.job_runtime << " seconds, of which..."; std::cout << "
Map phase runtime : " << result.map_runtime << " seconds"; std::cout << "
Reduce phase runtime : " << result.reduce_runtime << " seconds"; std::cout << "

Map:"; std::cout << "
Total Map keys : " << result.counters.map_keys_executed; std::cout << "
Map keys processed : " << result.counters.map_keys_completed; std::cout << "
Map key processing errors : " << result.counters.map_key_errors; std::cout << "
Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks; std::cout << "
Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds"; std::cout << "
Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds"; std::cout << "
Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds"; std::cout << "

Reduce:"; std::cout << "
Total Reduce keys : " << result.counters.reduce_keys_executed; std::cout << "
Reduce keys processed : " << result.counters.reduce_keys_completed; std::cout << "
Reduce key processing errors : " << result.counters.reduce_key_errors; std::cout << "
Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks; std::cout << "
Number of Result Files : " << result.counters.num_result_files; if (result.reduce_times.size() > 0) { std::cout << "
Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds"; std::cout << "
Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds"; std::cout << "
Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds"; } wordcount::job::const_result_iterator it = job.begin_results(); wordcount::job::const_result_iterator ite = job.end_results(); if (it != ite) { typedef std::list<wordcount::job::keyvalue_t> frequencies_t; frequencies_t frequencies; frequencies.push_back(*it); frequencies_t::reverse_iterator it_smallest = frequencies.rbegin(); for (++it; it!=ite; ++it) { if (frequencies.size() < 10) // show top 10 { frequencies.push_back(*it); if (it->second < it_smallest->second) it_smallest = frequencies.rbegin(); } else if (it->second > it_smallest->second) { *it_smallest = *it; it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>); } } frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>); std::cout << "

MapReduce results:"; for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq) printf("
%.*s\t%d", freq->first.second, freq->first.first, freq->second); } } catch (std::exception &e) { std::cout << std::endl << "Error: " << e.what(); } return 0; }