AWS SDK for Javaで、Scala 2.11ビルドのSparkアプリを実行するStepを追加する


現時点(2016/05/06)だと、Spark on EMRに入ってるSpark assemblyはScala 2.10でビルドされているため、Scala 2.11で書いたアプリをEMRで実行する際はひと工夫必要です。

こちらの記事にて、masterノードにsshで入ってspark-submitする方法が紹介されていますが、ここではAWS SDK for Javaでstepを追加する方法をメモしときます。

まあほぼAWSのドキュメントに書いてあるままです。

準備

上記記事を参考に、

  • bootstrapアクションでJava8をインストールしたクラスタの用意
  • Scala 2.11でビルドしたSpark assemblyの用意(s3://path/to/spark-assemblyに置いたものとします)
  • 適当にSparkアプリ作ってfatJarにしておく(s3://path/to/your-appに置いたものとします)

をやっときます。

Stepを追加するコード

個人的趣味で、Scalaで書きます。

Main.scala
package com.example

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.{Regions, RegionUtils}
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient
import com.amazonaws.services.elasticmapreduce.model._

object Main {
  def main(args: Array[String]): Unit = {
    val credentials = new BasicAWSCredentials("your access key", "your access secret")
    val emrClient = new AmazonElasticMapReduceClient(credentials)
    emrClient.setRegion(RegionUtils.getRegion(Regions.AP_NORTHEAST_1.getName))

    val hadoopStepConfig = new HadoopJarStepConfig()
      .withJar("command-runner.jar")
      .withArgs("spark-submit",
        "--master", "yarn-cluster",
        "--conf", "spark.yarn.jar=s3://path/to/spark-assembly",
        "--class", "com.example.Main",
        "s3://path/to/your-app")

    val stepConfig = new StepConfig()
      .withName("step-example")
      .withActionOnFailure(ActionOnFailure.CONTINUE)
      .withHadoopJarStep(hadoopStepConfig)

    val jobFlowRequest = new AddJobFlowStepsRequest()
      .withSteps(stepConfig)
      .withJobFlowId("your cluster id")

    val jobFlowResult = emrClient.addJobFlowSteps(jobFlowRequest)
    println(s"stepIds: ${jobFlowResult.getStepIds}")
  }
}

実行すると、ちゃんとStepが作成されました。

$ sbt run
stepIds: [s-xxxxxxxxxx]