Sparkクイックスタート


まずはSparkを触ってみようということで、公式サイトのクイックスタートをその通りやったメモです。サンプルは全てPythonのみ実行してます。
http://spark.apache.org/docs/latest/quick-start.html

1. Interactive Analysis with the Spark Shell

1.1. Basics

SpkarkのシェルはAPIを学ぶのにシンプルな方法を提供し、インタラクティブでデータ解析する際に便利なツールである。ScalaやPythonで利用することが可能である。

> ./bin/pyspark
・・・
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Python version 2.6.9 (unknown, Mar 31 2014 22:15:20)
SparkContext available as sc, HiveContext available as sqlContext.
>>> 

初期の状態で実行するとINFOレベルの情報が出力されて出力結果が見難いので、以下の設定をした後、再度pysparkを実行する。

# cp conf/log4j.properties.template conf/log4j.properties
conf/log4j.properties
log4j.rootCategory=ERROR, console #INFOからERRORに変更

SparkはResilient Distributed Dataset(RDD)と呼ばれる抽象化したデータ構造を使う。RDDはHadoopのInfputFormats(HDFSファイルのような)から生成されたり、他のRDDから変換されることによって生成される。

Sparkのディレクトリに入っているREADMEファイルから新しいRDDを生成してみよう。
RDDは値を返す「アクション」と新しいRDDのポインタを返す「変換」持っている。

>>> textFile = sc.textFile("README.md")
>>> textFile.count() #RDDに含まれるアイテム数
98
>>> textFile.first() #RDDの最初のアイテム
u'# Apache Spark'
>>> 

filter変換を使って新しいRDDを生成する。

>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
>>> linesWithSpark.count()
18
>>> linesWithSpark.first()
u'# Apache Spark'

変換とアクションを以下のようにチェインすることができる。

>>> textFile.filter(lambda line: "Spark" in line).count()
18

1.2. More on RDD Operations

RDDのアクションと変換はより複雑な計算を行うことが可能である。
APIに関する詳細は以下を参照。
http://spark.apache.org/docs/latest/programming-guide.html
http://spark.apache.org/docs/latest/api/python/pyspark.html

以下を実行することにより、語数が一番多い行を見つけるには以下のを行を見つけることができる。

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a,b: a if (a > b) else b)
14

map関数は各要素に同じ処理を行い、新しいRDDを生成する。この例ではlambda式を利用して、各行をsplitで分割し、数を数えている。例えば、"This is a pen"なら"4"に変換している。
Reduceにより要素をまとめる。この例では、lambda式を利用して2つの要素(この例では行の数)を比較し、a > bならa、それ以外ならbを返す(リスト内包表記を使って記述。)処理を行い、最終的に一番大きな値を返す。

Hadoopによって人気になったMapReduceのデータフローもSparkは簡単に実現することができる。

各wordが何回含まれているかカウントするか求めるには、以下のように記述する。

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word:(word,1)).reduceByKey(lambda a,b: a+b)
>>> wordCounts.collect()
[(u'when', 1), (u'R,', 1), (u'including', 3), (u'computation', 1), (u'using:', 1), (u'guidance', 3), (u'Scala,', 1), (u'environment', 1), (u'only', 1), (u'rich', 1), (u'Apache', 1),.......

flatMapはmapと似ているが、各インプットアイテムは0かそれ以上のアウトプットアイテムにmapされる。少し分かり難かったが、以下2つの記事を読み理解。flattenとmapを同時に行うのがflatMap。各要素を処理しながらflatten化する。
http://qiita.com/mtoyoshi/items/c95cc88de2910945c39d
http://d.hatena.ne.jp/xef/20121027/p2

mapとflatMapそれぞれのアウトプットであるRDDのデータを見てみる。

>>> aaa = textFile.map(lambda line: line.split())
>>> aaa.collect()
[[u'#', u'Apache', u'Spark'], [], [u'Spark', u'is', u'a', u'fast', u'and', u'general', u'cluster',...

mapでは1行1配列となっている。

>>> bbb = textFile.flatMap(lambda line: line.split())
>>> bbb.collect()
[u'#', u'Apache', u'Spark', u'Spark', .....]

flatMapではflattenされ、1つの配列となっていることを確認できる。
collectでRDDの全てのデータを配列として出力。

1.3. Caching

Sparkはクラスタ全体でインメモリキャッシュを利用することができ、これは繰り返しアクセスするデータに対して非常に便利。 10ノードとか100ノードとかででもストライプして利用できる。

>>> textFile = sc.textFile("README.md")
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
18
>>> linesWithSpark.count()
18

1.4. Self-Contained Applications

Python API (PySpark)を使って、'a'を含む行の数と'b'を含む行の数を数えて表示するシンプルなアプリを書く。

SimpleApp.py
from pyspark import SparkContext

logFile = "/opt/spark/README.md" 
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark-submitを使ってアプリを実行する。

$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23