义齿

6962 ワード

  • 試験データ
  •     # lixiang_list.txt( , map )
        # ID  
        1800  003
        1801 fw 
        1802 wtest 0524
        1803 HW 01
        1804  006
        1805  007
        1806  008
    
    
        # order_list.txt ( )
        #                       ID   ID
        18050411131170468193 18511745550 0.01 1525403591 1768 6502216546
        18050411131741948542 18511745550 0.01 1525403597 1768 6502216546
        18050411132051347006 18511745550 0.01 1525403600 1768 6502216546
        18050411132624157487 18511745550 0.01 1525403606 1768 6502216546
    
    
        # 
         ID          ID  
  • 考え方
    ジョブをコミットするときに、小さなテーブルファイル(lixiang_list.txt)ファイルをキャッシュ形式でHDFSにコミットし、各mapでlixiang_list.txtのデータをメモリにロードし、受信した大きなテーブルのデータから接続が必要かどうかを判断します.
  • インスタンスコード:
    
    #work.bash
    
    
    #! /bin/bash
    
    
    
    # map reduce 
    
    export WORK_PATH=/var/tmp/map_join
    
    # stream jar 
    
    stream=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar
    
    # 
    
    input=/lcy/map_join/input
    
    # 
    
    output=/lcy/map_join/output
    
    
    # mr 
    
    if $HADOOP_HOME/bin/hdfs dfs -test -d $output
    then
        $HADOOP_HOME/bin/hdfs dfs -rm -r $output
    fi
    
    
    # mapreduce 
    
    $HADOOP_HOME/bin/hadoop jar $stream \
    -D mapreduce.job.reduces=3 \
    -D num.key.fields.for.partition=1  \
    -D stream.num.map.output.key.fields=1 \
    -files $WORK_PATH/lixiang_list.txt \
    -input $input/* \
    -output $output \
    -mapper "python mr.py mapper" \
    -file $WORK_PATH/mr.py
    mapreduceファイル
    
    #! /usr/bin/env python
    
    
    # coding:utf-8
    
    
    import sys
    import os
    
    dct_lx = {}
    
    def read_input(file,sepr=' '):
        for line in file:
            data = line.split(sepr)
            yield data
    
    
    def read_lx_data():
        global dct_lx
        with open('lixiang_list.txt','r') as f:
            for line in f:
                line = line.split(' ',1)
                dct_lx[line[0].strip()] = line[1].strip()
    
    def mapper():
        global dct_lx
        filepath = os.environ['map_input_file']
        filename = os.path.split(filepath)[1]
        lines = read_input(sys.stdin)
        for data in lines:
            if "order_list.txt" == filename:
                if len(data) != 6:
                    continue
                if data[4] in dct_lx:
                    print "%s\t%s\t%s\t%s\t%s\t%s\t%s" % (data[4],dct_lx[data[4]],data[0],data[2],data[1],data[5].strip('
    '
    ),data[3]) if __name__ == '__main__': d = {"mapper":mapper} if sys.argv[1] in d: # read_lx_data() d[sys.argv[1]]()