python版mapreduceマトリクス乗算
23216 ワード
張先生のmapreduce行列を参考に乗算します.
転載はchybotからの学習ノートを明記してくださいhttp://i.cnblogs.com/EditPosts.aspx?postid=4541939
次はpythonバージョンのmapreduceマトリクスで乗算します.
マトリクス乗算の考え方は張先生のブログを参照してください.2つのマトリクスm 1とm 2について、mapreduceの計算過程は以下の通りです.
この中で最も主要な点はkeyの構成であり、map出力のkeyは乗算後のマトリクスの下付き文字であり、例えばc[i][j]=sum(A[i][:]*B[:][j])である.
注意:この実装知識マトリクスの乗算の1つの構想の実装は、実際のシーンに適していない.このmap taskは2(2つの入力マトリクスに対応するファイル)しかなく、reduce taskは1しかない.
主な理由は、mapプログラムごとにグローバル変数が使用されているためであり、reduceプログラムごとにデフォルトのマトリクス乗算結果に必要な値が1つのスライスにあるためです.
入力ファイル:
maperプログラム:
reduceプログラム:
ローカルテストが実行可能かどうか:
hadoop streamingを使用してmapredプログラムを実行します.結果は次のとおりです.
結果はローカルで実行した結果と同じであることがわかります.
二、疎行列乗算
疎行列は行列乗算の考え方と似ているが,前の行のデータを複数行に変えて体現しているにすぎない.
入力:
行列A
行列B
mapperプログラム:
reduceプログラム:
ローカルプログラムのテスト結果:
hadoopテスト結果:
hadoopの勉強を始めたばかりで、頑張ってください.がんばれ!同じ道の人にアドバイスしてほしい.
参照先:
ファンログ:http://blog.fens.me/hadoop-mapreduce-matrix/
転載はchybotからの学習ノートを明記してくださいhttp://i.cnblogs.com/EditPosts.aspx?postid=4541939
次はpythonバージョンのmapreduceマトリクスで乗算します.
マトリクス乗算の考え方は張先生のブログを参照してください.2つのマトリクスm 1とm 2について、mapreduceの計算過程は以下の通りです.
この中で最も主要な点はkeyの構成であり、map出力のkeyは乗算後のマトリクスの下付き文字であり、例えばc[i][j]=sum(A[i][:]*B[:][j])である.
注意:この実装知識マトリクスの乗算の1つの構想の実装は、実際のシーンに適していない.このmap taskは2(2つの入力マトリクスに対応するファイル)しかなく、reduce taskは1しかない.
主な理由は、mapプログラムごとにグローバル変数が使用されているためであり、reduceプログラムごとにデフォルトのマトリクス乗算結果に必要な値が1つのスライスにあるためです.
入力ファイル:
matrixA.txt
A#-1,0,2
A#1,3,1
matrixB.txt
B#3,1
B#2,1
B#1,0
maperプログラム:
#!/usr/bin/python
# -*-coding:utf-8 -*-
import sys
rowNum = 2
colNum = 2
rowIndexA = 1
rowIndexB = 1
def read_inputdata(splitstr):
for line in sys.stdin:
#
yield line.split(splitstr)
if __name__ == '__main__':
for matrix, matrixline in read_inputdata('#'):
if matrix == 'A':
# ( , ), key,value
for i in range(rowNum):
key = str(rowIndexA) + ',' + str(i+1)
value = matrix + ':'
j = 1
for element in matrixline.split(','):
print '%s\t%s%s,%s' % (key, value, j, element)
j += 1
rowIndexA += 1
elif matrix == 'B':
for i in range(colNum):
value = matrix + ':'
j = 1
for element in matrixline.split(','):
print '%s,%s\t%s%s,%s' % (i+1, j, value, rowIndexB, element)
j = j+1
rowIndexB += 1
else: continue
reduceプログラム:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import sys
from itertools import groupby
from operator import itemgetter
def read_input(splitstr):
for line in sys.stdin:
line = line.strip()
if len(line) == 0: continue
yield line.split(splitstr, 1)
def run():
data = read_input('\t')
for current_element, group in groupby(data, itemgetter(0)):
try:
matrixA = {}
matrixB = {}
result = 0
# A b
for current_element, elements in group:
matrix, index_value = elements.split(':')
index, value = index_value.split(',')
if matrix == 'A':
matrixA[index] = int(value)
else:
matrixB[index] = int(value)
# , , mapreduce sort
for key in matrixA:
result += matrixA[key]*matrixB[key]
print '%s\t%s' % (current_element, result)
except Exception:
pass
if __name__ == '__main__':
run()
ローカルテストが実行可能かどうか:
bogon:program xuguoqiang$ cat matrixA.txt matrixB.txt |python matrix_mapper.py |sort |python matrix_reducer.py
1,1 -1
1,2 -1
2,1 10
2,2 4
hadoop streamingを使用してmapredプログラムを実行します.結果は次のとおりです.
bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -D mapred.map.tasks=2 -D mapred.reduce.tasks=1 \
> -mapper /Users/xuguoqiang/hadoop-1.2.1/program/matrix_mapper.py \
> -reducer /Users/xuguoqiang/hadoop-1.2.1/program/matrix_reducer.py \
> -input /matrix/* \
> -output output5
packageJobJar: [/tmp/hadoop-xuguoqiang/hadoop-unjar2547149142116420858/] [] /var/folders/7_/jmj1yhgx7b1_2cg9w74h0q5r0000gn/T/streamjob1502134034482177499.jar tmpDir=null
15/05/31 16:37:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/31 16:37:06 WARN snappy.LoadSnappy: Snappy native library not loaded
15/05/31 16:37:06 INFO mapred.FileInputFormat: Total input paths to process : 2
15/05/31 16:37:06 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-xuguoqiang/mapred/local]
15/05/31 16:37:06 INFO streaming.StreamJob: Running job: job_201505311232_0019
15/05/31 16:37:06 INFO streaming.StreamJob: To kill this job, run:
15/05/31 16:37:06 INFO streaming.StreamJob: /Users/xuguoqiang/hadoop-1.2.1/libexec/../bin/hadoop job -Dmapred.job.tracker=hdfs://localhost:9001 -kill job_201505311232_0019
15/05/31 16:37:06 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201505311232_0019
15/05/31 16:37:07 INFO streaming.StreamJob: map 0% reduce 0%
15/05/31 16:37:11 INFO streaming.StreamJob: map 100% reduce 0%
15/05/31 16:37:20 INFO streaming.StreamJob: map 100% reduce 100%
15/05/31 16:37:22 INFO streaming.StreamJob: Job complete: job_201505311232_0019
15/05/31 16:37:22 INFO streaming.StreamJob: Output: output5
bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop fs -cat output5/*
1,1 -1
1,2 -1
2,1 10
2,2 4
結果はローカルで実行した結果と同じであることがわかります.
二、疎行列乗算
疎行列は行列乗算の考え方と似ているが,前の行のデータを複数行に変えて体現しているにすぎない.
入力:
行列A
A#1,1,1
A#1,4,3
A#2,1,2
A#2,2,5
A#2,4,4
A#3,4,1
A#4,1,4
A#4,2,7
A#4,3,1
A#4,4,2
行列B
B#1,1,5
B#2,2,2
B#4,1,3
B#4,2,1
mapperプログラム:
#!/usr/bin/python
# -*-coding:utf-8 -*-
import sys
rowNum = 2
colNum = 4
def read_inputdata(splitstr):
for line in sys.stdin:
yield line.strip().split(splitstr)
if __name__ == '__main__':
for matrix, matrixline in read_inputdata('#'):
if matrix == 'A':
for i in range(rowNum):
index1, index2, element = matrixline.split(',')
print '%s,%s\t%s:%s,%s' % (index1, (i+1), matrix, index2, element)
elif matrix == 'B':
for i in range(colNum):
index1, index2, element = matrixline.split(',')
print '%s,%s\t%s:%s,%s' % (i+1, index2, matrix,index1, element)
else: continue
reduceプログラム:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import sys
from itertools import groupby
from operator import itemgetter
def read_input(splitstr):
for line in sys.stdin:
line = line.strip()
if len(line) == 0: continue
yield line.split(splitstr, 1)
def run():
data = read_input('\t')
for current_element, group in groupby(data, itemgetter(0)):
try:
matrixA = {}
matrixB = {}
result = 0
for current_element, elements in group:
matrix, index_value = elements.split(':')
index, value = index_value.split(',')
if matrix == 'A':
matrixA[index] = int(value)
else:
matrixB[index] = int(value)
for key in matrixA:
if key in matrixB:
result += matrixA[key]*matrixB[key]
print '%s\t%s' % (current_element, result)
except Exception:
pass
if __name__ == '__main__':
run()
ローカルプログラムのテスト結果:
bogon:program xuguoqiang$ cat sparsematrixB.txt sparsematrixA.txt | python sparsematrix_mapper.py |sort |python sparsematrix_reduce.py
1,1 14
1,2 3
2,1 22
2,2 14
3,1 3
3,2 1
4,1 26
4,2 16
hadoopテスト結果:
bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -D mapred.map.tasks=2 -D mapred.reduce.tasks=1 -mapper /Users/xuguoqiang/hadoop-1.2.1/program/sparsematrix_mapper.py -reducer /Users/xuguoqiang/hadoop-1.2.1/program/sparsematrix_reduce.py -input /sparsematrix/* -output output
packageJobJar: [/tmp/hadoop-xuguoqiang/hadoop-unjar2334049571009138288/] [] /var/folders/7_/jmj1yhgx7b1_2cg9w74h0q5r0000gn/T/streamjob7964024689233782754.jar tmpDir=null
15/05/31 16:31:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/31 16:31:11 WARN snappy.LoadSnappy: Snappy native library not loaded
15/05/31 16:31:11 INFO mapred.FileInputFormat: Total input paths to process : 2
15/05/31 16:31:11 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-xuguoqiang/mapred/local]
15/05/31 16:31:11 INFO streaming.StreamJob: Running job: job_201505311232_0018
15/05/31 16:31:11 INFO streaming.StreamJob: To kill this job, run:
15/05/31 16:31:11 INFO streaming.StreamJob: /Users/xuguoqiang/hadoop-1.2.1/libexec/../bin/hadoop job -Dmapred.job.tracker=hdfs://localhost:9001 -kill job_201505311232_0018
15/05/31 16:31:11 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201505311232_0018
15/05/31 16:31:12 INFO streaming.StreamJob: map 0% reduce 0%
15/05/31 16:31:16 INFO streaming.StreamJob: map 67% reduce 0%
15/05/31 16:31:19 INFO streaming.StreamJob: map 100% reduce 0%
15/05/31 16:31:25 INFO streaming.StreamJob: map 100% reduce 33%
15/05/31 16:31:26 INFO streaming.StreamJob: map 100% reduce 100%
15/05/31 16:31:27 INFO streaming.StreamJob: Job complete: job_201505311232_0018
15/05/31 16:31:27 INFO streaming.StreamJob: Output: output
hadoopの勉強を始めたばかりで、頑張ってください.がんばれ!同じ道の人にアドバイスしてほしい.
参照先:
ファンログ:http://blog.fens.me/hadoop-mapreduce-matrix/