AWS EMRにspark-jobserverを構築してREST経由でJob実行させる
Sparkを利用して、ジョブをパシパシ叩きたかったのですが、どうやってジョブ実行をしようかな、と悩むことになりました。
クラウド依存になりたくなかったので、できればKinesisストリームは使いたくないな。。と思いつつも下記のように調べてみました。
利用可否 | Streamingか単発ジョブか | アドホックな引数設定の可否 | 懸念 | |
---|---|---|---|---|
KinesisとSpark Streaming | 可 | Streaming | やや難しい。 | GCPとかMS使いたくなったら書き直しが多い |
spark-jobserver | 可 | 単発ジョブ | RestAPIのパラメータ設定なので簡単 | 今後も利用され続けるか疑問。 |
luigiとか使ってspark-submitを叩く | 可 | 単発ジョブ | ジョブ作りこめば簡単 | Sparkのためだけのジョブサーバを構築・運用するかどうするか |
Akka Stream | 不可 implementされてない(2016/4/21時点) | おそらくStreaming | おそらく難しい。 | いつ出るんだろ。。 |
どれもあまり乗り気になれませんでしたが、RestAPIで叩けるようにしておけば、開発関係者が気軽にバッチを叩いてMLを試したりできるかも?ということで、spark-jobserverを使ってみることにします。
構築について
sparkの利用できるEMRの起動後に、sshでmasterノードにログインします。
sshログインしたら、下記のMarkdownにしたがって、job-server.tar.gzを作成します。
m1.mediumのインスタンスを利用しましたがビルドは遅かったです。。
assemblyでjob-server.tar.gzを作成するだけなので、CI系サービスか、自前の環境で作成してartifactoryやs3等に保存してもいいかもしれません。(これだけのために毎度EMRを立ち上げる必要はなし。)
利用してみる
job-server.tar.gzを展開します。
mkdir /mnt/lib/spark-jobserver
cd /mnt/lib/spark-jobserver
tar zxf job-server.tar.gz
JAVA_OPTSのserver_start.sh
の中のjmx周りの設定をコメントアウトします。
これがあると動かないことがありました。
テスト用のアプリケーションを用意してみます。
name := "example-spark-jobs"
organization := "org.triplew.dfree"
version := "1.0-SNAPSHOT"
enablePlugins(JavaAppPackaging, sbtdocker.DockerPlugin)
scalaVersion := "2.10.6"
scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")
libraryDependencies ++= {
val sparkV = "1.6.1"
Seq(
"org.apache.spark" % "spark-core_2.10" % sparkV,
"spark.jobserver" %% "job-server-api" % "0.6.1" % "provided",
"com.github.seratch" %% "awscala" % "0.5.+",
"com.github.levkhomich" %% "akka-tracing-core" % "0.4",
"commons-configuration" % "commons-configuration" % "1.10"
)
}
dependencyOverrides ++= Set(
"com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4"
)
Revolver.settings
resolvers += "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven"
(以下割愛)
scalaのバージョンを2.11にすると、sparkジョブは動きませんでした。
これ、大丈夫だろうか、という不安を覚えますね。。
setMasterで指定しているmasterについては、spark-shellでいったん、sc.master
で確認しちゃいました。こういうのは外だししたいですね。。
package org.triplew.dfree.example.spark.jobs
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import spark.jobserver.{SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}
import scala.util.Try
object Hoge extends SparkJob {
override def runJob(sc:SparkContext, jobConfig: Config): Any = {
sc.parallelize(jobConfig.getString("input.string").split("-").toSeq).count()
}
override def validate(sc:SparkContext, config: Config): SparkJobValidation = {
Try(config.getString("input.string"))
.map(x => SparkJobValid)
.getOrElse(SparkJobInvalid("No input.string config param"))
}
def main(args: Array[String]): Unit = {
val appName: String = "dfree-example-spark-jobs"
val conf = new SparkConf().setAppName(appName).setMaster("local[*]")
val sc = new SparkContext(conf)
val config = ConfigFactory.parseString("")
val result = runJob(sc, config)
println(result)
}
}
このファイルを、sbt pacakgeでコンパイルして生成されたjarを、curlコマンドで、jobserverにputします。
また、jarに対しては名前を与えることができるようです。
curl --data-binary @target/scala-2.10/example-spark-jobs_2.10-1.0-SNAPSHOT.jar http://*.*.*.*:8090/jars/my-test
次にcontextを用意します。
curl -d "" 'http://*.*.*.*:8090/contexts/test?num-cpu-cores=1&memory-per-node=512m&spark.executor.instances=1
上記で作成したcontextとjarを指定して、sparkジョブを実行します。
curl -d "input.string = a-b-c" '*.*.*.*:8090/jobs?appName=my-test&classPath=org.triplew.dfree.example.spark.jobs.Hoge&context=test&sync=true'
そしてアウトプットとして以下を取得します。
{
"result": 3
}
動いてるようです。
所感
- Restのパラメータに指定するsyncで、非同期リクエストも可能のようなので、重たいバッチや機械学習をasync実行させ、jobidのみを控えておいて、ステータス繰り上がりあとに利用する、みたいな感じでしょうか。
- 全部そこらへんもScalaでやるならFutureなどを利用するのもいいかもしれません。
- Akka Streaming早く使ってみたい。。
本日は以上となります。
Author And Source
この問題について(AWS EMRにspark-jobserverを構築してREST経由でJob実行させる), 我々は、より多くの情報をここで見つけました https://qiita.com/letusfly85/items/9fb90d5166116241c431著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .