Spark on YARNのエラー対応メモ


概要

Sparkを始めとする分散処理では、普通のアプリケーション以上にエラー対応について考えることがあります。

  • 特定のノードの問題なのではないか?
  • ノード間・外部リソースの問題ではないか?
  • どこから処理をやり直すべきか?
  • もう一度処理を繰り返してもいいのか?
  • 並列で動いているタスクはどうするのか?

などなど考えることがあります。

前提知識

Sparkアプリのフェーズ

Sparkには以下のフェーズがあります。

  • App

    • 一番大きな単位。
    • Sparkアプリが例外などで終了すると失敗となる。
    • 失敗すると最初からやりなおす。
    • デフォルトでは1回だけやりなおす。
  • Job

    • SparkContext内で実行される処理(collect, reduce, saveAsTextfile)の単位
    • 内部では複数のTaskが実行される。
    • デフォルトではtaskが3回失敗するとそのJobは失敗となる。
    • spark.task.maxFailures で調整
    • 失敗しても他のJobには影響しない。
  • Task

    • パーティション単位で実行される。
    • 失敗したらそのタスクと同じものが再試行される。

RDDの冗長性

RDDの実体は、各レコードとそのレコードの生成方法を記したDAGで構成されます。
レコードの方を永続化していなければ、そのレコードが使われたり消滅した場合は、DAGをたどって再計算されます。

また、RDDのレコードはデフォルトで複数マシンに分散して冗長性を担保してくれるようです。

個人的ポリシー

  • 処理は冪等にする。
    • どんなにしっかり例外処理を施しても、アプリ外の要因で再計算されるため。
  • 何度も使うRDDは永続化する。
  • リトライはしてもいいがたぶんあまり必要ない。
    • EMR上でクラスタを組んだ場合
    • 不安定な外部リソースに依存しない場合
  • EMR内のクラスタに状態をなるべく持たせない。

強制終了の方法

ある処理で失敗した場合にアプリ全体を止めたい場合は sc.cancelAllJobs() を呼び出してあげればOKです。