义齿
6962 ワード
# lixiang_list.txt( , map )
# ID
1800 003
1801 fw
1802 wtest 0524
1803 HW 01
1804 006
1805 007
1806 008
# order_list.txt ( )
# ID ID
18050411131170468193 18511745550 0.01 1525403591 1768 6502216546
18050411131741948542 18511745550 0.01 1525403597 1768 6502216546
18050411132051347006 18511745550 0.01 1525403600 1768 6502216546
18050411132624157487 18511745550 0.01 1525403606 1768 6502216546
#
ID ID
ジョブをコミットするときに、小さなテーブルファイル(lixiang_list.txt)ファイルをキャッシュ形式でHDFSにコミットし、各mapでlixiang_list.txtのデータをメモリにロードし、受信した大きなテーブルのデータから接続が必要かどうかを判断します.
#work.bash
#! /bin/bash
# map reduce
export WORK_PATH=/var/tmp/map_join
# stream jar
stream=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar
#
input=/lcy/map_join/input
#
output=/lcy/map_join/output
# mr
if $HADOOP_HOME/bin/hdfs dfs -test -d $output
then
$HADOOP_HOME/bin/hdfs dfs -rm -r $output
fi
# mapreduce
$HADOOP_HOME/bin/hadoop jar $stream \
-D mapreduce.job.reduces=3 \
-D num.key.fields.for.partition=1 \
-D stream.num.map.output.key.fields=1 \
-files $WORK_PATH/lixiang_list.txt \
-input $input/* \
-output $output \
-mapper "python mr.py mapper" \
-file $WORK_PATH/mr.py
mapreduceファイル
#! /usr/bin/env python
# coding:utf-8
import sys
import os
dct_lx = {}
def read_input(file,sepr=' '):
for line in file:
data = line.split(sepr)
yield data
def read_lx_data():
global dct_lx
with open('lixiang_list.txt','r') as f:
for line in f:
line = line.split(' ',1)
dct_lx[line[0].strip()] = line[1].strip()
def mapper():
global dct_lx
filepath = os.environ['map_input_file']
filename = os.path.split(filepath)[1]
lines = read_input(sys.stdin)
for data in lines:
if "order_list.txt" == filename:
if len(data) != 6:
continue
if data[4] in dct_lx:
print "%s\t%s\t%s\t%s\t%s\t%s\t%s" % (data[4],dct_lx[data[4]],data[0],data[2],data[1],data[5].strip('
'),data[3])
if __name__ == '__main__':
d = {"mapper":mapper}
if sys.argv[1] in d:
#
read_lx_data()
d[sys.argv[1]]()