Spark学習05——キー値対RDD変換演算子
50996 ワード
すべてのキー値対RDD変換演算子は以下の通りである.
mapValues、flatMapValues、sortByKey、combineByKey、foldByKey、groupByKey、reduceByKey、aggregateByKey、cogroup、join、leftOuterJoin、rightOuterJoin
もちろん、キー値はRDDに対してすべてのRDD変換演算子を使用することができ、詳細は以下を参照してください.https://blog.csdn.net/qq_23146763/article/details/100988127
具体的な解釈と例
1. mapValues
キーを変更せずにpairRDDの各値に対してmapを呼び出す
2. flatMapValues
キーを変更せずにpairRDDの各値に対してflatMapを呼び出す
3. sortByKey
1.keyがソートを実現すると、Keyでソートされた(K,V)キー値対からなるRDDを返し、accendingがtrueの場合は昇順、falseの場合は降順、numPartitionsはパーティション数を設定、ジョブ並列度を高める.並列度を1に設定すると、より良いソートが可能になります.
4. combineByKey
異なる戻りタイプを使用して、同じキーを持つ値comineByKey(createCombiner,mergeValue,mergeCombiners,partitioner,mapSideCombine)をマージします.
createCombiner:Keysに初めて遭遇したときに結合器関数を作成し、RDDデータセットのVタイプ値をCタイプ値(V=>C)に変換します.Cは集合タイプです.注意:このプロセスは各パーティションで初めて各キーが現れるときに発生します.
mergeValue:連結値関数(パーティションごとに独立処理)、再び同じKeyに遭遇した場合、createCombiner道理のCタイプ値と今回入力したVタイプ値を1つのCタイプ値(C,V)=>C mergeCombiners:連結コンポジタ関数(各キーのパーティション結果を連結)に連結し、Cタイプ値の2つを1つにまとめるCタイプ値partioner:既存またはカスタムパーティション関数を使用し、デフォルトはHashPartitioner mapSideCombine:map側でCombine操作を行うかどうか、デフォルトはtrue
5. foldByKey
foldByKey,groupByKey,reduceByKey関数は最終的にcombineByKey関数を呼び出すことによって実現される役割である:vauleごとにzeroValueを加え、keyでzeroValeを集約する:Vを初期化し、実際にはCombineByKeyのcreateCombinerによって実現されるV=>(zeroValue,V)を集約し、func関数によって新しい値、すなわちfunc(zeroValue,V)にマッピングし、例4のように、各V先進行V=>2+V func:Valueはfunc関数によってKey値で連結される(実際にはCombineByKeyのmergeValue,mergeCombiners関数によって実現されるが、ここでは、この2つの関数は同じである)
6. groupByKey
キーでグループ化し、[K,Iterable[V],numPartitionsでパーティション数を設定し、ジョブ並列度を上げる
7. reduceByKey
Keyでグループ化し、与えられたfunc関数を使用してvalue値を集約し、numPartitionsでパーティション数を設定し、ジョブ並列度を向上
8. aggregateByKey
1.概念はPairRDD中の同じKey値を集約し、集約中にも中立的な初期値を使用する.
2.aggregateとの相違点1.Aggregateの集約結果はパーティション数と関係がある.AggregateByKeyの集約結果はパーティション数とは無関係である.Aggregateは各パーティション内の要素を集約する.AggregateByKeyはPairRDDの同じKey値を集約し、PairRDDに戻る
3.例の説明1.keyでグループ化し、seqで各グループvalueと初期値を比較し、小さな値を新しいキー値ペアとして得、combで各グループデータvalueをkeyで集約し、最終的に各グループにPairRDDを生成する
9. cogroup
2つのRDD(例えば、(K,V)と(K,W))の同じKeyの要素をそれぞれ集約し、最後に(K,Iterator,Iterator)形式のRDD,numPartitionsを返してパーティション数を設定し、ジョブ並列度を高める
10. join
内部接続両方のpairRDDに存在するキーのみが出力されます.1つの出力に対応するキーに複数の値がある場合、生成されたpairRDDは、2つの入力RDDからの各セットに対応するvalueを含む
11. leftOuterJoin
左外部ジョイン.ソースpairRDDは各キーに対応するレコードがあり、2番目のRDDにはレコード値がNoneで表されていない
12. rightOuterJoin
右外部ジョイン.2番目のpairRDDは各キーに対応するレコードがあり、ソースRDDにはレコード値がNoneで表されていない
mapValues、flatMapValues、sortByKey、combineByKey、foldByKey、groupByKey、reduceByKey、aggregateByKey、cogroup、join、leftOuterJoin、rightOuterJoin
もちろん、キー値はRDDに対してすべてのRDD変換演算子を使用することができ、詳細は以下を参照してください.https://blog.csdn.net/qq_23146763/article/details/100988127
具体的な解釈と例
1. mapValues
キーを変更せずにpairRDDの各値に対してmapを呼び出す
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(("zhangsan", 22), ("lisi", 20), ("wangwu", 23))
val rdd = sc.parallelize(list)
val mapValuesRDD = rdd.mapValues(_ + 2)
mapValuesRDD.foreach(println)
sc.stop()
2. flatMapValues
キーを変更せずにpairRDDの各値に対してflatMapを呼び出す
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(("zhangsan", "GD SZ"), ("lisi", "HN YY"), ("wangwu", "JS NJ"))
val rdd = sc.parallelize(list)
val mapValuesRDD = rdd.flatMapValues(v => v.split(" "))
mapValuesRDD.foreach(println)
sc.stop()
3. sortByKey
1.keyがソートを実現すると、Keyでソートされた(K,V)キー値対からなるRDDを返し、accendingがtrueの場合は昇順、falseの場合は降順、numPartitionsはパーティション数を設定、ジョブ並列度を高める.並列度を1に設定すると、より良いソートが可能になります.
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val arr = List((1, 20), (1, 10), (2, 20), (2, 10), (3, 20), (3, 10))
val rdd = sc.parallelize(arr)
val sortByKeyRDD = rdd.sortByKey(true, 1)
sortByKeyRDD.foreach(println)
sc.stop()
4. combineByKey
異なる戻りタイプを使用して、同じキーを持つ値comineByKey(createCombiner,mergeValue,mergeCombiners,partitioner,mapSideCombine)をマージします.
createCombiner:Keysに初めて遭遇したときに結合器関数を作成し、RDDデータセットのVタイプ値をCタイプ値(V=>C)に変換します.Cは集合タイプです.注意:このプロセスは各パーティションで初めて各キーが現れるときに発生します.
mergeValue:連結値関数(パーティションごとに独立処理)、再び同じKeyに遭遇した場合、createCombiner道理のCタイプ値と今回入力したVタイプ値を1つのCタイプ値(C,V)=>C mergeCombiners:連結コンポジタ関数(各キーのパーティション結果を連結)に連結し、Cタイプ値の2つを1つにまとめるCタイプ値partioner:既存またはカスタムパーティション関数を使用し、デフォルトはHashPartitioner mapSideCombine:map側でCombine操作を行うかどうか、デフォルトはtrue
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//
val input = sc.parallelize(List(("coffee", 1), ("coffee", 2), ("panda", 4), ("panda", 5)))
val result = input.combineByKey(
(v) => (v),
(acc: (Int), v) => (acc + v),
(acc1: (Int), acc2: (Int)) => (acc1 + acc2)
)
result.foreach(println)
//
val result2 = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (key, value) => (key, value._1 / value._2.toFloat) }
result2.collectAsMap().map(println)
sc.stop()
5. foldByKey
foldByKey,groupByKey,reduceByKey関数は最終的にcombineByKey関数を呼び出すことによって実現される役割である:vauleごとにzeroValueを加え、keyでzeroValeを集約する:Vを初期化し、実際にはCombineByKeyのcreateCombinerによって実現されるV=>(zeroValue,V)を集約し、func関数によって新しい値、すなわちfunc(zeroValue,V)にマッピングし、例4のように、各V先進行V=>2+V func:Valueはfunc関数によってKey値で連結される(実際にはCombineByKeyのmergeValue,mergeCombiners関数によって実現されるが、ここでは、この2つの関数は同じである)
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val people = List(("Mobin", 2), ("Mobin", 1), ("Lucy", 2), ("Amy", 1), ("Lucy", 3))
val rdd = sc.parallelize(people)
val foldByKeyRDD = rdd.foldByKey(2)((_ + _))
foldByKeyRDD.foreach(println)
sc.stop()
6. groupByKey
キーでグループ化し、[K,Iterable[V],numPartitionsでパーティション数を設定し、ジョブ並列度を上げる
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val seq = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 2))
val rdd = sc.parallelize(seq)
val groupRDD = rdd.groupByKey(3)
groupRDD.foreach(println)
sc.stop()
7. reduceByKey
Keyでグループ化し、与えられたfunc関数を使用してvalue値を集約し、numPartitionsでパーティション数を設定し、ジョブ並列度を向上
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val seq = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 2))
val rdd = sc.parallelize(seq)
val reduceRdd = rdd.reduceByKey((v1, v2) => v1 + v2, 3)
reduceRdd.foreach(println)
sc.stop()
8. aggregateByKey
1.概念はPairRDD中の同じKey値を集約し、集約中にも中立的な初期値を使用する.
2.aggregateとの相違点1.Aggregateの集約結果はパーティション数と関係がある.AggregateByKeyの集約結果はパーティション数とは無関係である.Aggregateは各パーティション内の要素を集約する.AggregateByKeyはPairRDDの同じKey値を集約し、PairRDDに戻る
3.例の説明1.keyでグループ化し、seqで各グループvalueと初期値を比較し、小さな値を新しいキー値ペアとして得、combで各グループデータvalueをkeyで集約し、最終的に各グループにPairRDDを生成する
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(Seq((1, 2), (1, 3), (1, 4), (2, 5)))
def seq(a: Int, b: Int): Int = {
println("seq: " + a + "\t " + b)
math.min(a, b)
}
def comb(a: Int, b: Int): Int = {
println("comb: " + a + "\t " + b)
a + b
}
rdd.aggregateByKey(3, 2)(seq, comb).foreach(println)
sc.stop()
9. cogroup
2つのRDD(例えば、(K,V)と(K,W))の同じKeyの要素をそれぞれ集約し、最後に(K,Iterator,Iterator)形式のRDD,numPartitionsを返してパーティション数を設定し、ジョブ並列度を高める
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr2 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd1 = sc.parallelize(arr1, 3)
val rdd2 = sc.parallelize(arr2, 3)
val groupByKeyRDD = rdd1.cogroup(rdd2)
groupByKeyRDD.foreach(println)
sc.stop()
10. join
内部接続両方のpairRDDに存在するキーのみが出力されます.1つの出力に対応するキーに複数の値がある場合、生成されたpairRDDは、2つの入力RDDからの各セットに対応するvalueを含む
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr2 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd1 = sc.parallelize(arr1, 3)
val rdd2 = sc.parallelize(arr2, 3)
val groupByKeyRDD = rdd1.join(rdd2)
groupByKeyRDD.foreach(println)
sc.stop()
11. leftOuterJoin
左外部ジョイン.ソースpairRDDは各キーに対応するレコードがあり、2番目のRDDにはレコード値がNoneで表されていない
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr2 = List(("A", "A1"), ("C", "C1"), ("A", "A2"), ("C", "C2"))
val rdd1 = sc.parallelize(arr1, 3)
val rdd2 = sc.parallelize(arr2, 3)
val groupByKeyRDD = rdd1.leftOuterJoin(rdd2)
groupByKeyRDD.foreach(println)
sc.stop()
12. rightOuterJoin
右外部ジョイン.2番目のpairRDDは各キーに対応するレコードがあり、ソースRDDにはレコード値がNoneで表されていない
val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val arr1 = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr2 = List(("A", "A1"), ("C", "C1"), ("A", "A2"), ("C", "C2"))
val rdd1 = sc.parallelize(arr1, 3)
val rdd2 = sc.parallelize(arr2, 3)
val groupByKeyRDD = rdd1.rightOuterJoin(rdd2)
groupByKeyRDD.foreach(println)
sc.stop()