Apache Sparkで同じIDを1つにまとめる方法


はじめに

以下のようなユーザIDとアイテムIDの表に対して、Apache Sparkを利用して
同じユーザIDを持つアイテムIDをひとつにまとめるコードを書いていきます。

ユーザID アイテムID
1 B
2 B
1 C
3 C
2 B
3 A

同じユーザIDを持つアイテムIDをひとつにまとめると以下のようになります。

ユーザID アイテムID
1 B, C
2 B
3 A, C

Java、Scala、sbtのインストール

JavaとScalaとsbtをインストールします。
今回は以下のバージョンをインストールします。

$ java -version
java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
$ scala -version
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
$ sbt about
[info] Set current project to App (in build file:/Users/)
[info] This is sbt 0.13.8
[info] The current project is {file:/Users/} 1.0
[info] The current project is built against Scala 2.11.6
[info] Available Plugins: sbt.plugins.IvyPlugin, sbt.plugins.JvmPlugin, sbt.plugins.CorePlugin, sbt.plugins.JUnitXmlReportPlugin
[info] sbt, sbt plugins, and build definitions are using Scala 2.10.4

実装

以下の内容をbuild.sbtとして保存します。
今回はApache Spark 1.4.1を利用します。

build.sbt
name := "App"
version := "1.0"
scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1"
)

次に以下の内容をmain.scalaとして保存します。

main.scala
import org.apache.spark.SparkContext

object Main {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "App") // ローカルモード
    val rdd = sc.textFile("sample.csv").map(_.split(",")).map(x => (x(0), x(1))).cache()

    rdd.reduceByKey((x, y) => x + "," + y).collect.foreach { r => 
      println(r._1 + "\t" + r._2.split(',').distinct.mkString("", ",", "")) // 重複のあるアイテムIDを排除する
    }

    sc.stop()
  }
}

sc.textFile("sample.csv").map(_.split(",")) でファイルを指定してカンマで分割します。
map(x => (x(0), x(1)))(1, B)(2, B)というようなキーとバリューのペアにします。

次に以下の内容をsample.csvとして保存します。

sample.csv
1,B
2,B
1,C
3,C
2,B
3,A

実行

実行方法は以下のとおりです。

$ sbt run

大量の実行ログの中にprintlnの内容が表示されます。

2   B
3   C,A
1   B,C