Google Compute Engine上でSparkアプリケーションを実行(Python3.X)


Spark入門で説明されている販売実績のデータを用いた集計処理を勉強がてらPythonで書き直してみる。
実行環境はGCE上でインスタンス4台とPython3.4.3を使用。

RDDの結合とブロードキャスト変数を使用。

0.準備

手元のマシンからGCEにサンプルデータを転送する。

$ gcloud compute --project PJ名 copy-files --zone ゾーン名ApacheSpark_samples.zip spark-client:/tmp

クラスタ環境でジョブを流すので、HDFSにデータを置く。

$ hdfs dfs -put Chapter5/
BestSellerFinder/           WordCountTop3/
QuestionnaireSummarization/ data/
WordCount/                  

$ hdfs dfs -put Chapter5/data/
README.md           questionnaire.csv   sales-october.csv
products.csv        sales-november.csv  

$ hdfs dfs -put Chapter5/data/*.csv /user/y_tadayasu/data/

$ hdfs dfs -ls /user/y_tadayasu/data
Found 4 items
-rw-r--r--   3 y_tadayasu y_tadayasu        643 2015-11-23 00:31 /user/y_tadayasu/data/products.csv
-rw-r--r--   3 y_tadayasu y_tadayasu        132 2015-11-23 00:31 /user/y_tadayasu/data/questionnaire.csv
-rw-r--r--   3 y_tadayasu y_tadayasu        906 2015-11-23 00:31 /user/y_tadayasu/data/sales-november.csv
-rw-r--r--   3 y_tadayasu y_tadayasu        937 2015-11-23 00:31 /user/y_tadayasu/data/sales-october.csv

1. 二ヶ月連続で50個以上売れた商品を見つけ、商品名、合計販売個数、売上高をファイルに保存

作成したコードは以下のとおりである。

# -*- coding:utf-8 -*-
from __future__ import print_function

import sys
import io
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage: examplent")
        exit(-1)

    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

   # print(sys.getdefaultencoding())
   # print(sys.stdout.encoding)

    fileOct = "hdfs:///user/y_tadayasu/data/sales-october.csv"
    fileNov = "hdfs:///user/y_tadayasu/data/sales-november.csv"
    fileProducts = "/home/y_tadayasu/data/products.csv"
    fileResult = "hdfs:///user/y_tadayasu/data/result.csv"

    sc = SparkContext(appName="Example")

    salesOctRDD = sc.textFile(fileOct).map(lambda x: x.split(",")).map(lambda x:(x[2],int(x[3]))).reduceByKey(add).filter(lambda x: x[1] >= 50)

    salesNovRDD = sc.textFile(fileNov).map(lambda x: x.split(",")).map(lambda x:(x[2],int(x[3]))).reduceByKey(add).filter(lambda x: x[1] >= 50)

    salesOctNovRDD = salesOctRDD.join(salesNovRDD)

    # A data of salesOctNovRDD element is like this tupple (8 (60,72)) 
    salesRDD = salesOctNovRDD.map(lambda x: (x[0],x[1][0]+x[1][1]))

    productsMap = {}
    for line in open(fileProducts, 'r',encoding='utf-8'):
        splitLine = line.split(',')
        productId = splitLine[0]
        productName = splitLine[1]
        unitPrice = int(splitLine[2])
        productsMap[productId] = (productName,unitPrice)

    broadcastMap = sc.broadcast(productsMap)

    result_list = []
    output = salesRDD.collect()
    for (productid, count) in output:
        result_list.append((broadcastMap.value[productid][0],count*productsMap[productid][1]))

    resultRDD = sc.parallelize(result_list)
    resultRDD.saveAsTextFile(fileResult)
    sc.stop()

1.1. CSVファイルからRDDを作成

SparkContextのtextFileメソッドを使用してCSVファイルからRDDを作成する。
しかし、textFileメソッドでそのまま取得してみると以下のようにカンマを含めて('5830,2014-10-02 10:20:38,16,28')が1つのデータとなり、このままではデータを扱いにくい。

>>> salesOctRDD = sc.textFile("/user/y_tadayasu/data/sales-october.csv")
>>> salesOctRDD.collect()
[u'5830,2014-10-02 10:20:38,16,28', u'5831,2014-10-02 15:13:04,15,22',.....

そこでカンマ区切りのデータを分割する。

>>> salesOctRDD = sc.textFile("/user/y_tadayasu/data/sales-october.csv").map(lambda x: x.split(","))
>>> salesOctRDD.collect()
[[u'5830', u'2014-10-02 10:20:38', u'16', u'28'], [u'5831', u'2014-10-02 15:13:04', u'15', u'22'], ........

トランザクションIDと販売日時は必要ないので、これらの項目は省く。

>>> salesOctRDD = sc.textFile("/user/y_tadayasu/data/sales-october.csv").map(lambda x: x.split(",")).map(lambda x:(x[2],int(x[3])))
>>> salesOctRDD.collect()
[(u'16', 28), (u'15', 22),.......

これで商品番号と販売個数のデータを取り出せたので、reduceByKeyメソッドを使って商品ごとの合計販売個数を求める。

>>> from operator import add
>>> salesOctRDD = sc.textFile("/user/y_tadayasu/data/sales-october.csv").map(lambda x: x.split(",")).map(lambda x:(x[2],int(x[3]))).reduceByKey(add)
>>> salesOctRDD.collect()
[(u'11', 21), (u'10', 65), (u'13', 29), (u'12', 26), (u'15', 80), (u'16', 28), (u'19', 18), (u'18', 14), (u'1', 8), (u'3', 20), (u'2', 34), (u'5', 4), (u'4', 5), (u'7', 7), (u'6', 16), (u'20', 19), (u'8', 68)]

50個以上の商品が対象となるので、filterメソッドを使用して、合計販売数が50以上のものだけを抽出する。

>>> salesOctRDD = sc.textFile(fileOct).map(lambda x: x.split(",")).map(lambda x:(x[2],int(x[3]))).reduceByKey(add).filter(lambda x: x[1] >= 50)
>>> salesOctRDD.collect()
[(u'15', 80), (u'10', 65), (u'8', 68)]

11月のデータに対しても同様の処理を行う。

>>>  salesNovRDD = sc.textFile(fileNov).map(lambda x: x.split(",")).map(lambda x:(x[2],int(x[3]))).reduceByKey(add).filter(lambda x: x[1] >= 50)
>>> salesNovRDD.collect()
[(u'15', 51), (u'8', 72)]

1.2. 2つのRDDを結合

joinメソッドを使って2つのRDD(10月と11月のRDD)を結合する。
ちなみにjoinメソッドはRDBでいう内部結合、外部結合のleftOuterJoinメソッド、rightOuterJoinメソッドも使える。

>>> salesOctNovRDD = salesOctRDD.join(salesNovRDD)
>>> salesOctNovRDD.collect()
[(u'15', (80, 51)), (u'8', (68, 72))]

商品番号をキーに、10月と11月の販売個数がvalueとして入っているので、mapメソッドを10月と11月の販売個数を加算する。

>>> salesRDD = salesOctNovRDD.map(lambda x: (x[0],x[1][0]+x[1][1]))
>>> salesOctNovSum.collect()
[(u'15', 131), (u'8', 140)]

1.3. ブロードキャスト変数を利用して商品名と商品番号のマップされたデータをエグゼキュータに配布する。

ここまでで10月、11月に50個以上売れた商品を見つける処理ができたが、商品番号が商品名とひも付けられてないので、何の商品が売れたのか分かりにくい。そこでproducts.csvに記録されているデータを読み込み、辞書オブジェクトに格納。それをbroadcastメソッドを使用し配布する。

#ローカルファイルからproducts.csvを読み込み、辞書オブジェクトに格納([productid]=productName)
productsMap = {}
for line in open(fileProducts, 'r',encoding='utf-8'):
    splitLine = line.split(',')
    productId = splitLine[0]
    productName = splitLine[1]
    unitPrice = int(splitLine[2])
    productsMap[productId] = (productName,unitPrice)

broadcastMap = sc.broadcast(productsMap)

valueメソッドを使用し、ブローキャスト変数から値を取り出す。

result_list = []
output = salesRDD.collect()
for (productid, count) in output:
    result_list.append((broadcastMap.value[productid][0],count*productsMap[productid][1]))

1.4. 集計結果をHDFS上に出力

SparkContextのsaveAsTextFileメソッドを使用して集計結果をファイルに出力する。ここでは辞書オブジェクトからRDDに変換するためにparallelizeメソッドを使用。

resultRDD = sc.parallelize(result_list)
    resultRDD.saveAsTextFile(fileResult)

ファイルに出力されているか確認。HDFS上に置いたのでファイルは分割されている。

$ hdfs dfs -cat /user/y_tadayasu/data/result.csv/part-00000
('栗もなか(10個入り)', 210000)
$ hdfs dfs -cat /user/y_tadayasu/data/result.csv/part-00001
('上生菓子(10個入り)', 222700)
$ hdfs dfs -cat /user/y_tadayasu/data/result.csv/part-0000*
('栗もなか(10個入り)', 210000)
('上生菓子(10個入り)', 222700)

2. はまった箇所

2.1. 日本語表示

問題)
PythonはUnicodeで文字を処理するのはしっているが、# -- coding:utf-8 --などを記載しても、Unicodeで表示され、読めない。。。

>>> productsMap = {}
>>> for line in open('products.csv', 'r'):
...    splitLine = line.split(',')
...    productId = splitLine[0]
...    productName = splitLine[1]
...    unitPrice = int(splitLine[2])
...    productsMap[productId] = (productName,unitPrice)
>>> broadcastVar = sc.broadcast(productsMap)
>>> broadcastVar.value
{'11': ('\xe6\xa0\x97\xe3\x81\xbe\xe3\x82\x93\xe3\x81\x97\xe3\x82\x99\xe3\x82\x85\xe3\x81\x86(5\xe5\x80\x8b\xe5\x85\xa5\xe3\x82\x8a)', '1400\n'),
・・・・・
 '10': ('\xe6\xa0\x97\xe3\x82\x82\xe3\x81\xaa\xe3\x81\x8b(10\xe5\x80\x8b\xe5\x85\xa5\xe3\x82\x8a)', '1500\n')}

解決方法)
以下のコードを埋め込んだところ、defaultencodingはutf-8になっているが、sys.stdout.encodingはutf-8になっていないのが判明。なおsys.stdoutは標準出力に対するファイルオブジェクトである。

print(sys.getdefaultencoding())
print(sys.stdout.encoding)

そこで以下の1行を追加し、sys.stdoutをutf-8に設定したところ、日本語が表示されることを確認。

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

2.2. YARNクラスタ環境におけるPyenvの使用

当初Pyenvを使用したPython3.4.3環境でコードを書いてテストしていたが、クラスタ環境で実行する際、yarnのNodeManagerがyarnユーザ権限でPYSPARK_PYTHONで指定されたpythonをキックするため、pyenvで一般ユーザの~.pyenv/xxx配下にあるpythonにアクセスできない。
仕方がたないので/usr/local/pythonにpython3をインストールしてとりあえず回避。(あまり綺麗なやり方ではないが、とりあえずこれでいいかな)

2.3. reduce処理時に発生した、PYTHONHASHSEEDの対応

問題)
reduceByKey(add)の箇所でエラーが発生

raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED

PYTHONHASHSEEDを無効にしろと言っている。
PYTHONHASHSEEDとは?
https://docs.python.org/3/using/cmdline.htmlに以下の記述あり。
PYTHONHASHSEED allows you to set a fixed value for the hash seed secret.
ハッシュを生成するためのシード。同じシードをずっと使うよりはランダムで生成する方がセキュリティを向上させることができるので追加されたようである。

以下のサイトに書いてあるように/root/.bashrcに書いたが解決せず、エラー内容は変わらない。

解決方法)
yarnクラスタを構成する全てのノードでyarn-env.shの中にPYTHONHASHSEED=0を設定すると、エラーがでなくなり、正常にジョブが終了することを確認。

/etc/hadoop/conf/yarn-env.sh
...
export PYTHONHASHSEED=0
...

今のバージョンではまだpython2.X系を使った方がよいかもしれない。