hadoop上のC++プログラム開発

7942 ワード

hadoopはC++で開発でき、コマンド実行方式はpipes、例:hadoop pipes-conf job_config.xml -input input/myfile.txt -output output -program bin/wordcount
また、もう一つstreaming方式(?)
Javaプログラムを実行するには、jarパッケージとして打ってhadoop jarコマンドを使用します.「hadoop jarプログラム.jar mainclass arguments」
 
 
ネットの具体的な説明から引用します:
HCE, short for Hadoop c++ extension
従来のHadoopよりも効率を20%以上向上させることができ、数日後に逆インデックスで効率をテストする計画だという.暫定的に3台のノードを使用し,各ノード16コアcpu.
1日半hadoopとhceの導入を学び、CentOS 5.4で擬似分布式hceの導入に成功し、自分でコンパイルしたmapreduceプログラムwordcountを提出し、正しい結果を得た.
構成プロセスおよび問題:
hceソースをダウンロードすると、コンパイル中に次のエラーが発生します.
1.余分な名称限定:HCE:Compressor解決方法:コードから限定HCEを取り除く
コード位置:src/c+/hce/impl/compressor
2.シンボルが見つかりません:htons解決方法:参照のヘッダファイルを変更します.システム関連のヘッダファイル、すなわちlinux/ディレクトリの下を使用しないでください.include#includeコメント、#includeリンクを追加すると-lncursesが見つからないエラーが発生し、ncurses-develをインストールする必要があります.centosの場合はyumを使用してインストールできます.
コンパイルに成功した後にbuildディレクトリの下のいくつかのファイルを生成します
次に、実行フェーズの構成を行います.
conf/下のcore-siteを構成します.xml mapred-site.xml hdfs-site.xml
主に各サービスのIPアドレスとポートを構成し、hadoopの各サービスは構成されたアドレスで開きます.
運転段階では、あるdaemonを正常に起動できない現象が発生しやすく、ここではエラーの原因が多い可能性があります.煩雑ですが、保険的な方法をお勧めします.順番にサービスを開始します.
まずhdfs,bin/hadoop namenode-formatをフォーマットします
次にdaemonsを順番に起動し、hadoopは主に4つのdaemons:namenode、datanode、jobtracker、tasktrackerを含む.
順番に起動:
bin/hadoop-daemon start namenode
bin/hadoop-daemon start datanode
bin/hadoop-daemon start jobtracker
bin/hadoop-daemon start tasktracker
起動しながらlogsにログを表示して、起動に成功したかどうかを確認することができます.
起動に成功したらbin/hadoop fsシリーズコマンドを使用して、入力/出力ディレクトリinput/outputを確立し、入力ファイルをhdfsにアップロードします.
次に、c++版のmapreduceプログラムwordcountを作成します.コードは次のとおりです.
#include "hadoop/Hce.hh"class WordCountMap: public HCE::Mapper { public: HCE::TaskContext::Counter* inputWords; int64_t setup() { inputWords = getContext()->getCounter("WordCount", "Input Words"); return 0; } int64_t map(HCE::MapInput &input) { int64_t size = 0; const void* value = input.value(size); if ((size > 0) && (NULL != value)) { char* text = (char*)value; const int n = (int)size; for (int i = 0; i < n;) {//Skip past leading whitespace while ((i < n) && isspace(text[i])) i++;//Find word end int start = i; while ((i < n) && !isspace(text[i])) i++; if (start < i) { emit(text + start, i-start, "1", 1); getContext()->incrementCounter(inputWords, 1); } } } return 0; } int64_t cleanup() { return 0; } }; const int INT64_MAXLEN = 25; int64_t toInt64(const char *val) { int64_t result; char trash; int num = sscanf(val, "%ld%c", &result, &trash); return result; } class WordCountReduce: public HCE::Reducer { public: HCE::TaskContext::Counter* outputWords; int64_t setup() { outputWords = getContext()->getCounter("WordCount", "Output Words"); return 0; } int64_t reduce(HCE::ReduceInput &input) { int64_t keyLength; const void* key = input.key(keyLength); int64_t sum = 0; while (input.nextValue()) { int64_t valueLength; const void* value = input.value(valueLength); sum += toInt64((const char*)value); } char str[INT64_MAXLEN]; int str_len = snprintf(str, INT64_MAXLEN, "%ld", sum); getContext()->incrementCounter(outputWords, 1); emit(key, keyLength, str, str_len); } int64_t cleanup() { return 0; } }; int main(int argc, char *argv[]) { return HCE::runTask(//TemplateFactory sequence is Mapper, Reducer,//Partitioner, Combiner, Committer,//RecordReader, RecordWriter HCE::TemplateFactory void, void, void, void, void>() ); }
Makefileは次のとおりです.
HADOOP_HOME = ../hadoop-0.20.3/build JAVA_HOME = ../java6 INCLUDEDIR = ../hadoop-0.20.3/build/c++/Linux-amd64-64/include LIBDIR = ../hadoop-0.20.3/build/c++/Linux-amd64-64/lib CXX=g++ RM=rm -f INCLUDEDIR = -I${HADOOP_HOME}/c++/Linux-amd64-64/include LIBDIR = -L${HADOOP_HOME}/c++/Linux-amd64-64/lib\-L${JAVA_HOME}/jre/lib/amd64/server CXXFLAGS = ${INCLUDEDIR} -g -Wextra -Werror\-Wno-unused-parameter -Wformat\-Wconversion -Wdeprecated LDLIBS = ${LIBDIR} -lhce -lhdfs -ljvm all : wordcount-demo wordcount-demo : wordcount-demo.o $(CXX) -o $@ $^ $(LDLIBS) $(CXXFLAGS) clean: $(RM) *.o wordcount-demo
コンパイルに成功するとhceジョブをコミットできます.
bin/hadoop hce -input/input/test -output/output/out1 -program wordcount-demo -file wordcount-demo -numReduceTasks 1
ここで使用する入力ファイルinput/testの内容は以下の通りです.
The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog.
ジョブのコミット後にエラーが発生する可能性があります:job not successful
ログを表示します.次のエラーメッセージが表示されます.
stderr logs:
..........
HCE_FATAL 08-10 12:13:51 [/home/shengeng/hce/hadoop_hce_v1/hadoop-0.20.3/src/c++/hce/impl/MapRed/Hce.cc][176][runTask] error when parsing UgiInfo at /home/shengeng/hce/hadoop_hce_v1/hadoop-0.20.3/src/c++/hce/impl/MapRed/HadoopCommitter.cc:247  in virtual bool HCE::HadoopCommitter::needsTaskCommit() syslog logs: ....................... 
2011-08-10 12:13:51,450 ERROR org.apache.hadoop.mapred.hce.BinaryProtocol: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:250) at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:298) at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:319) at org.apache.hadoop.mapred.hce.BinaryProtocol$UplinkReaderThread.run(BinaryProtocol.java:112) 2011-08-10 12:13:51,450 ERROR org.apache.hadoop.mapred.hce.Application: Aborting because of java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:250) at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:298) at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:319) at org.apache.hadoop.mapred.hce.BinaryProtocol$UplinkReaderThread.run(BinaryProtocol.java:112) 2011-08-10 12:13:51,450 INFO org.apache.hadoop.mapred.hce.BinaryProtocol: Sent abort command 2011-08-10 12:13:51,496 WARN org.apache.hadoop.mapred.TaskTracker: Error running child java.io.IOException: hce child exception at org.apache.hadoop.mapred.hce.Application.abort(Application.java:325) at org.apache.hadoop.mapred.hce.HceMapRunner.run(HceMapRunner.java:87) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:369) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170) Caused by: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:250) at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:298) at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:319) at org.apache.hadoop.mapred.hce.BinaryProtocol$UplinkReaderThread.run(BinaryProtocol.java:112) 2011-08-10 12:13:51,500 INFO org.apache.hadoop.mapred.TaskRunner: Runnning cleanup for the task 
ログに基づいてコードにナビゲートするには、次の手順に従います.
 HadoopCommitter.cc ,
bool HadoopCommitter::needsTaskCommit()
string ugiInfo = taskContext->getJobConf()->get("hadoop.job.ugi"); //    hadoop.job.ugi          hce         
  words = HadoopUtils::splitString(ugiInfo, ",");
  HADOOP_ASSERT(words.size() ==2, "error when parsing UgiInfo"); //          
 hdfs-site.xml      :    hadoop.job.ugi   hadoop,supergroup 
         ,     hce     , needsTaskCommit()              ,       。