【Hadoop】c+&python Hadoop Streamingのパートナーとモジュール化を実現します.


回転:http://www.cppblog.com/MemoryGarden/archive/2010/01/24/106312.html
 
 
これらのものは私自身の理解です.もし間違ったところがあったら、どこで回り道をしましたか?私の間違いを指摘してください.ありがとうございます.Hadoop StreamingはJavaの実装クラスを作成する代わりに、実行可能なプログラムを利用してmap-reduceプロセスを完了するツールです.ワークフロー:InputFile->mappers-->[Partioner]--reducers-->OutputFilesは、リモートファイル系のシステムディレクトリを指定することができます.各mapperは実行可能ファイルであり、それに応じて一つのプロセスを起動して、あなたの論理3 mapperの入力を標準入力として実現します.ですから、標準入力の実行可能なものをサポートしています.c、c+++(コンパイルした実行可能ファイル)、python、…全部mapperとreducer mapperの出力を基準として出力します.もしないならば、その出力はreducerの入力4 Parttititionerとしてオプションの項目で、2回の並べ替え、結果を分類して結果ファイルの中に打つことができて、その入力はmapperの標準出力で、その出力はreducerの標準入力5 reducerとmapper 6出力フォルダで、遠端ファイルで名前を書き換えることができません.
Hadoop Streaming
hadoop-streamming.jarの位置:$HADOOP_HOME/contrib/streammingでは、公式上でhadoop-streeamingについて紹介されています.pythonの例もあります.ここで自分の経験をまとめてみます.mapper or reducerのtask公式上では-jobconfを使うと言っていますが、このパラメータはもう古いので、使ってはいけません.このDは最初の構成として出現します.Maperとreducerが実行される前に、硬性指定が必要なので、パラメータの一番前に表示します.しかし、まだあなたが指定したtaskの数の結果ファイルがあります.ただ、余分なのは空です.実験以下で分かります.2次の順序については、streamingを使っていますので、実行可能ファイル内では論理だけを処理して、出力もあります.もちろん、二次の順序を指定してもいいですが、全部のパラメータ化ですので、柔軟ではありません.例えば、10.2.3.40    111.22.33.33    1 www.renren.com 1 www.baidu.com    110.2.30    1のこのような立派な入力ファイルは、独立したipとurlのcountを記録する必要がありますが、出力ファイルを分割します.公式サイトの例としては、keyを指定してからkeyを指定して主とkeyを並べ替えます.主とkeyは二次並べ替えに使います.このようにして、あなたが欲しいものを出力しますが、上の一番簡単な需要に対して、パラメータを伝えるにはどうすればいいですか?実は私達はやはりこの点を利用して、私達のmapperの中で、やはり/tによってkey valueを分割しますが、私達はkeyにParttititionerに二次の並べ替えを実現するために指定します.だから私達はこのKEYを少し処理して、ipとurlの違いを簡単に判断できます.私たちは人為的なものに加えて、メイン-keyはmapperの中にあります.各keyの人為的なものに「ラベル」を付けて、partitionerに二回の並べ替え用をします.例えば、私たちのmapperの出力はこのようにD&10.2.3.40です.    1 D&11.22.33.33    1 W&wwww.ren.com 1 W&wwww.baidu.com    1 D&10.2.3.40    1そしてコマンドパラメータ-partitioner org.apacthe.hadoop.mapred.lib.KeyFieldBasedParttioner/指定要求を伝達することによって、二次ソート-jobconf map.output.key.field.separator='///ここで二つの引用符を付けないと私の命令はfidcontion/partnerを失います.間違いがないことを保証します.これで私達はpartitionerを通して二次順位を実現できます.reducerの中で、私達は更にラベルを外します.
モジュール化について
(   :         ,        )

               ,      , hadoop-streaming    ,       ,          
     -file(             )    ,          ,  ,           py  ,       
     mapper  import  ,       ,  -file            -file moudle/*     ,  streaming
         ,  ,                ,           

  :      chmod +x *.py   py       ,       
   : 
1:      mg.py     mapper    

def mgFunction(line):         if(line[0] >= '0' and line[0] <= '9'):                 return "D&" + line         return "W&" + line

2: mapper.py

#!/usr/bin/env python import sys sys.path.append('/home/liuguoqing/Desktop/hadoop-0.19.2/moudle') import mg for line in sys.stdin:         line = mg.mgFunction(line)         line = line.strip() #       print line         words = line.split()         print '%st%s' % (words[0], words[1])

3:reducer.py
ヽusr/bin/env pythonimport sysser_ロゴday={}for line in sys.stdin:        ライン=ライン[2]//帽子を取る        line=line.strip()        userid,day=line.split('t',1)        アメリカ.ロゴday[userid]=user_ロゴday.get(userid,0)+1 for uid in user_ロゴday.keys():        print'%st% d'(uid,userginudy[uid])
これにより、モジュール化された二次ソートが可能なhadoop-streeamingが実現され、以下のように命令されます.
   1: ./bin/hadoop jar hadoop-0.19.2-streaming.jar 
   2: #streaming jar
   3: -D mapred.reduce.tasks=2  
   4: #  2 reduce   
   5: -input user_login_day-input2/*  
   6: #           dir/*   
   7: -output user_login_day-output102 
   8: #       
   9: -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py  
  10: #  mapper            ,          ...
  11: -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py 
  12: #  reducer       
  13: -file ~/Desktop/hadoop-0.19.2/moudle/* 
  14: #          dir/*   
  15: -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 
  16: #   partitioner    class
  17: -jobconf map.output.key.field.separator='&' 
  18: #    -key        '&'
  19: -jobconf num.key.fields.for.partition=1 
xiは最初の'&'に指定されています.liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-00.19.2$/bin/hadoop jar hadoop-019.2-streeaming.jar-D mapred.reduce.tass=2-input user_ロゴday-input 2/*-out put user uロゴday-output 12-mapper~/Desktop/hadoop-01.19.2/python/mapper/get_アメリカ.ロゴday_back.py-reducer~/Desktop/hadoop-019.2/python/reducer/get_アメリカ.ロゴday_back.py-file~/Desktop/hadoop-019.2/moude/*-partitioner org.apaache. hadoop.mapred.lib.KeyFieldBasedPatititititititiner-jobconf map.outppput.key.field.field.field.separator='''''jjoofifififiggggggggggggggggfifififififificonconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconated、please use-D instead.package JobJar:[/home/liugoqing/Deskytop/hadoop-00.19.2/moudle/mg.py,/home/liugong/Deskytop/hadoop-019.2/moudle/mg.pyc,/tmp/hadoop-liuguqing/hadoop-unjar//tmp/streamjob 3104001838751995.jar tmpDir=null 10/01 03:19 INFO mapred.FileInput Format:Total input paths to process:210/01/24 03:15 INFO streming.StreamJob:Logeltcal10/01/24 03:15 INFO streeaming.StreamJob:Running job:jobu 2010018_006510/01/24 03:15 INFO streeming.StreamJob:To kill this job、run:10/01/24:19 INFO stream/Joage  -Dmapred.job.trocer=hdfs://localhost:9881 -kill job_201001;201008;_006510/01/24 03:19 INFO streamJob:Tracking URL:http://localhost:50030/jobdetails.jsp?jobid=job_201001221008_006510/01/24 03:19:16 INFO streamJob:  map 0%  reduce 0%10/01/24 03:19 INFO streamJob:  map 33%  reduce 0%10/01/24 03:19 INFO streamJob:  map 67%  reduce 0%10/01/24 03:19 INFO streamJob:  map 100%  reduce 0%10/01/24 03:19 INFO streamJob:  map 100%  reduce 50%10/01/24 03:19 INFO streeaming.StreamJob:  map 100%  reduce 100%10/01/24 03:32 INFO streamJob:Job completter:job_201008;2010/01/24 03:32 INFO streming.StreamJob:Output:usergindy-output102liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-00.19.2ドル./bin/hadoop dfs-ls usero Hulogingudy-output 12 Found 3 items drwxr-xr-x   - liugoqing supergroup          0 2010-01-24 03:19/user/liuoqing/userginguday-output 12/_logs-rw-r--   1 liugoqing supergroup         25 2010-01-24 03:19/user/liuoqing/userginguday-output 12/part-0000-rw-r--   1 liugoqing supergroup         47 2010-01-24 03:19/user/liuoqing/usergingudy-output 12/part-00001liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-00.19.2ドル./bin/hadoop dfs-cat usergulogingudba-output 12/part-000054321    299999    112345    12liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-00.19.2ドル./bin/hadoop dfs-cat user loginguday-output 12/part-0001http://www.renren.com    3http://www.baidu.com    3以上は操作結果表示です.
4:c++の応用
二つの標準入出力のmapper reducerを書いてください.そして、g++mapper.cpp-o mapper+reducer.cpp-o reducer-o reducerが生成した二つの実行可能なmapper reducerのファイルをmapperとreducerパラメータとして作成すればいいです.実行するコマンドは上と同じです.コードは下の通りです.
   1: mapper.cpp
   2:  
   3: #include <stdio.h>
   4: #include <string>
   5: #include <iostream>
   6: using namespace std;
   7: int main(){
   8:         string key;
   9:         string value;
  10:         while(cin>>key){
  11:                 cin>>value;
  12:                 cout<<key<<"t"<<value<<endl;
  13:         }
  14:         return 0;
  15: }
  16:  
  17: reducer.cpp
  18:  
  19: #include <stdio.h>
  20: #include <string>
  21: #include <map>
  22: #include <iostream>
  23: using namespace std;
  24: int main(){
  25:         string key;
  26:         string value;
  27:         map<string, int> word2count;
  28:         map<string, int> :: iterator it;
  29:         while(cin>>key){
  30:                 cin>>value;
  31:                 it = word2count.find(key);
  32:                 if(it != word2count.end()){
  33:                         ++it->second;
  34:                 }
  35:                 else{
  36:                         word2count.insert(make_pair(key, 1));
  37:                         it->second = 0;
  38:                 }
  39:         }
  40:         for(it = word2count.begin(); it != word2count.end(); ++it){
  41:                 cout<<it->first<<"t"<<it->second<<endl;
  42:         }
  43:         return 0;
  44: }
  45:  
これでc++を利用してhadoop map-reduceを作成できます.