Sparkを使用したアンケート集計 Python3で記述


NTTデータさんが書いたSpark入門の5.5章の問題をScalaではなく、Python3で書き直した際のメモ。以下の機能を使用してアンケート集計のSparkプログラムを記述。
・RDDの永続化(persist、cache)
・カウント(count)
・内部要素の合計値を計算(sum)
・RDDに含まれる要素すべてに対する集約処理(reduce)
・アキュムレータ(accumulator)

0. 準備

扱うデータ形式。このファイルをHDFS上に格納。

questionaire.csv
年齢、性別、評価

1. アンケート集計プログラム全体

評価を全体の平均、年代別の平均、性別ごとの平均の軸で集計するコードは以下のとおり。

chap5-2.py
# -*- coding:utf-8 -*-
from __future__ import print_function
import sys
import io
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage: examplent")
        exit(-1)
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

    fileQ = "hdfs:///user/y_tadayasu/data/questionnaire.csv"
    sc = SparkContext(appName="chap5-2")
    questionRDD = sc.textFile(fileQ).map(lambda x: x.split(","))
    # RDD Persistence
    #questionRDD.persist()
    questionRDD.cache() # Persist this RDD with the default storage level (MEMORY_ONLY_SER).
    #######################################
    # draft-1: use count and sum method
    #######################################
    count = questionRDD.count()

    #totalPoints = questionRDD.map(lambda x: int(x[2])).sum()
    totalPoints = questionRDD.map(lambda x: int(x[2])).sum()

    print("AVZ=",totalPoints/count)

    #######################################
    # draft-2: use reduce
    #######################################
    (totalPoints,numQuestionare) = questionRDD.map(lambda x:(x[2],1)).reduce(lambda x,y:(int(x[0])+int(y[0]),int(x[1])+int(y[1])))
    print("AVZ=%f" %(totalPoints/numQuestionare))

    ageRDD = questionRDD.map(lambda x:(int(x[0])//10*10,(x[2],1))).reduceByKey(lambda x,y:(int(x[0])+int(y[0]),int(x[1])+int(y[1])))
    for i in ageRDD.collect():
       (age,(agePoint,ageCount)) = i
       print("AVG Age Range(%s):%.2f" %(age,agePoint/ageCount))

    #######################################
    # calculate average about male and female
    #######################################
    numMAcc = sc.accumulator(0)
    totalPointMAcc = sc.accumulator(0)
    numFAcc = sc.accumulator(0)
    totalPointFAcc = sc.accumulator(0)

    mfRDD = questionRDD.map(lambda x:(x[1],x[2])).foreach(lambda x: (numMAcc.add(1),totalPointMAcc.add(int(x[1]))) if x[0] == 'M' else (numFAcc.add(1),totalPointFAcc.add(int(x[1]))))
    print("AVG Male is %f" %(totalPointMAcc.value/numMAcc.value))
    print("AVG Female is %f" %(totalPointFAcc.value/numFAcc.value))
    sc.stop()

ジョブの実行。

$ spark-submit --master yarn-client chap5-2.py 
AVZ= 3.3684210526315788                                                         
AVZ=3.368421
AVG Age Range(40):2.50
AVG Age Range(10):4.00
AVG Age Range(20):3.71
AVG Age Range(50):1.50
AVG Age Range(30):3.50
AVG Male is 3.500000
AVG Female is 3.272727

2. プログラム説明

2.1.RDDの永続化

毎回csvからRDDファイルを生成するのは効率が悪いので、永続化することにより、その処理を効率化する(最初のジョブが実行されたとき、各パーティションがエグゼキュータ上に永続化する)。永続化レベルにより、永続化先をメモリ上やディスクなど選択できる。メモリだけに永続化する場合はpersistメソッドではなくcacheメソッドが使える。
使用可能な永続化レベルは以下を参照。
http://spark.apache.org/docs/latest/programming-guide.html

  fileQ = "hdfs:///user/y_tadayasu/data/questionnaire.csv"
    sc = SparkContext(appName="chap5-2")
    questionRDD = sc.textFile(fileQ).map(lambda x: x.split(","))
    # RDD Persistence
    # persistメソッドで永続化できるが、今回はメモリ上だけに永続化するのでcacheメソッドを使用
    #questionRDD.persist()
    questionRDD.cache() 

2.2. 要素数のカウントと集計 countメソッド、sumメソッド

countメソッドによりRDDに含まれる要素数の数を取得するアクション。

sumメソッドにより数値型の要素を持つRDDを対象に、要素の合計値を計算するアクション。

    count = questionRDD.count()

    #totalPoints = questionRDD.map(lambda x: int(x[2])).sum()
    totalPoints = questionRDD.map(lambda x: int(x[2])).sum()

    print("AVZ=",totalPoints/count)

2.3. 要素数のカウントと集計 reduceメソッド、foreachメソッド

mapメソッドを使ってRDDの全ての要素を、(評価,1)のタプルに変換し、reduceメソッドを使って評価の合計値と評価数の合計値を集計する。

(totalPoints,numQuestionare) = questionRDD.map(lambda x:(x[2],1)).reduce(lambda x,y:(int(x[0])+int(y[0]),int(x[1])+int(y[1])))
print("AVZ=%f" %(totalPoints/numQuestionare))

各年代別の評価に関しては、(年代、(評価,1))のタプルに変換し、reduceByKeyメソッドを使って年代毎の評価の合計値と評価数の合計値を集計する。

ageRDD = questionRDD.map(lambda x:(int(x[0])//10*10,(x[2],1))).reduceByKey(lambda x,y:(int(x[0])+int(y[0]),int(x[1])+int(y[1])))
for i in ageRDD.collect():
    (age,(agePoint,ageCount)) = i
    print("AVG Age Range(%s):%.2f" %(age,agePoint/ageCount))

reduceメソッドとforeachメソッドについてはslideshareにまとめる予定。
reduceメソッドを使う際のデータ処理に関しては以下のようにして確認。
タプルなので単に加算すると文字の連結になるので、intに変換する必要あり。

>>> txtRDD = sc.textFile("hdfs:///user/y_tadayasu/data/aaa.csv")
>>> txtRDD.collect()
['1,2', '3,4', '5,6', '7,8', '9,10']
>>> txtRDD = sc.textFile("hdfs:///user/y_tadayasu/data/aaa.csv").map(lambda x: x.split(","))
>>> txtRDD.collect()
[['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '10']]

>>> txtRDD = sc.textFile("hdfs:///user/y_tadayasu/data/aaa.csv").map(lambda x: x.split(",")).reduce(lambda x,y:x)
>>> print(txtRDD)
['1', '2']

>>> txtRDD = sc.textFile("hdfs:///user/y_tadayasu/data/aaa.csv").map(lambda x: x.split(",")).reduce(lambda x,y:y)
>>> print(txtRDD)
['9', '10']

>>> txtRDD = sc.textFile("hdfs:///user/y_tadayasu/data/aaa.csv").map(lambda x: x.split(",")).reduce(lambda x,y:(x[0]+y[0],x[1]+y[1]))
>>> print(txtRDD)
('13579', '246810')
>>> txtRDD = sc.textFile("hdfs:///user/y_tadayasu/data/aaa.csv").map(lambda x: x.split(",")).reduce(lambda x,y:(int(x[0])+int(y[0]),int(x[1])+int(y[1])))
>>> print(txtRDD)
(25, 30)

2.4. アキュムレータを使ってカウント

アキュムレータはドライバプログラムでは値の設定と参照を行い、エグゼキュータ上で実行されるタスクからは値の加算のみを行うことを想定した共有変数。何かの値をカウントするためだけに容易された仕組みだと思われる。
accumulatorメソッドを使用し、アキュムレータを使用。1つ目の引数は初期値。

性別がMならnumMAcc(男性の数)に1を加算し、totalPointMAcc(男性の総ポイント)に評価をする。M以外の場合は女性の方に同じ処理を行う。

numMAcc = sc.accumulator(0)
totalPointMAcc = sc.accumulator(0)
numFAcc = sc.accumulator(0)
totalPointFAcc = sc.accumulator(0)

mfRDD = questionRDD.map(lambda x:(x[1],x[2])).foreach(lambda x: (numMAcc.add(1),totalPointMAcc.add(int(x[1]))) if x[0] == 'M' else (numFAcc.add(1),totalPointFAcc.add(int(x[1]))))

print("AVG Male is %f" %(totalPointMAcc.value/numMAcc.value))
print("AVG Female is %f" %(totalPointFAcc.value/numFAcc.value))

3. トラブルシューティング

デバッグも兼ねて何回かジョブを実行しようとしたら、以下のエラーメッセージが出力されジョブが失敗するようになった。

5/12/01 00:42:56 ERROR SparkContext: Error initializing SparkContext.
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/y_tadayasu/.sparkStaging/application_1448
929618763_0004/spark-assembly-1.5.2-hadoop2.6.0.jar could only be replicated to 0 nodes instead of minReplicatio
n (=1).  There are 2 datanode(s) running and no node(s) are excluded in this operation.

調べてみるとHDFSを構成しているディスク容量が100%近くになっていた。どの領域がネックになっているか調べていくとDataNodeのファイルキャッシュディレクトリが容量を逼迫させていたので、とりあえず削除。HDFSをちゃんと運用するために最大値を設定するなどした方が良さそう。
/hadoop/yarn/node-manager/local/usercache/y_tadayasu/filecache