AWS EMRにspark-jobserverを構築してREST経由でJob実行させる


Sparkを利用して、ジョブをパシパシ叩きたかったのですが、どうやってジョブ実行をしようかな、と悩むことになりました。

クラウド依存になりたくなかったので、できればKinesisストリームは使いたくないな。。と思いつつも下記のように調べてみました。

利用可否 Streamingか単発ジョブか アドホックな引数設定の可否 懸念
KinesisとSpark Streaming Streaming やや難しい。 GCPとかMS使いたくなったら書き直しが多い
spark-jobserver 単発ジョブ RestAPIのパラメータ設定なので簡単 今後も利用され続けるか疑問。
luigiとか使ってspark-submitを叩く 単発ジョブ ジョブ作りこめば簡単 Sparkのためだけのジョブサーバを構築・運用するかどうするか
Akka Stream 不可 implementされてない(2016/4/21時点) おそらくStreaming おそらく難しい。 いつ出るんだろ。。

どれもあまり乗り気になれませんでしたが、RestAPIで叩けるようにしておけば、開発関係者が気軽にバッチを叩いてMLを試したりできるかも?ということで、spark-jobserverを使ってみることにします。

構築について

sparkの利用できるEMRの起動後に、sshでmasterノードにログインします。

sshログインしたら、下記のMarkdownにしたがって、job-server.tar.gzを作成します。

m1.mediumのインスタンスを利用しましたがビルドは遅かったです。。
assemblyでjob-server.tar.gzを作成するだけなので、CI系サービスか、自前の環境で作成してartifactoryやs3等に保存してもいいかもしれません。(これだけのために毎度EMRを立ち上げる必要はなし。)

利用してみる

job-server.tar.gzを展開します。

mkdir /mnt/lib/spark-jobserver
cd /mnt/lib/spark-jobserver
tar zxf job-server.tar.gz

JAVA_OPTSのserver_start.shの中のjmx周りの設定をコメントアウトします。
これがあると動かないことがありました。

テスト用のアプリケーションを用意してみます。

build.sbt
name := "example-spark-jobs"

organization := "org.triplew.dfree"

version := "1.0-SNAPSHOT"

enablePlugins(JavaAppPackaging, sbtdocker.DockerPlugin)

scalaVersion := "2.10.6"

scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

libraryDependencies ++= {
  val sparkV      = "1.6.1"
  Seq(
    "org.apache.spark" % "spark-core_2.10" % sparkV,
    "spark.jobserver" %% "job-server-api" % "0.6.1" % "provided",
    "com.github.seratch" %% "awscala" % "0.5.+",
    "com.github.levkhomich" %% "akka-tracing-core" % "0.4",
    "commons-configuration" % "commons-configuration" % "1.10"
  )
}

dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4"
)

Revolver.settings

resolvers += "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven"

以下割愛

scalaのバージョンを2.11にすると、sparkジョブは動きませんでした。
これ、大丈夫だろうか、という不安を覚えますね。。

setMasterで指定しているmasterについては、spark-shellでいったん、sc.masterで確認しちゃいました。こういうのは外だししたいですね。。

package org.triplew.dfree.example.spark.jobs

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import spark.jobserver.{SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}

import scala.util.Try

object Hoge extends SparkJob {
  override def runJob(sc:SparkContext, jobConfig: Config): Any = {
    sc.parallelize(jobConfig.getString("input.string").split("-").toSeq).count()
  }

  override def validate(sc:SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  def main(args: Array[String]): Unit = {
    val appName: String = "dfree-example-spark-jobs"
    val conf = new SparkConf().setAppName(appName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    val config = ConfigFactory.parseString("")
    val result = runJob(sc, config)

    println(result)
  }
}

このファイルを、sbt pacakgeでコンパイルして生成されたjarを、curlコマンドで、jobserverにputします。

また、jarに対しては名前を与えることができるようです。

curl --data-binary @target/scala-2.10/example-spark-jobs_2.10-1.0-SNAPSHOT.jar http://*.*.*.*:8090/jars/my-test

次にcontextを用意します。

curl -d "" 'http://*.*.*.*:8090/contexts/test?num-cpu-cores=1&memory-per-node=512m&spark.executor.instances=1

上記で作成したcontextとjarを指定して、sparkジョブを実行します。

curl -d "input.string = a-b-c" '*.*.*.*:8090/jobs?appName=my-test&classPath=org.triplew.dfree.example.spark.jobs.Hoge&context=test&sync=true'

そしてアウトプットとして以下を取得します。

{
  "result": 3
}

動いてるようです。

所感

  • Restのパラメータに指定するsyncで、非同期リクエストも可能のようなので、重たいバッチや機械学習をasync実行させ、jobidのみを控えておいて、ステータス繰り上がりあとに利用する、みたいな感じでしょうか。
  • 全部そこらへんもScalaでやるならFutureなどを利用するのもいいかもしれません。
  • Akka Streaming早く使ってみたい。。

本日は以上となります。