Python version of MapReduce matrix multiplication


This follows Teacher Zhang's MapReduce matrix multiplication.
When reposting, please credit these study notes by chybot: http://i.cnblogs.com/EditPosts.aspx?postid=4541939
Below is a Python version of MapReduce matrix multiplication.
For the idea behind the matrix multiplication, refer to Teacher Zhang's blog. For two matrices M1 and M2, the MapReduce computation proceeds as follows.
The most important point is the construction of the key: each key emitted by map is the index of one cell of the product matrix, since c[i][j] = sum(A[i][:] * B[:][j]).
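For reference, here is a minimal plain-Python sketch, independent of Hadoop, of the cell-by-cell product the job computes; the matrix values are taken from the example input below, and the printed "i,j" strings are exactly the reduce keys.

```python
A = [[-1, 0, 2],
     [1, 3, 1]]
B = [[3, 1],
     [2, 1],
     [1, 0]]

# c[i][j] = sum(A[i][:] * B[:][j]) -- the quantity one reduce key covers
C = [[sum(A[i][k] * B[k][j] for k in range(len(B)))
      for j in range(len(B[0]))]
     for i in range(len(A))]

for i, row in enumerate(C):
    for j, value in enumerate(row):
        # "i,j" is the reduce key; value is the reduce output for that key
        print('%d,%d\t%d' % (i + 1, j + 1, value))
```

The printed lines match the job output shown later in this post.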
Note: this implementation only demonstrates one way of thinking about matrix multiplication and is not suitable for real scenarios: there are only 2 map tasks (one per input matrix file) and only 1 reduce task.
The main reasons are that each mapper relies on global variables, and each reducer by default assumes that all the values needed for one cell of the result arrive in a single partition.
Input files:
matrixA.txt
A#-1,0,2
A#1,3,1
matrixB.txt
B#3,1
B#2,1
B#1,0

mapper program:
#!/usr/bin/python
# -*- coding:utf-8 -*-

import sys

# dimensions of the result matrix and running row indices for A and B
rowNum = 2
colNum = 2
rowIndexA = 1
rowIndexB = 1

def read_inputdata(splitstr):
    for line in sys.stdin:
        # split each record into (matrix name, row data)
        yield line.split(splitstr)

if __name__ == '__main__':
    for matrix, matrixline in read_inputdata('#'):
        if matrix == 'A':
            # replicate the row of A once per column of the result;
            # the key is the (row, column) index of a result cell
            for i in range(rowNum):
                key = str(rowIndexA) + ',' + str(i+1)
                value = matrix + ':'
                j = 1
                for element in matrixline.split(','):
                    print '%s\t%s%s,%s' % (key, value, j, element)
                    j += 1
            rowIndexA += 1
        elif matrix == 'B':
            # replicate the row of B once per row of the result
            for i in range(colNum):
                value = matrix + ':'
                j = 1
                for element in matrixline.split(','):
                    print '%s,%s\t%s%s,%s' % (i+1, j, value, rowIndexB, element)
                    j = j+1
            rowIndexB += 1
        else: continue
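As a sanity check, here is a small sketch (plain Python, variable names illustrative) of the key/value pairs the mapper logic above emits for the first input record `A#-1,0,2`, assuming the result matrix has 2 columns:

```python
matrixline = '-1,0,2'  # row data from the record "A#-1,0,2"
rowIndexA = 1          # this is the first row of A
colNum = 2             # assumed number of columns in the result matrix

pairs = []
for i in range(colNum):
    # key = (row of A, column of the result)
    key = '%d,%d' % (rowIndexA, i + 1)
    for j, element in enumerate(matrixline.split(','), start=1):
        # value = "A:position-within-row,element"
        pairs.append('%s\tA:%d,%s' % (key, j, element))

for p in pairs:
    print(p)
```

Each A element is thus duplicated once per result column, so the shuffle can route a full row of A to every cell it contributes to.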

reducer program:
#!/usr/bin/python
# -*- coding:utf-8 -*-

import sys
from itertools import groupby
from operator import itemgetter

def read_input(splitstr):
    for line in sys.stdin:
        line = line.strip()
        if len(line) == 0: continue
        yield line.split(splitstr, 1)

def run():
    data = read_input('\t')
    for current_element, group in groupby(data, itemgetter(0)):
        try:
            matrixA = {}
            matrixB = {}
            result = 0
            # collect this cell's contributions from A and B into two dicts
            for current_element, elements in group:
                matrix, index_value = elements.split(':')
                index, value = index_value.split(',')
                if matrix == 'A':
                    matrixA[index] = int(value)
                else:
                    matrixB[index] = int(value)
            # multiply matching entries; the MapReduce sort has already
            # grouped every value for this result cell under one key
            for key in matrixA:
                result += matrixA[key]*matrixB[key]
            print '%s\t%s' % (current_element, result)
        except Exception:
            pass

if __name__ == '__main__':
    run()
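To illustrate the shuffle-plus-reduce step without Hadoop, here is a small sketch (plain Python; the grouped input is hand-written in the mapper's value format) for the single output cell 1,1:

```python
# Values that the MapReduce sort would group under key "1,1":
# the whole first row of A and the whole first column of B.
grouped = {'1,1': ['A:1,-1', 'A:2,0', 'A:3,2', 'B:1,3', 'B:2,2', 'B:3,1']}

results = {}
for key, values in grouped.items():
    matrixA, matrixB = {}, {}
    for v in values:
        matrix, index_value = v.split(':')
        index, value = index_value.split(',')
        if matrix == 'A':
            matrixA[index] = int(value)
        else:
            matrixB[index] = int(value)
    # dot product of the matching A row and B column
    results[key] = sum(matrixA[k] * matrixB[k] for k in matrixA)

print(results)
```

This yields -1*3 + 0*2 + 2*1 = -1 for cell 1,1, matching the first line of the job output below.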

Test locally that it runs:
bogon:program xuguoqiang$ cat matrixA.txt matrixB.txt |python matrix_mapper.py |sort |python matrix_reducer.py 

1,1    -1
1,2    -1
2,1    10
2,2    4

Run the MapReduce program with Hadoop streaming. The results are as follows:
bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar  -D mapred.map.tasks=2 -D mapred.reduce.tasks=1 \
> -mapper /Users/xuguoqiang/hadoop-1.2.1/program/matrix_mapper.py  \
> -reducer /Users/xuguoqiang/hadoop-1.2.1/program/matrix_reducer.py  \
> -input /matrix/* \
> -output output5

packageJobJar: [/tmp/hadoop-xuguoqiang/hadoop-unjar2547149142116420858/] [] /var/folders/7_/jmj1yhgx7b1_2cg9w74h0q5r0000gn/T/streamjob1502134034482177499.jar tmpDir=null

15/05/31 16:37:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

15/05/31 16:37:06 WARN snappy.LoadSnappy: Snappy native library not loaded

15/05/31 16:37:06 INFO mapred.FileInputFormat: Total input paths to process : 2

15/05/31 16:37:06 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-xuguoqiang/mapred/local]

15/05/31 16:37:06 INFO streaming.StreamJob: Running job: job_201505311232_0019

15/05/31 16:37:06 INFO streaming.StreamJob: To kill this job, run:

15/05/31 16:37:06 INFO streaming.StreamJob: /Users/xuguoqiang/hadoop-1.2.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=hdfs://localhost:9001 -kill job_201505311232_0019

15/05/31 16:37:06 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201505311232_0019

15/05/31 16:37:07 INFO streaming.StreamJob:  map 0%  reduce 0%

15/05/31 16:37:11 INFO streaming.StreamJob:  map 100%  reduce 0%

15/05/31 16:37:20 INFO streaming.StreamJob:  map 100%  reduce 100%

15/05/31 16:37:22 INFO streaming.StreamJob: Job complete: job_201505311232_0019

15/05/31 16:37:22 INFO streaming.StreamJob: Output: output5

bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop fs -cat output5/*

1,1    -1
1,2    -1
2,1    10
2,2    4

 
You can see the results are the same as the local run.
 
2. Sparse matrix multiplication
Sparse matrix multiplication follows the same idea as the dense case; the only difference is that what used to be one row of data is now expressed as multiple lines, one per nonzero element.
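Before the MapReduce version, here is a plain-Python reference sketch (a dict keyed by (row, col), mirroring the `M#row,col,value` records below) of what the sparse product should be:

```python
# Nonzero entries only, keyed by (row, col) -- same data as the input files below
A = {(1, 1): 1, (1, 4): 3, (2, 1): 2, (2, 2): 5, (2, 4): 4,
     (3, 4): 1, (4, 1): 4, (4, 2): 7, (4, 3): 1, (4, 4): 2}
B = {(1, 1): 5, (2, 2): 2, (4, 1): 3, (4, 2): 1}

C = {}
for (i, k), a in A.items():
    for (kk, j), b in B.items():
        if k == kk:  # inner indices must match to contribute to C[i][j]
            C[(i, j)] = C.get((i, j), 0) + a * b

for i, j in sorted(C):
    print('%d,%d\t%d' % (i, j, C[(i, j)]))
```

The printed lines match the local and Hadoop test results shown at the end of this section.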
Input:
Matrix A
A#1,1,1
A#1,4,3
A#2,1,2
A#2,2,5
A#2,4,4
A#3,4,1
A#4,1,4
A#4,2,7
A#4,3,1
A#4,4,2
Matrix B
B#1,1,5
B#2,2,2
B#4,1,3
B#4,2,1

 
mapper program:
#!/usr/bin/python
# -*- coding:utf-8 -*-

import sys

rowNum = 2
colNum = 4

def read_inputdata(splitstr):
    for line in sys.stdin:
        yield line.strip().split(splitstr)

if __name__ == '__main__':
    for matrix, matrixline in read_inputdata('#'):
        if matrix == 'A':
            for i in range(rowNum):
                index1, index2, element = matrixline.split(',')
                print '%s,%s\t%s:%s,%s' % (index1, (i+1), matrix, index2, element)
        elif matrix == 'B':
            for i in range(colNum):
                index1, index2, element = matrixline.split(',')
                print '%s,%s\t%s:%s,%s' % (i+1, index2, matrix, index1, element)
        else: continue
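For example, here is a one-record trace (plain Python, names illustrative) of what this sparse mapper emits for the record `A#1,4,3` with `rowNum = 2`:

```python
index1, index2, element = '1,4,3'.split(',')  # record "A#1,4,3"
rowNum = 2  # the result matrix has 2 columns

# one key/value pair per result column, keyed by (A row, result column)
emitted = ['%s,%d\tA:%s,%s' % (index1, i + 1, index2, element)
           for i in range(rowNum)]
print(emitted)
```

Unlike the dense mapper, no running row index is needed: each record already carries its own row and column.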

reducer program:
#!/usr/bin/python
# -*- coding:utf-8 -*-

import sys
from itertools import groupby
from operator import itemgetter

def read_input(splitstr):
    for line in sys.stdin:
        line = line.strip()
        if len(line) == 0: continue
        yield line.split(splitstr, 1)

def run():
    data = read_input('\t')
    for current_element, group in groupby(data, itemgetter(0)):
        try:
            matrixA = {}
            matrixB = {}
            result = 0
            for current_element, elements in group:
                matrix, index_value = elements.split(':')
                index, value = index_value.split(',')
                if matrix == 'A':
                    matrixA[index] = int(value)
                else:
                    matrixB[index] = int(value)
            for key in matrixA:
                if key in matrixB:
                    result += matrixA[key]*matrixB[key]
            print '%s\t%s' % (current_element, result)
        except Exception:
            pass

if __name__ == '__main__':
    run()

 
Local test results:
bogon:program xuguoqiang$ cat sparsematrixB.txt sparsematrixA.txt | python sparsematrix_mapper.py |sort |python sparsematrix_reduce.py 

1,1    14
1,2    3
2,1    22
2,2    14
3,1    3
3,2    1
4,1    26
4,2    16

Hadoop test results:
bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -D mapred.map.tasks=2 -D mapred.reduce.tasks=1 -mapper /Users/xuguoqiang/hadoop-1.2.1/program/sparsematrix_mapper.py  -reducer /Users/xuguoqiang/hadoop-1.2.1/program/sparsematrix_reduce.py  -input /sparsematrix/* -output output

packageJobJar: [/tmp/hadoop-xuguoqiang/hadoop-unjar2334049571009138288/] [] /var/folders/7_/jmj1yhgx7b1_2cg9w74h0q5r0000gn/T/streamjob7964024689233782754.jar tmpDir=null

15/05/31 16:31:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

15/05/31 16:31:11 WARN snappy.LoadSnappy: Snappy native library not loaded

15/05/31 16:31:11 INFO mapred.FileInputFormat: Total input paths to process : 2

15/05/31 16:31:11 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-xuguoqiang/mapred/local]

15/05/31 16:31:11 INFO streaming.StreamJob: Running job: job_201505311232_0018

15/05/31 16:31:11 INFO streaming.StreamJob: To kill this job, run:

15/05/31 16:31:11 INFO streaming.StreamJob: /Users/xuguoqiang/hadoop-1.2.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=hdfs://localhost:9001 -kill job_201505311232_0018

15/05/31 16:31:11 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201505311232_0018

15/05/31 16:31:12 INFO streaming.StreamJob:  map 0%  reduce 0%

15/05/31 16:31:16 INFO streaming.StreamJob:  map 67%  reduce 0%

15/05/31 16:31:19 INFO streaming.StreamJob:  map 100%  reduce 0%

15/05/31 16:31:25 INFO streaming.StreamJob:  map 100%  reduce 33%

15/05/31 16:31:26 INFO streaming.StreamJob:  map 100%  reduce 100%

15/05/31 16:31:27 INFO streaming.StreamJob: Job complete: job_201505311232_0018

15/05/31 16:31:27 INFO streaming.StreamJob: Output: output

 
 
I have just started learning Hadoop and will keep at it; I would welcome advice from others on the same path.
 
References:
Fens blog: http://blog.fens.me/hadoop-mapreduce-matrix/