Apache Sparkとpythonでワードカウント(Mac OSX)


概要

Apache Sparkの検証の第一歩として。
Hadoop経験者であればよくご存知かと思いますが、ファイル内の同一の語句をカウントするアレです。
環境はMac OSXですが、Linuxでもほぼ同じかと。
コード一式はこちら

インストール

$ brew install apache-spark

インストール確認

spark-shellが動いてscala>が表示されればOK

$ /usr/local/bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:51 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:51 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:56 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:56 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

scala>

pythonでローカルファイルのワードカウントをしてみる

こちらは、公式サイトの記載を参考に書きました。

ディレクトリ構成

下記のように準備してください。

$ tree
.
├── input
│   └── data # 読み込むテキスト
└── wordcount.py # 実行スクリプト

1 directory, 4 files

コードを書く

ここではpythonを使います。
scalaやJavaでも書けるよう。得意なのでいきましょう。
こんな感じ。

wordcount.py
#!/usr/bin/env python
# coding:utf-8

from pyspark import SparkContext

def execute(sc, src, dest):
    '''
    ワードカウントを実行する
    '''
    # srcファイルを読み込み
    text_file = sc.textFile(src)
    counts = text_file.flatMap(lambda line: line.split(" ")) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda a, b: a + b)
    # 結果を書き出し
    counts.saveAsTextFile(dest)

if __name__ == '__main__':
    sc = SparkContext('local', 'WordCount')
    src  = './input'
    dest = './output'
    execute(sc, src, dest)

読み込みファイル準備

適当に。
例えばこんな感じ。

aaa
bbb
ccc
aaa
bbbb
ccc
aaa

実行

下記コマンド。

$ which pyspark
/usr/local/bin/pyspark

# 実行
$ pyspark ./wordcount.py

実行するとダーッとログが流れます。(Hadoop Streamingみたい)

確認

(u'aaa', 3)
(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)

正しくカウントされました。

おまけ

出力先ディレクトリ(./output)が生成済の場合、次回の処理に失敗するので注意。
下記のようなシェルを同じディレクトリに添えておくと良いです。

exec.sh
#!/bin/bash

rm -fR ./output
/usr/local/bin/pyspark ./wordcount.py

echo ">>>>> result"
cat ./output/*
$ sh exec.sh
・・・
>>>>> result
(u'aaa', 3)
(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)