SparkコアのcombineByKey詳細

2324 ワード

ここでcombineByKeyを単独で説明するのは、この演算子を練習しているのが最初はよく分からなかったので、私の実験過程が他の私と同じ疑問を持っている人に役立つことを望んでいます.まずcombineByKeyの定義を与え,他の詳細は一時的に無視する.
 def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
                /*content*/
  }

まず、上記の3つのパラメータを紹介します.
* Users provide three functions:
*
*  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
             ,              (    )      (           )
*  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
      V        C(createCombiner)  (            )
*  - `mergeCombiners`, to combine two C's into a single one.
    2   C   (            )


次に、具体的な例を示します.
  var rdd = sc.makeRDD(Array(("A",2),("A",1),("A",3),("B",1),("B",2),("C",1)))

    val collect: Array[(String, String)] = rdd.combineByKey(
      (v: Int) => v + "_",
      (c: String, v: Int) => c + "@" + v,//     
      (c1: String, c2: String) => c1 + "$" + c2
    ).collect

    for (elem 

結果:
(A,--,2_$1_@3) (B,--,1_$2_) (C,--,1_)
解析:
各パーティション内の同じKeyのデータについては、createCombinerを使用してCを初期化し、mergeValueを使用して初期化後のCを同じパーティションの他のvalueとマージして新しいCを形成します.最後にmergeCombinersを使用して、異なるパーティション間のCを組み合わせます.
 
結果をより容易に見るためにrddのパーティションを印刷観察した.
 val tuples: Array[(String, List[(String, Int)])] = rdd.mapPartitionsWithIndex((Index, iter) => {
      var part_map = scala.collection.mutable.Map[String, List[(String, Int)]]()

      while (iter.hasNext) {
        var part_name = "part_" + Index
        var elem = iter.next();
        if (part_map.contains(part_name)) {
          var elems = part_map(part_name)
          elems ::= elem
          part_map(part_name) = elems
        } else {
          part_map(part_name) = List[(String, Int)] {
            elem
          }
        }
      }
      part_map.iterator
    }).collect()
    for (elem ",elem._2)
    }

結果:
(part_0,-->,List((A,2))) (part_1,-->,List((A,3), (A,1))) (part_2,-->,List((B,1))) (part_3,-->,List((C,1), (B,2)))
Aには同じパーティションと異なるパーティションの2種類があるため、マージ後の値は(A,--,2_$1_@3)