Spark 共有変数


  • Sparkでは、各executorに定数を転送したり、各executorで集計した値をdriverで受け取ったりする機能がある。
  • SparkはScalaの関数(クロージャー)を使って処理を記述するので、関数の外側で定義した変数を関数内で使うことは出来る(ようになっている)が、実行はexecutor(分散した各ノード上)で行われるので、そこで変数に対して行われた変更はdriver側には反映されない。
  • 当然、executor間で参照し合うことも出来ない。 そのため、driverとexecutorとの間で値を共有するための仕組みがSparkには用意されている。

ブロードキャスト変数(broadcast variables)

は、driverで定義した定数(固定値)を各executorに転送する為の変数。

  • SparkはScalaの関数(クロージャー)を使って処理を記述するので、ブロードキャストを使わなくても定数を使用することは出来るのだが。

Scalaの定数を使った例

val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = 123
val filter = rdd.filter(_ != CONSTANT)
filter.foreach(println)

Sparkのブロードキャストを使った例

val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = sc.broadcast(123)
val filter = rdd.filter(_ != CONSTANT.value)
filter.foreach(println)

Scalaの関数で定数を使用している場合、関数を転送する度に値の転送も発生することになる。
ブロードキャストを使うと、定数の内容は1度だけ各executorに転送される。

したがって、ある程度大きなサイズの定数(バイト列とかMapとか)を共有したい場合はブロードキャストを使う方が良い。
逆に小さいサイズの定数なら、ブロードキャストの方がオーバーヘッドが大きい可能性がある。

なお、ブロードキャストで転送する値は、ブロードキャスト生成後に変更してはならない。
例えば、後から新しいノードが追加になった場合に、そのノードに対してブロードキャストを転送することがある為。

アキュムレーター

  • アキュムレーター(accumulator)は、“追加”のみを行う変数。
  • driverでアキュムレーターを生成し、各executorでアキュムレーターに対して値の追加(加算・蓄積)を行い、driverでその結果(総計)を受け取ることが出来る。

  • アキュムレーターは、Hadoopのカウンターのようなもの。(Hadoopでは、各タスクでカウンターに値を加算していく)

  • ただし、Hadoopのカウンターの結果はアプリケーション内からは利用できない(Mapタスクで集計したカウンターをReduceタスクで読み込むことは出来ない)が、Sparkのアキュムレーターはアプリケーション内の後続処理に利用することが出来る。

  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val sum = sc.accumulator(0)
  rdd.foreach(sum += _)
  println(sum.value)
  • アキュムレーターはvalueというフィールドを持っており、アキュムレーターの+=メソッドを使うと、valueに対して“追加(加算)”が実行される。

  • アキュムレーターはデフォルトではInt・Long・Float・Doubleでしか使えないが、自分でAccumulatorParamを実装すれば、どんな型でも扱える。

  • (Int・Long・Float・Doubleに関しては、暗黙のAccumulatorParamが用意されている)

import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {

  def zero(initialValue: String): String = ""

  def addInPlace(t1: String, t2: String): String = t1 + t2
}
  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val s = sc.accumulator("0")(StringAccumulatorParam)
  rdd.foreach(s += _.toString)
  println(s.value)
  • foreachメソッドでは処理順序は保証されないので、上記の結果が「0123」になるとは限らない。「0312」とかにもなりうる。

  • AccumulatorParamのオブジェクトをimplicit objectにしておけば、accumulatorメソッドでAccumulatorParamを指定する必要が無くなる。

implicit object StringAccumulatorParam extends AccumulatorParam[String] {

  def zero(initialValue: String): String = ""

  def addInPlace(t1: String, t2: String): String = t1 + t2
}
  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val s = sc.accumulator("0")
  • Accumulableはアキュムレーターの汎用版で、保持する値の型と追加する値の型が異なっていてもよい。(Accumulator[T]はAccumulable[T,T]である)
import org.apache.spark.AccumulableParam
  • Seq[String]の例
object SeqAccumulableParam extends AccumulableParam[Seq[String], String] {

  def zero(initialValue: Seq[String]): Seq[String] = Seq.empty

  def addInPlace(r1: Seq[String], r2: Seq[String]): Seq[String] = r1 ++ r2

  def addAccumulator(r: Seq[String], t: String): Seq[String] = r :+ t
}
  val rdd = sc.makeRDD(Seq("a", "b", "c"))
  val s = sc.accumulable(Seq.empty[String])(SeqAccumulableParam)
  rdd.foreach(s += _)
  println(s.value) //→List(b, c, a)
  • Map[K,V]の例
class MapAccumulableParam[K, V] extends AccumulableParam[Map[K, V], (K, V)] {

  def zero(initialValue: Map[K, V]): Map[K, V] = Map.empty

  def addInPlace(r1: Map[K, V], r2: Map[K, V]): Map[K, V] = r1 ++ r2

  def addAccumulator(r: Map[K, V], t: (K, V)): Map[K, V] = r + t
}
  val rdd = sc.makeRDD(Seq("a", "bc", "def"))
  val m = sc.accumulable(Map.empty[String, Int])(new MapAccumulableParam[String, Int])
  rdd.foreach(key => m += (key, key.length))
  println(m.value) //→Map(a -> 1, bc -> 2, def -> 3)
  • Mapでカウントする例
object MapCounterAccumulableParam extends AccumulableParam[scala.collection.mutable.Map[String, Int], String] {

  def zero(initialValue: scala.collection.mutable.Map[String, Int]) = scala.collection.mutable.Map.empty[String, Int]

  def addInPlace(r1: scala.collection.mutable.Map[String, Int], r2: scala.collection.mutable.Map[String, Int]): scala.collection.mutable.Map[String, Int] = {
    r2.foreach{ kv => add(r1, kv._1, kv._2) }
    r1
  }

  def addAccumulator(r: scala.collection.mutable.Map[String, Int], t: String): scala.collection.mutable.Map[String, Int] = {
    add(r, t, 1)
    r
  }

  private def add(r: scala.collection.mutable.Map[String, Int], key: String, value: Int): Unit = {
    r.put(key, r.getOrElse(key, 0) + value)
  }
}
  val rdd = sc.makeRDD(Seq("a", "b", "c", "a", "b", "a"))
  val m = sc.accumulable(scala.collection.mutable.Map.empty[String, Int])(MapCounterAccumulableParam)
  rdd.foreach(m += _)
  println(m.value) //→Map(b -> 2, a -> 3, c -> 1)
  • ArrayBuffer[String]の例 Growableトレイトをミックスインしているコレクション(mutableのArrayBufferやListBuffer)を使う場合はaccumulableCollectionメソッドが便利。
  val rdd = sc.makeRDD(Seq("a", "b", "c"))
  val ac = sc.accumulableCollection(scala.collection.mutable.ArrayBuffer.empty[String])
  rdd.foreach(ac += _)
  println(ac.value) //→ArrayBuffer(a, b, c)