Spark サンプルアプリケーション#1 WordCount and Word Ranking


WordCount

Sparkのパッケージに含まれているサンプルコードを基に何を行っているか確認する。

spark-1.5.2-bin-hadoop2.6/examples/src/main/python/wordcount.py
from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1).reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    sc.stop()

アプリを開発する際、データがどのように格納・処理されているか確認しながら進めていった方がよいので、PySparkを使って上記のサンプルコードを確認していく。

-SparkContextの生成

sc = SparkContext(appName="PythonWordCount")

SparkContextは、Sparkのアプリ全体の実行関する情報を集約したもの。Sparクラスタへの接続を行ったり、RDDを作成するために使用されたり、アキュムレートやブロードキャストなどの共用変数を提供する。

  • テキストファイルからRDDを生成
textRDD = sc.textFile("/opt/spark/README.md")

textFileメソッドを使用し、テキストファイルからRDDを生成する。
※RDDのパーティションサイズはファイルサイズを128MBで割った数近くになる。

※RDDの要素をフィルタリングしたい場合は、filterメソッドを使用する。filterメソッドは、RDDに含まれる要素にたいしてフィルタリング条件にマッチする要素のみを残したRDDを生成する。
ex) wordRDD = textRDD.filter(labda x: x.isalnum()) //英数字だけを取得

  • RDDの要素を加工
lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)

x.splitでwordに分解し、flatMapで各行のwordをflatten化して一つの大きな配列にする。mapメソッドにより、各要素を変換する。このプログラムではタプル(x,1)に変換。

  • RDDの要素をキー毎に集約
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1).reduceByKey(add)

reduceByKeyメソッドにより、RDDに含まれる要素を、同じキーを持つものにグループ分けし、グループ毎に値を集約する。
reduceByKeyメソッドが対象としている要素のデータ型は(Key,Value)。
reduceByKeyメソッドではキー毎の集約処理にシャッフルを行っているので、エグゼキュータ間の多対多の通信が発生する。

  • RDDの要素をキー毎に集約
output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

collectメソッドでRDDの要素を取り出し、printで出力する。

  • アプリケーションの停止
sc.stop()

stopメソッドを呼び出しアプリを停止する。

Appendix) WordCount ranking

どのwordが多く出現しているのかトップ10ランキングを表示する。
sortByKeyメソッドを利用することによりRDDに含まれる要素をソートすることができる。
(Key,Value)を要素に持つRDDが対象で、Keyによる大小に基づいてsortする。
そのためKeyが文字のままだとsortがアルファベット順になるので、(word,count)から(count,word)に変更する必要がある。
実際にランキングを表示してみたところ記号とかも入ってたので記号などは除外するようにした。

wordcountranking.py
from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' '))\
                  .filter(lambda x: x.isalnum())\
                  .map(lambda x: (x, 1))\
                  .reduceByKey(add)
    wordrankingRDD = counts.map(lambda (x,y): (y,x))
    sortedwordrankingRDD = wordrankingRDD.sortByKey(ascending=False)
    output = sortedwordrankingRDD.take(10) #要素を先頭から10個取り出す
    for (count, word) in output:
        print("%s: %i" % (word, count))

    sc.stop()
$ ./bin/spark-submit --master=local examples/src/main/python/wordranking.py /tmp/README.md
the: 21
to: 14
Spark: 14
for: 12
and: 10
a: 10
run: 7
on: 6
can: 6
is: 6