Spark RDDソート演算子
4857 ワード
RDDソート演算子にはsortByとsortByKeyの2つの演算があり、sortBy演算子はソートルールをカスタマイズすることができますが、sortByKeyはKeyに対してScalaまたはSparkのデフォルトでサポートされているソートルールしか使用できません.ScalaまたはSparkがソートルールをサポートしていない場合は、sortBy自身でソートルールを実現する必要があります.
sortByKeyのコア実装コード:
以上のコードから分かるように、
sortByKeyはordering暗黙値を用いて並べ替えられているので,現在の役割ドメインにOrdering[K]値が存在する限り並べ替えられるが,
sortByKeyは暗黙的な値の入力をサポートしていないので、scalaとsparkシステムのデフォルトの暗黙的な値しか使用できません.そのため、一部のKeyのソートをサポートします.
次にsortByのコア実装コードを見てみましょう.
ソースコードから,sortByが下位層で使用されているのかsortByKeyを実装しているのかが容易に分かるが,sortByは選択キーの関数を1つ入力したにすぎず,ソート規則の暗黙的な値をspark暗黙変換システムに入力し,ソート時にカスタムKeyのOrdering[K]を見つけることができる.
次に実戦でプレゼンテーションします.
1、実戦でScalaがサポートするソートタイプInt:
以上のコードは、RDDを生成するために使用される補助方法としてn個のサイズのListシーケンスを生成するものである.
2、次に1のnumsを元グループKV形式に変換した後にソートする.
メタグループソートの場合、scalaおよびsparkは、scala.math.OrderingにあるメタグループのOrdering暗黙的な値を提供します.次に、二元グループの暗黙的な値について説明します.
デフォルトの二元グループソートの暗黙的な値のように、二元グループのソートを実践します.
pairsをsortByKeyでソートすることもできます.
3、次にカスタムタイプを並べ替えます.自分でOrderingを実現する必要があります.
Dogを提供するOrderingを表示しないと、暗黙的な値がないとエラーが表示されます.
暗黙的な値を指定すると、
再ソート:
結論:ScalaシステムにデフォルトのOrdering値を提供する場合、与えられた暗黙的な値を表示しないことができますが、存在しない場合は、役割ドメインに表示する必要があります.
sortByKeyのコア実装コード:
class OrderedRDDFunctions extends Logging with Serializable {
private val ordering = implicitly[Ordering[K]] // 1
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)// 2
}
//
}
以上のコードから分かるように、
sortByKeyはordering暗黙値を用いて並べ替えられているので,現在の役割ドメインにOrdering[K]値が存在する限り並べ替えられるが,
sortByKeyは暗黙的な値の入力をサポートしていないので、scalaとsparkシステムのデフォルトの暗黙的な値しか使用できません.そのため、一部のKeyのソートをサポートします.
次にsortByのコア実装コードを見てみましょう.
/**
* Return this RDD sorted by the given key function.
*/
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f).sortByKey(ascending, numPartitions).values
}
ソースコードから,sortByが下位層で使用されているのかsortByKeyを実装しているのかが容易に分かるが,sortByは選択キーの関数を1つ入力したにすぎず,ソート規則の暗黙的な値をspark暗黙変換システムに入力し,ソート時にカスタムKeyのOrdering[K]を見つけることができる.
次に実戦でプレゼンテーションします.
1、実戦でScalaがサポートするソートタイプInt:
def getRandomList(n: Int): List[Int] = {
var result: List[Int] = Nil
while (result.length < n) {
result = Random.nextInt(1000) :: result
}
result
}
以上のコードは、RDDを生成するために使用される補助方法としてn個のサイズのListシーケンスを生成するものである.
scala> val nums = sc.parallelize(getRandomList(10))
res41: Array[Int] = Array(246, 157, 15, 488, 212, 513, 293, 224, 373, 242)
scala> nums.sortBy(x=>x).collect// Scala sortBy
res45: Array[Int] = Array(15, 157, 212, 224, 242, 246, 293, 373, 488, 513)
Scalaでサポートされているソート・タイプに対してsortByソートを直接使用するには、scala.math.Orderingでは、実装コードが次のようになっているため、暗黙値Ordering[Int]を実装する必要はありません. trait IntOrdering extends Ordering[Int] {
def compare(x: Int, y: Int) =
if (x < y) -1
else if (x == y) 0
else 1
}
implicit object Int extends IntOrdering
その他のLong、Shortなどのタイプは同じです!2、次に1のnumsを元グループKV形式に変換した後にソートする.
メタグループソートの場合、scalaおよびsparkは、scala.math.OrderingにあるメタグループのOrdering暗黙的な値を提供します.次に、二元グループの暗黙的な値について説明します.
implicit def Tuple2[T1, T2](implicit ord1: Ordering[T1], ord2: Ordering[T2]): Ordering[(T1, T2)] =
new Ordering[(T1, T2)]{
def compare(x: (T1, T2), y: (T1, T2)): Int = {
val compare1 = ord1.compare(x._1, y._1)
if (compare1 != 0) return compare1
val compare2 = ord2.compare(x._2, y._2)
if (compare2 != 0) return compare2
0
}
}
このデフォルトの二元グループソート規則は、最初の要素に基づいてソートされ、最初の要素が同じ場合、2番目の要素に基づいてソートされます.デフォルトの二元グループソートの暗黙的な値のように、二元グループのソートを実践します.
scala> val pairs = nums.map(x => (x,x))// sortBy sortByKey
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at map at :35
scala> pairs.sortBy(x=>x).collect // sortBy Key
res49: Array[(Int, Int)] = Array((15,15), (157,157), (212,212), (224,224), (242,242), (246,246), (293,293), (373,373), (488,488), (513,513))
ここでソートするKeyの規則関数はx=>xであり、OrderedRDDFunctionsにはOrdering[Int]が存在するため、暗黙的な値を表示する必要はありません.pairsをsortByKeyでソートすることもできます.
scala> pairs.sortByKey().collect // key
res54: Array[(Int, Int)] = Array((15,15), (157,157), (212,212), (224,224), (242,242), (246,246), (293,293), (373,373), (488,488), (513,513))
3、次にカスタムタイプを並べ替えます.自分でOrderingを実現する必要があります.
Dogを提供するOrderingを表示しないと、暗黙的な値がないとエラーが表示されます.
case class Dog(var age:Int)
val dogs = List(Dog(3), Dog(1), Dog(6), Dog(8))
scala> val dogsRDD = sc.parallelize(dogs)
dogsRDD: org.apache.spark.rdd.RDD[Dog] = ParallelCollectionRDD[97] at parallelize at :35
scala> dogsRDD.map((_,1)).sortBy(x=>x._1)
:31: error: No implicit Ordering defined for Dog.// : Dog Ordering
Error occurred in an application involving default arguments.
dogsRDD.map((_,1)).sortBy(x=>x._1)
暗黙的な値を指定すると、
implicit object DogOrdering extends Ordering[Dog] {
override def compare(e1:Dog, e2:Dog): Int = {
(e1.age-e2.age).toInt
}
}
再ソート:
scala> dogsRDD.map((_,1)).sortBy(x=>x._1).collect
res6: Array[(Dog, Int)] = Array((Dog(1),1), (Dog(3),1), (Dog(6),1), (Dog(8),1))
は正しく並べ替えられます!結論:ScalaシステムにデフォルトのOrdering値を提供する場合、与えられた暗黙的な値を表示しないことができますが、存在しない場合は、役割ドメインに表示する必要があります.