combineByKeyの使用

4708 ワード

 combineByKey   

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
パラメータ:
createCombiner:VタイプをCタイプに変換するコンポジタ関数、入力パラメータRDD[K,V]のV、出力C
mergeValue:値関数をマージし、CタイプとVタイプの値をCタイプにマージします.入力パラメータは(C,V)、出力はCです.
mergeCombiners:結合コンポジタ関数.2つのCタイプの値を1つのCタイプに結合するために使用されます.入力パラメータは(C,C)、出力はCです.
numPartitions:結果RDDパーティション数、デフォルトでは元のパーティション数を保持
partitioner:パーティション関数、デフォルトはHashPartitioner
mapSideCombine:MapReduceのcombineと同様に、Map側でcombine操作を行う必要があるかどうか、デフォルトはtrue
 
例:
もし私たちがたくさんの果物をジュースを搾り、ジュースは純粋で、他の品種の果物があってはいけないと要求したとします.では、いくつかのステップが必要です.
1どのようなジュースが必要かを定義します.
2ジュース搾り機、すなわち果物を定義すると、私たちが定義したジュースを与えることができます.--hadoopのlocal combinerに相当
3ジュースミキサーを定義します.同じ種類の果物ジュースを混ぜることができます.--グローバルcombinerに相当
 
では、上記の3つのステップを比較すると、combineByKeyの3つの関数、つまりこの3つの機能があります.
1 createCombinerは、vをcに変換する方法を定義します.
2 mergeValueは,Vを与えて元のCと新しいCに統合する方法を定義する.
3は、同じkeyの下のCを1つのCに統合する方法を定義することです.
var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1.combineByKey((v:Int)=>List(v),--1をlist(1)(c:List[Int],v:Int)=>v::c,--list(1)と2を組み合わせてlist(1,2)(c 1:List[Int],c 2:List[Int])=>c 1::c 2--グローバルに同じkeyのvalueを組み合わせる).collect res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
簡単な紹介
def combineByKey[C](createCombiner: (V) => C,  
                    mergeValue: (C, V) => C,   
                    mergeCombiners: (C, C) => C): RD
  • createCombiner:combineByKey()はパーティション内のすべての要素を巡回するので、各要素のキーはまだ遭遇していないか、前の要素のキーと同じです.これが新しい要素である場合、combineByKey()はcreateCombiner()という関数を使用して、そのキーに対応するアキュムレータの初期値
  • を作成します.
  • mergeValue:現在のパーティションを処理する前にすでに出会ったキーの場合、mergeValue()メソッドを使用して、キーのアキュムレータに対応する現在の値をこの新しい値とマージする
  • .
  • mergeCombiners:各パーティションは独立して処理されるため、同じキーに対して複数のアキュムレータを持つことができる.2つ以上のパーティションに同じキーに対応するアキュムレータがある場合は、ユーザーが提供するmergeCombiners()メソッドを使用して、各パーティションの結果をマージする必要があります.

  • 学生の成績説明のクラスを作成する
    case class ScoreDetail(studentName: String, subject: String, score: Float)

    次のテストデータは、テストデータのセットkey=Students name and value=ScoreDetail instanceをロードします.
        val scores = List(
          ScoreDetail("xiaoming", "Math", 98),
          ScoreDetail("xiaoming", "English", 88),
          ScoreDetail("wangwu", "Math", 75),
          ScoreDetail("wangwu", "English", 78),
          ScoreDetail("lihua", "Math", 90),
          ScoreDetail("lihua", "English", 80),
          ScoreDetail("zhangsan", "Math", 91),
          ScoreDetail("zhangsan", "English", 80))
  • を二元群に変換すると、forとyieldの組合せ
  • を用いてmapに変換することも理解できる.
    val scoresWithKey = for { i 

    集約平均値を求めて印刷させる
          val avgScoresRDD = scoresWithKeyRDD.combineByKey(
          (x: ScoreDetail) => (x.score, 1) /*createCombiner*/,
          (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) /*mergeValue*/,
          (acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) /*mergeCombiners*/
          // calculate the average
        ).map( { case(key, value) => (key, value._1/value._2) })
    
        avgScoresRDD.collect.foreach(println)
    /*  :
    (zhangsan,85.5)
    (lihua,85.0)
    (xiaoming,93.0)
    (wangwu,76.5)
    */

    scoresWithKeyRDDについて説明します.combineByKeycreateCombiner:(x:ScoreDetail)=>(x.score,1)zhangsanに初めて遭遇し、mapのvalueを別のタイプに変換する関数を作成し、ここでは(zhangsan、(ScoreDetailクラス))を(zhangsan,(91,1))mergeValue:(acc:(Float,Int),x:ScoreDetail=>(acc c c c._1+x.score,c.acc_2+1)c.acc_2+1)再び張三に会い、この2つのマージについて、ここでは(zhangsan,(91,1))と(zhangsan,(ScoreDetail類))というタイプをマージし、(zhangsan,(171,2))mergeCombiners(acc 1:(Float,Int),acc 2:(Float,Int))というタイプをマージしました.これは複数のパーティションのzhangsanのデータをマージしたものです.ここではzhansanは同じパーティションにありますが、この場所では役に立ちません
    ------------------------------------------------------------------------ contRdd.combineByKey( (score:(String,Long)) => Map(score._1 -> score._2), (c:Map[String,Long],score) => (c ++ Map(score._1 -> score._2)), (c1:Map[String,Long],c2:Map[String,Long]) => (c1 ++ c2) )