Spark2.0をEMRで試す


概要

一週間前くらいにSpark2.0が出ました。そして、今日確認したらEMR5.0が既にSpark2.0対応されていました。さすがAWSさんです!

ということで、Spark2.0でイマドキのSpark実行を簡単にメモしておきます。

ゴール

  • Spark2.0が動く
  • Scala2.11ビルドしたjarが動く
  • Java8で動く
  • YARN分散環境上で動く

EMR設定

Software Configurationにて以下のように設定

  • emr-5.0.0
  • hadoop2.7.2
  • Spark2.0.0
  • configurationに以下のjsonを追加
    • Java8の設定と、sparkに最適化する設定を行っています。
[
  {
    "classification": "spark",
    "properties": {
      "maximizeResourceAllocation": "true"
    }
  },
  {
    "Classification": "hadoop-env",
    "Configurations": [
      {
        "Classification": "export",
        "Configurations": [],
        "Properties": {
          "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
        }
      }
    ],
    "Properties": {}
  },
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Configurations": [],
        "Properties": {
          "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
        }
      }
    ],
    "Properties": {}
  }
]

Sparkアプリ

build.sbt
//nameとかは省略
//sbt-assemblyが入ってる前提

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided"
)

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
Main.scala
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val rdd = sc.range(1, 100000, 1, 10)
    println("----Start----")

    //check for using scala-library_2.11
    //if using 2.10, this method cause Exception.
    println("hello" -> "world")

    rdd.map(i => i*2)
      .foreach(i => println(i))
  }
}

こんな感じで書いて、sbt assemblyすればOK

EMRからアプリを実行

assemblyしたjarをS3にアップロードします。

後はEMRクラスタを構築して、起動オプションを以下のように設定すればOK

# この2つは、MasterNode上で任意のshellコマンドを叩くことを示しています。
JAR location: command-runner.jar
Main class: None

# spark-submitコマンド
Arguments: spark-submit --deploy-mode cluster 
--class com.github.uryyyyyyy.Main 
--master yarn 
--num-executors 2 
--driver-memory 1g 
--executor-memory 1g 
--executor-cores 1 
s3://<your-s3-bucket>/jars/<your-assembly-0.1.0.jar> <起動オプション>

# 実行中に失敗したらクラスタをどうするか
Action on failure: Continue

こんな感じで実行できます。

もしくは、Master Nodeにsshで入ってspark-submit ~~~ を実行しても動きます。

まとめ

これだけで上記のゴールが達成できます。
Spark2系はデフォルトでscala2.11ビルドなのが嬉しいですね。

It also updates Spark (an engine for large-scale data processing) from 1.6.2 to 2.0, with a similar move to Scala 2.11.