combineByKeyの使用
4708 ワード
combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
パラメータ:
createCombiner:VタイプをCタイプに変換するコンポジタ関数、入力パラメータRDD[K,V]のV、出力C
mergeValue:値関数をマージし、CタイプとVタイプの値をCタイプにマージします.入力パラメータは(C,V)、出力はCです.
mergeCombiners:結合コンポジタ関数.2つのCタイプの値を1つのCタイプに結合するために使用されます.入力パラメータは(C,C)、出力はCです.
numPartitions:結果RDDパーティション数、デフォルトでは元のパーティション数を保持
partitioner:パーティション関数、デフォルトはHashPartitioner
mapSideCombine:MapReduceのcombineと同様に、Map側でcombine操作を行う必要があるかどうか、デフォルトはtrue
例:
もし私たちがたくさんの果物をジュースを搾り、ジュースは純粋で、他の品種の果物があってはいけないと要求したとします.では、いくつかのステップが必要です.
1どのようなジュースが必要かを定義します.
2ジュース搾り機、すなわち果物を定義すると、私たちが定義したジュースを与えることができます.--hadoopのlocal combinerに相当
3ジュースミキサーを定義します.同じ種類の果物ジュースを混ぜることができます.--グローバルcombinerに相当
では、上記の3つのステップを比較すると、combineByKeyの3つの関数、つまりこの3つの機能があります.
1 createCombinerは、vをcに変換する方法を定義します.
2 mergeValueは,Vを与えて元のCと新しいCに統合する方法を定義する.
3は、同じkeyの下のCを1つのCに統合する方法を定義することです.
var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1.combineByKey((v:Int)=>List(v),--1をlist(1)(c:List[Int],v:Int)=>v::c,--list(1)と2を組み合わせてlist(1,2)(c 1:List[Int],c 2:List[Int])=>c 1::c 2--グローバルに同じkeyのvalueを組み合わせる).collect res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
簡単な紹介
def combineByKey[C](createCombiner: (V) => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RD
学生の成績説明のクラスを作成する
case class ScoreDetail(studentName: String, subject: String, score: Float)
次のテストデータは、テストデータのセットkey=Students name and value=ScoreDetail instanceをロードします.
val scores = List(
ScoreDetail("xiaoming", "Math", 98),
ScoreDetail("xiaoming", "English", 88),
ScoreDetail("wangwu", "Math", 75),
ScoreDetail("wangwu", "English", 78),
ScoreDetail("lihua", "Math", 90),
ScoreDetail("lihua", "English", 80),
ScoreDetail("zhangsan", "Math", 91),
ScoreDetail("zhangsan", "English", 80))
val scoresWithKey = for { i
集約平均値を求めて印刷させる
val avgScoresRDD = scoresWithKeyRDD.combineByKey(
(x: ScoreDetail) => (x.score, 1) /*createCombiner*/,
(acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) /*mergeValue*/,
(acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) /*mergeCombiners*/
// calculate the average
).map( { case(key, value) => (key, value._1/value._2) })
avgScoresRDD.collect.foreach(println)
/* :
(zhangsan,85.5)
(lihua,85.0)
(xiaoming,93.0)
(wangwu,76.5)
*/
scoresWithKeyRDDについて説明します.combineByKeycreateCombiner:(x:ScoreDetail)=>(x.score,1)zhangsanに初めて遭遇し、mapのvalueを別のタイプに変換する関数を作成し、ここでは(zhangsan、(ScoreDetailクラス))を(zhangsan,(91,1))mergeValue:(acc:(Float,Int),x:ScoreDetail=>(acc c c c._1+x.score,c.acc_2+1)c.acc_2+1)再び張三に会い、この2つのマージについて、ここでは(zhangsan,(91,1))と(zhangsan,(ScoreDetail類))というタイプをマージし、(zhangsan,(171,2))mergeCombiners(acc 1:(Float,Int),acc 2:(Float,Int))というタイプをマージしました.これは複数のパーティションのzhangsanのデータをマージしたものです.ここではzhansanは同じパーティションにありますが、この場所では役に立ちません
------------------------------------------------------------------------
contRdd.combineByKey(
(score:(String,Long)) => Map(score._1 -> score._2),
(c:Map[String,Long],score) => (c ++ Map(score._1 -> score._2)),
(c1:Map[String,Long],c2:Map[String,Long]) => (c1 ++ c2) )