Spark学習05——キー値対RDD変換演算子

50996 ワード

すべてのキー値対RDD変換演算子は以下の通りである.
mapValues、flatMapValues、sortByKey、combineByKey、foldByKey、groupByKey、reduceByKey、aggregateByKey、cogroup、join、leftOuterJoin、rightOuterJoin
もちろん、キー値はRDDに対してすべてのRDD変換演算子を使用することができ、詳細は以下を参照してください.https://blog.csdn.net/qq_23146763/article/details/100988127
具体的な解釈と例
1. mapValues
キーを変更せずにpairRDDの各値に対してmapを呼び出す
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val list = List(("zhangsan", 22), ("lisi", 20), ("wangwu", 23))
    val rdd = sc.parallelize(list)
    val mapValuesRDD = rdd.mapValues(_ + 2)
    mapValuesRDD.foreach(println)

    sc.stop()

2. flatMapValues
キーを変更せずにpairRDDの各値に対してflatMapを呼び出す
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val list = List(("zhangsan", "GD SZ"), ("lisi", "HN YY"), ("wangwu", "JS NJ"))
    val rdd = sc.parallelize(list)
    val mapValuesRDD = rdd.flatMapValues(v => v.split(" "))
    mapValuesRDD.foreach(println)

    sc.stop()

3. sortByKey
1.keyがソートを実現すると、Keyでソートされた(K,V)キー値対からなるRDDを返し、accendingがtrueの場合は昇順、falseの場合は降順、numPartitionsはパーティション数を設定、ジョブ並列度を高める.並列度を1に設定すると、より良いソートが可能になります.
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val arr = List((1, 20), (1, 10), (2, 20), (2, 10), (3, 20), (3, 10))
    val rdd = sc.parallelize(arr)
    val sortByKeyRDD = rdd.sortByKey(true, 1)
    sortByKeyRDD.foreach(println)

    sc.stop()

4. combineByKey
異なる戻りタイプを使用して、同じキーを持つ値comineByKey(createCombiner,mergeValue,mergeCombiners,partitioner,mapSideCombine)をマージします.
createCombiner:Keysに初めて遭遇したときに結合器関数を作成し、RDDデータセットのVタイプ値をCタイプ値(V=>C)に変換します.Cは集合タイプです.注意:このプロセスは各パーティションで初めて各キーが現れるときに発生します.
mergeValue:連結値関数(パーティションごとに独立処理)、再び同じKeyに遭遇した場合、createCombiner道理のCタイプ値と今回入力したVタイプ値を1つのCタイプ値(C,V)=>C mergeCombiners:連結コンポジタ関数(各キーのパーティション結果を連結)に連結し、Cタイプ値の2つを1つにまとめるCタイプ値partioner:既存またはカスタムパーティション関数を使用し、デフォルトはHashPartitioner mapSideCombine:map側でCombine操作を行うかどうか、デフォルトはtrue
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //    
    val input = sc.parallelize(List(("coffee", 1), ("coffee", 2), ("panda", 4), ("panda", 5)))
    val result = input.combineByKey(
      (v) => (v),
      (acc: (Int), v) => (acc + v),
      (acc1: (Int), acc2: (Int)) => (acc1 + acc2)
    )
    result.foreach(println)

    //    
    val result2 = input.combineByKey(
      (v) => (v, 1),
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map { case (key, value) => (key, value._1 / value._2.toFloat) }
    result2.collectAsMap().map(println)

    sc.stop()

5. foldByKey
foldByKey,groupByKey,reduceByKey関数は最終的にcombineByKey関数を呼び出すことによって実現される役割である:vauleごとにzeroValueを加え、keyでzeroValeを集約する:Vを初期化し、実際にはCombineByKeyのcreateCombinerによって実現されるV=>(zeroValue,V)を集約し、func関数によって新しい値、すなわちfunc(zeroValue,V)にマッピングし、例4のように、各V先進行V=>2+V func:Valueはfunc関数によってKey値で連結される(実際にはCombineByKeyのmergeValue,mergeCombiners関数によって実現されるが、ここでは、この2つの関数は同じである)
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val people = List(("Mobin", 2), ("Mobin", 1), ("Lucy", 2), ("Amy", 1), ("Lucy", 3))
    val rdd = sc.parallelize(people)
    val foldByKeyRDD = rdd.foldByKey(2)((_ + _))
    foldByKeyRDD.foreach(println)

    sc.stop()

6. groupByKey
キーでグループ化し、[K,Iterable[V],numPartitionsでパーティション数を設定し、ジョブ並列度を上げる
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val seq = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 2))
    val rdd = sc.parallelize(seq)
    val groupRDD = rdd.groupByKey(3)
    groupRDD.foreach(println)
    sc.stop()

