Spark on EMRでlog出力する


概要

Sparkは複数サーバで分散処理する影響でログ設定もちょっと面倒だったので、まとめておきます。

前提

他のシステムがlogbackで実装されてるとしても、Sparkアプリではlog4jでログ出力するのがトラブルが少なそうです。

簡単に調べてみると、Spark(というかHadoop, EMR)は、log4jでのログ出力を前提としているようでした。
Sparkもコンパイルの依存にlog4jなどが入っているし、EMRでも実行時のクラスパスにslf4j-log4j12などがついてきます。

頑張って一つずつ依存を取り除いていけば解消出来るのかもしれませんが、僕はEMRの中をいじるのはオススメしません。きっと嵌まります。

log4j.propertiesの読み込ませ方

Sparkは複数マシンのJVMで、Driver/Executorが動くため、
それぞれに設定を行う必要があります。

静的に読む方法

以下をspark-submit時に指定してあげます。
見てわかると思いますが、上がDriver用で下がexecutor用です。
(なお、ここで設定する時にjarの中のリソースを取得出来ない気がしたので、後述の動的な読み込みを推奨しておきます。
上手く行った例があったら教えて下さい。)

--driver-java-options=-Dlog4j.configuration=file:///home/hadoop/log4j.properties
--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///home/hadoop/log4j.properties

動的に読む方法

Driver/Executorそれぞれで、loggerを生成する前に以下を実行します。

    val props = new Properties()
    props.load(getClass.getClassLoader.getResourceAsStream("myLog4j.properties"))
    PropertyConfigurator.configure(props)

ここで注意することは、

  • log4j.properties を指定すると、EMRでクラスパスに入ってる何かが邪魔してくるっぽくて上手くいかなかったので、別の名前を使うこと
  • このコードを呼んだ後にloggerを生成すること
  • DriverだけでなくExecutorでもこのコードを呼ぶこと

です。

ちなみに、それらを踏まえたサンプルコードはこんなかんじです。

Main.scala
import java.util.Properties

import org.apache.log4j.PropertyConfigurator
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object Main {

  def main(args: Array[String]): Unit = {

    val props = new Properties()
    props.load(getClass.getClassLoader.getResourceAsStream("myLog4j.properties"))
    PropertyConfigurator.configure(props)

    val driverLogger = LoggerFactory.getLogger(Main.getClass)

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val rdd = sc.range(1, 100, 1, 10)

    driverLogger.info("----Start info----")

    lazy val logic = new ExecutorLogic()
    rdd.map(i => i*2)
      .foreach(i => logic.log.info(i.toString))
  }
}

ExecutorLogic.scala
import java.util.Properties

import org.apache.log4j.PropertyConfigurator
import org.slf4j.LoggerFactory

class ExecutorLogic {

  val props = new Properties()
  props.load(getClass.getClassLoader.getResourceAsStream("myLog4j.properties"))
  PropertyConfigurator.configure(props)

  lazy val log = LoggerFactory.getLogger(classOf[ExecutorLogic])

}

また、この時に

  • Dependenciesに slf4j-api をprovidedで入れておく
  • src/main/resources以下に myLog4j.properties を配置しておく

をしてあげる必要があります。

あとは、YARNやSpark UIから見て、ちゃんとlogger設定が反映されていればOKです。