YARN上のSparkStreamingジョブを安全に停止する


目的

SparkStreamingはそれ自身では停止機能を持っていない。(常時稼働することを想定しているため)
YARN上で動かしている場合、停止リクエストを投げると止まるが、その際に停止タイミングが制御できない。(処理中に急に止まる場合もありうる)

sparkStreamingを扱う場合、トランザクション系の処理を持たないDBを使うケースが多いので、データ不整合などが容易に起こりうる。(その程度のデータ不整合を気にするようなドメインを扱うなという話もあるかもしれない。)

そこでstreamingにgraceful stopを導入する。

やり方。

StreamingContext.stop(...)を呼びだせばいい。

in Java
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext

in Scala
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext

引数は両方trueが普通だと思います。(というか、どちらかをわざわざfalseにする場面が想定できない。)

on YARN

さて、ではYARN上で動いているアプリケーションでどうやってこれを呼ばせるか。
外からpush型で情報を受け渡すのは難しそうなので、Sparkアプリ側で対応するのがいいと思います。
方法は2つ。

  • 別スレッドを立てて、定期的にあるデータを見に行き、条件を満たしていれば発火
  • shutdownhookを仕込むことで、YARNへ停止命令が送られてきた時に発火
//Java
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(jobManager));
//Scala
sys.ShutdownHookThread.

参考