Apache Sparkを利用したWord Count


はじめに

以下の文章についてApache Sparkを利用してWord Countを実行するコードを書いていきます。

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Java、Scala、sbtのインストール

JavaとScalaとsbtをインストールします。
今回は以下のバージョンをインストールします。

$ java -version
java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
$ scala -version
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
$ sbt about
[info] Set current project to App (in build file:/Users/)
[info] This is sbt 0.13.8
[info] The current project is {file:/Users/} 1.0
[info] The current project is built against Scala 2.11.6
[info] Available Plugins: sbt.plugins.IvyPlugin, sbt.plugins.JvmPlugin, sbt.plugins.CorePlugin, sbt.plugins.JUnitXmlReportPlugin
[info] sbt, sbt plugins, and build definitions are using Scala 2.10.4

実装1

以下の内容をbuild.sbtとして保存します。
今回はApache Spark 1.4.1を利用します。

build.sbt
name := "App"
version := "1.0"
scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1"
)

次に以下の内容をmain.scalaとして保存します。

main.scala
import org.apache.spark.SparkContext

object Main {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "App") // ローカルモード
    val rdd = sc.textFile("sample.txt")
      .flatMap(r => r.split(" "))
      .map(r => (r, 1))
      .cache
      .reduceByKey(_ + _)
      .collect.foreach { r => 
        println(r._1.mkString("", "", "") + "\t" + r._2)
      }

    sc.stop()
  }
}

flatMapは、要素を別の型に変換します。
次のmap(r => (r, 1)) で(Apache, 1) (Spark, 1) というペアにします。
reduceByKey(_ + _) でカウントアップします。
reduceByKey(_ + _) は reduceByKey((x, y) => x + y) と等価です。

以下の内容をsample.txtとして保存します。

sample.txt
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

実行1

実行方法は以下のとおりです。

$ sbt run

大量の実行ログの中にprintlnの内容が表示されます。

GraphX  1
Python  1
provides    1
is  1
R,  1
higher-level    1
general 1
fast    1
Java,   1
SQL 2
Apache  1
data    1
learning,   1
cluster 1
graph   1
execution   1
MLlib   1
Scala,  1
computing   1
supports    2
engine  1
set 1
rich    1
Streaming.  1
Spark   3
graphs. 1
general-purpose 1
APIs    1
that    1
a   2
high-level  1
including   1
optimized   1
in  1
system. 1
of  1
tools   1
also    1
structured  1
It  2
for 3
an  1
machine 1
and 5
processing, 2

実装2

次に複数行ある場合を考えます。
以下の内容をsample.txtとして保存します。

sample.txt
A
B
C
A
B
A

次に以下の内容をmain.scalaとして保存します。
実装1との違いはflatMapがなくなっただけです。

main.scala
import org.apache.spark.SparkContext

object Main {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "App") // ローカルモード
    val rdd = sc.textFile("sample.txt")
      .map(r => (r, 1))
      .cache
      .reduceByKey(_ + _)
      .collect.foreach { r => 
        println(r._1.mkString("", "", "") + "\t" + r._2)
      }

    sc.stop()
  }
}

実行2

実行結果は以下のとおりです。

B   2
A   3
C   1