Getting Started with Spark


From installation to a first run

1. Install Scala

brew install scala

2. Clone Spark

git clone https://github.com/apache/spark.git

3. Move into the spark directory

cd spark

4. Switch to version 1.3.1 (this creates a local branch named version2 from the v1.3.1 tag)

git checkout -b version2 v1.3.1

5. Build Spark with Maven (tests skipped)

mvn -DskipTests clean package

6. Create test data in work/logs/test.txt (relative to the spark directory)

7. Run spark-shell

macbook:spark user$ ./bin/spark-shell 

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

8. Create an RDD from a local file

scala> val file = sc.textFile("work/logs/test.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

9. Keep only the lines that contain the string "user1" (filter)

scala> val filter = file.filter(_.contains("user1"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23

10. Print the number of matching lines

scala>  println(filter.count)
2
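Steps 9 and 10 use RDD.filter and count, which behave like the Scala collection methods of the same names; Spark just evaluates them lazily and across partitions. A minimal pure-Scala sketch of the same logic (no Spark needed), assuming some hypothetical test lines since the contents of test.txt are not shown:

```scala
// Pure-Scala model of file.filter(_.contains("user1")).count
object FilterCountSketch {
  // Count the lines that contain `needle` as a substring
  def countContaining(lines: Seq[String], needle: String): Long =
    lines.count(line => line.contains(needle)).toLong

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for work/logs/test.txt
    val sampleLines = Seq(
      "user1 login",
      "user2 login",
      "user1 logout"
    )
    println(countContaining(sampleLines, "user1")) // prints 2
  }
}
```

Because contains is a substring match, a line containing "user10" would also be counted under "user1".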

Building a standalone application and running it with spark-submit

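1. Make the built Spark commands easy to call (shell aliases)

echo 'alias spark-shell="~/work/spark/bin/spark-shell"' >> ~/.bash_aliases
echo 'alias spark-submit="~/work/spark/bin/spark-submit"' >> ~/.bash_aliases
source ~/.bash_aliases

Note: bash on macOS does not read ~/.bash_aliases on its own; to keep the aliases in new sessions, source it from ~/.bash_profile.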

2. Install sbt

brew install sbt

3. Directory layout

$ mkdir ~/work
$ cd ~/work
$ mkdir spark
$ mkdir spark/src
$ mkdir spark/src/main
$ mkdir spark/src/main/scala
$ mkdir spark/project
$ mkdir spark/log
$ vi spark/log/access_log # prepare an access_log of your own
$ mkdir spark/target

4. Create build.sbt in spark/ ※1

build.sbt
//build-version
version := "0.1" 

//scalaVersion
scalaVersion := "2.11.7" 

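5. Add the library dependencies (also in build.sbt)
libraryDependencies ++= Seq(
   "org.xerial" % "sqlite-jdbc" % "3.7.2",
   // "provided": spark-submit supplies Spark at runtime, so it is left out of the jar
   "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"
   // For Spark Streaming, add a comma to the line above and uncomment as needed:
   //"org.apache.spark" %% "spark-streaming" % "1.3.1",
   //"org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)

// ※3 Avoid errors from duplicate files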
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
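The merge strategy above is an ordinary Scala pattern match over each jar entry's path, and the first matching case wins. To check which case a given path hits, here is a simplified model that runs without sbt. PathList below is a hypothetical re-implementation (it just splits on "/"), and the strategies are plain strings instead of sbt-assembly's MergeStrategy values, so treat it as a sketch rather than the plugin's actual code.

```scala
object MergeStrategySketch {
  // Hypothetical stand-in for sbt-assembly's PathList extractor: split a jar entry path on "/"
  object PathList {
    def unapplySeq(path: String): Option[Seq[String]] = Some(path.split("/").toSeq)
  }

  // Same case order as build.sbt; strategies are modeled as strings
  def strategyFor(path: String): String = path match {
    case PathList("javax", "servlet", xs @ _*)               => "first"
    case PathList(ps @ _*) if ps.last endsWith ".properties" => "first"
    case PathList(ps @ _*) if ps.last endsWith ".xml"        => "first"
    case PathList(ps @ _*) if ps.last endsWith ".types"      => "first"
    case PathList(ps @ _*) if ps.last endsWith ".class"      => "first"
    case "application.conf"                                  => "concat"
    case "unwanted.txt"                                      => "discard"
    case _                                                   => "default" // old strategy in build.sbt
  }

  def main(args: Array[String]): Unit = {
    Seq("javax/servlet/Filter.class", "application.conf", "META-INF/MANIFEST.MF")
      .foreach(p => println(s"$p -> ${strategyFor(p)}"))
  }
}
```

In this model the literal "application.conf" case only matches an entry at the jar root; a path such as "META-INF/application.conf" falls through to the default.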

6. Create plugin.sbt in spark/project/

project/plugin.sbt
logLevel := Level.Warn
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

7. Scan access_log and count the matching lines (spark/src/main/scala/Example.scala)

Example.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
//import org.apache.spark.streaming.{Time, Seconds, StreamingContext}

object ExampleApp {
  def main(args: Array[String]): Unit = {
    // The access_log prepared earlier (relative to the directory spark-submit is run from)
    val logFile = "log/access_log"
    // Set the AppName. Note: although it is set here, the AppName picked up when running spark-submit was the file name
    val conf = new SparkConf().setAppName("Example Application")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Load access_log with at least 2 partitions and mark it for caching (the cache is filled on the first action)
    val logData = sc.textFile(logFile, 2).cache()
    // Count the lines that contain each link
    val index = logData.filter(line => line.contains("index.html")).count()
    val html1 = logData.filter(line => line.contains("1.html")).count()
    val html2 = logData.filter(line => line.contains("2.html")).count()
    val gif1  = logData.filter(line => line.contains("1.gif")).count()
    val gif2  = logData.filter(line => line.contains("2.gif")).count()
    // Print the results
    println("Lines with index.html: %s".format(index))
    println("Lines with 1.html: %s".format(html1))
    println("Lines with 2.html: %s".format(html2))
    println("Lines with 1.gif:  %s".format(gif1))
    println("Lines with 2.gif:  %s".format(gif2))
    // Shut down the SparkContext and release its resources
    sc.stop()
  }
}
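The five filter/count pairs repeat one pattern. A minimal pure-Scala sketch of that logic over an in-memory Seq shows one way to drive it from a list of patterns; the sample lines are hypothetical, not from the article's log. On the RDD, the same shape could be written as patterns.map(p => p -> logData.filter(_.contains(p)).count()), which still runs one Spark job per pattern; that is why the cache() call pays off.

```scala
object AccessLogCountSketch {
  // The substrings ExampleApp searches for, in the same order
  val patterns: Seq[String] = Seq("index.html", "1.html", "2.html", "1.gif", "2.gif")

  // For each pattern, count the lines containing it (what each filter(...).count() computes)
  def countByPattern(lines: Seq[String]): Seq[(String, Long)] =
    patterns.map(p => p -> lines.count(_.contains(p)).toLong)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample lines
    val sample = Seq(
      "GET /index.html HTTP/1.0",
      "GET /img/1.gif HTTP/1.0",
      "GET /21.html HTTP/1.0" // also counted under "1.html": contains is a substring match
    )
    countByPattern(sample).foreach { case (p, n) => println(s"Lines with $p: $n") }
  }
}
```

As the last sample line shows, substring matching means a request for /21.html is also counted as "1.html"; matching on a full path such as "/1.html" would be stricter.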

8. Build the jar file (run in spark/) ※1

sbt assembly

9. Run spark-submit

spark-submit --class ExampleApp target/scala-2.11/scala-assembly-0.1.jar

10. Output (results only)

Lines with index.html: 141882
Lines with 1.html: 19996
Lines with 2.html: 18567
Lines with 1.gif:  20996
Lines with 2.gif:  52393

11. Notes
※1

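Write settings such as name, version, scalaVersion, and libraryDependencies with a blank line between each one, and end the file with a blank line as well. This blank-line rule applies to sbt versions before 0.13.7; later versions no longer require it.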

※2

Depending on the sbt version, this line appears to be required.
With the version used here, including it caused an error.

※3

This setting was missing in the initial setup; the dependency libraries contained duplicate files, and building the jar failed with errors.