7. reduceByKey
Keyでグループ化し、与えられたfunc関数を使用してvalue値を集約し、numPartitionsでパーティション数を設定し、ジョブ並列度を向上
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val seq = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 2))
    val rdd = sc.parallelize(seq)
    val reduceRdd = rdd.reduceByKey((v1, v2) => v1 + v2, 3)
    reduceRdd.foreach(println)
    sc.stop()

8. aggregateByKey
1.概念はPairRDD中の同じKey値を集約し、集約中にも中立的な初期値を使用する.
2.aggregateとの相違点1.Aggregateの集約結果はパーティション数と関係がある.AggregateByKeyの集約結果はパーティション数とは無関係である.Aggregateは各パーティション内の要素を集約する.AggregateByKeyはPairRDDの同じKey値を集約し、PairRDDに戻る
3.例の説明1.keyでグループ化し、seqで各グループvalueと初期値を比較し、小さな値を新しいキー値ペアとして得、combで各グループデータvalueをkeyで集約し、最終的に各グループにPairRDDを生成する
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.parallelize(Seq((1, 2), (1, 3), (1, 4), (2, 5)))

    def seq(a: Int, b: Int): Int = {
      println("seq: " + a + "\t " + b)
      math.min(a, b)
    }

    def comb(a: Int, b: Int): Int = {
      println("comb: " + a + "\t " + b)
      a + b
    }

    rdd.aggregateByKey(3, 2)(seq, comb).foreach(println)

    sc.stop()

9. cogroup
2つのRDD(例えば、(K,V)と(K,W))の同じKeyの要素をそれぞれ集約し、最後に(K,Iterator,Iterator)形式のRDD,numPartitionsを返してパーティション数を設定し、ジョブ並列度を高める
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
    val arr2 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
    val rdd1 = sc.parallelize(arr1, 3)
    val rdd2 = sc.parallelize(arr2, 3)
    val groupByKeyRDD = rdd1.cogroup(rdd2)
    groupByKeyRDD.foreach(println)

    sc.stop()

10. join
内部接続両方のpairRDDに存在するキーのみが出力されます.1つの出力に対応するキーに複数の値がある場合、生成されたpairRDDは、2つの入力RDDからの各セットに対応するvalueを含む
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
    val arr2 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
    val rdd1 = sc.parallelize(arr1, 3)
    val rdd2 = sc.parallelize(arr2, 3)
    val groupByKeyRDD = rdd1.join(rdd2)
    groupByKeyRDD.foreach(println)

    sc.stop()

11. leftOuterJoin
左外部ジョイン.ソースpairRDDは各キーに対応するレコードがあり、2番目のRDDにはレコード値がNoneで表されていない
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
    val arr2 = List(("A", "A1"), ("C", "C1"), ("A", "A2"), ("C", "C2"))
    val rdd1 = sc.parallelize(arr1, 3)
    val rdd2 = sc.parallelize(arr2, 3)
    val groupByKeyRDD = rdd1.leftOuterJoin(rdd2)
    groupByKeyRDD.foreach(println)

    sc.stop()

12. rightOuterJoin
右外部ジョイン.2番目のpairRDDは各キーに対応するレコードがあり、ソースRDDにはレコード値がNoneで表されていない
    val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
    val arr2 = List(("A", "A1"), ("C", "C1"), ("A", "A2"), ("C", "C2"))
    val rdd1 = sc.parallelize(arr1, 3)
    val rdd2 = sc.parallelize(arr2, 3)
    val groupByKeyRDD = rdd1.rightOuterJoin(rdd2)
    groupByKeyRDD.foreach(println)

    sc.stop()