combineByKey関数の詳細



以下にcombineByKeyの定義を示し、その他の詳細は一時的に無視される(1.6.0版の関数名はcombineByKeyWithClassTagに更新された)
def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)

 
次の3つの重要な関数パラメータを説明します.
  • createCombiner:V=>C、この関数は現在の値をパラメータとしています.この場合、追加の操作(タイプ変換)を行い、初期化操作と同様に戻すことができます.ここでの操作は、各partitionのkeyの最初のデータに対して
  • を操作するだけです.
  • mergeValue:(C,V)=>C、この関数は、前の要素C(createCombiner)に要素Vを結合する(この操作は各パーティション内で行われる)
  • mergeCombiners:(C,C)=>C、この関数は2つの要素Cを結合する(この操作は異なるパーティション間で行われる)
  • 次のようになります.
    val rdd1 = sc.parallelize(List(1,2,2,3,3,3,3,4,4,4,4,4), 2)
    val rdd2 = rdd1.map((_, 1))
    val rdd3 = rdd2.combineByKey(-_, (x:Int, y:Int) => x + y,
                                (x:Int, y:Int) => x + y)
    rdd2.collect
    rdd3.collect
    

    以上のコードの出力は以下の通りです.
    Array((1,1), (2,1), (2,1), (3,1), (3,1), (3,1), (3,1), (4,1), (4,1), (4,1), (4,1), (4,1))  Array((4,3), (2,0), (1,-1), (3,0))
    上記のコードでは,(1,1),(2,1),(2,1),(3,1),(3,1),(3,1)が第1 partition,(3,1),(4,1),(4,1),(4,1),(4,1),(4,1),(4,1),(4,1),(4,1)が第2に分割される.次に、次の操作を行います.
    (1,1):1個のみのため、値が負の場合、自然出力(1,-1)(2,1):2個あり、1個目が負であり、2個目が変わらないため、combine後は(2,0)(3,1):partition 1中は3個あり、上記のルールを参照すると、combine後は(3,1)、partition 2中は1個であるため、combine後は(3,-1)となる.2回目のcombineでは初期化操作がないので直接加算し,(3,0)(4,1):プロセスは同じで,結果は(4,3)
    これにより、combineByKey演算子の初期化実行フロー、すなわちpartitionのcombineフェーズのみで有効であり、各keyの最初の要素のみが動作することがわかる. 
    例2:
    平均数を解くためにcombineByKeyを用いた例を以下に示す.
    val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
    val d1 = sc.parallelize(initialScores)
    type MVType = (Int, Double) //        (     ,  )
    d1.combineByKey(
      score => (1, score),
      (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
      (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
    ).map { case (name, (num, socre)) => (name, socre / num) }.collect

    パラメータの意味の解釈a,score=>(1,score)は,スコアをパラメータとして付加的なメタグループタイプを返す.「Fred」列で、現在の点数は88.0=>(1,88.0)1で現在の科目のカウンタを表し、ここでの1,scoreもFredというkeyの最初の要素88.0のみを初期化しているので、次のbでFredというkeyを実現して2番目の点数を加算する場合は、1つ(1,scoreタイプに加算)ではなく1つの後の科目の点数を加算するしかない
    b、(c 1:MVType,newScore)=>(c 1._1+1,c 1._2+newScore)ここでのc 1はcreateCombiner初期化により得られる(1,88.0).あるパーティション内で、私たちはまた「Fred」の新しい点数91.0に出会った.もちろん、前の科目の点数と現在の点数を合わせるとc 1.2+newScore、それから科目計算機を1をプラスしてc 1.1 + 1
    c、(c 1:MVType,c 2:MVType)=>(c 1._1+c 2._1,c 1._2+c 2._2),「Fred」は学霸である可能性があり,選択科目が多すぎて異なるパーティションに分散している可能性があることに注意する.すべてのパーティションをmergeValueにした後、次にパーティション間をマージし、パーティション間科目数と科目数を加算点数と点数を加算して合計点数と合計科目数を得た
    実行結果は次のとおりです.
    res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))