Spark RDDソート演算子

4857 ワード

RDDソート演算子にはsortByとsortByKeyの2つの演算があり、sortBy演算子はソートルールをカスタマイズすることができますが、sortByKeyはKeyに対してScalaまたはSparkのデフォルトでサポートされているソートルールしか使用できません.ScalaまたはSparkがソートルールをサポートしていない場合は、sortBy自身でソートルールを実現する必要があります.
sortByKeyのコア実装コード:
class OrderedRDDFunctions extends Logging with Serializable {
	
  private val ordering = implicitly[Ordering[K]] //    1

  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)//    2
  }
   //      
}

以上のコードから分かるように、
sortByKeyはordering暗黙値を用いて並べ替えられているので,現在の役割ドメインにOrdering[K]値が存在する限り並べ替えられるが,
sortByKeyは暗黙的な値の入力をサポートしていないので、scalaとsparkシステムのデフォルトの暗黙的な値しか使用できません.そのため、一部のKeyのソートをサポートします.
次にsortByのコア実装コードを見てみましょう.
  /**
    * Return this RDD sorted by the given key function.
    */
  def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
                      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    
    this.keyBy[K](f).sortByKey(ascending, numPartitions).values
  }

ソースコードから,sortByが下位層で使用されているのかsortByKeyを実装しているのかが容易に分かるが,sortByは選択キーの関数を1つ入力したにすぎず,ソート規則の暗黙的な値をspark暗黙変換システムに入力し,ソート時にカスタムKeyのOrdering[K]を見つけることができる.
次に実戦でプレゼンテーションします.
1、実戦でScalaがサポートするソートタイプInt:
  def getRandomList(n: Int): List[Int] = {

    var result: List[Int] = Nil

    while (result.length < n) {
      result = Random.nextInt(1000) :: result
    }

    result
  }

以上のコードは、RDDを生成するために使用される補助方法としてn個のサイズのListシーケンスを生成するものである.
scala> val nums = sc.parallelize(getRandomList(10))
res41: Array[Int] = Array(246, 157, 15, 488, 212, 513, 293, 224, 373, 242)
scala> nums.sortBy(x=>x).collect//   Scala           sortBy  
res45: Array[Int] = Array(15, 157, 212, 224, 242, 246, 293, 373, 488, 513)
Scalaでサポートされているソート・タイプに対してsortByソートを直接使用するには、scala.math.Orderingでは、実装コードが次のようになっているため、暗黙値Ordering[Int]を実装する必要はありません.
  trait IntOrdering extends Ordering[Int] {
    def compare(x: Int, y: Int) =
      if (x < y) -1
      else if (x == y) 0
      else 1
  }
  implicit object Int extends IntOrdering
その他のLong、Shortなどのタイプは同じです!
2、次に1のnumsを元グループKV形式に変換した後にソートする.
メタグループソートの場合、scalaおよびsparkは、scala.math.OrderingにあるメタグループのOrdering暗黙的な値を提供します.次に、二元グループの暗黙的な値について説明します.
  implicit def Tuple2[T1, T2](implicit ord1: Ordering[T1], ord2: Ordering[T2]): Ordering[(T1, T2)] =
    new Ordering[(T1, T2)]{
      def compare(x: (T1, T2), y: (T1, T2)): Int = {
        val compare1 = ord1.compare(x._1, y._1)
        if (compare1 != 0) return compare1
        val compare2 = ord2.compare(x._2, y._2)
        if (compare2 != 0) return compare2
        0
      }
    }
このデフォルトの二元グループソート規則は、最初の要素に基づいてソートされ、最初の要素が同じ場合、2番目の要素に基づいてソートされます.
デフォルトの二元グループソートの暗黙的な値のように、二元グループのソートを実践します.
scala> val pairs = nums.map(x => (x,x))//        sortBy sortByKey
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at map at :35
scala> pairs.sortBy(x=>x).collect //  sortBy         Key
res49: Array[(Int, Int)] = Array((15,15), (157,157), (212,212), (224,224), (242,242), (246,246), (293,293), (373,373), (488,488), (513,513))
ここでソートするKeyの規則関数はx=>xであり、OrderedRDDFunctionsにはOrdering[Int]が存在するため、暗黙的な値を表示する必要はありません.
pairsをsortByKeyでソートすることもできます.
scala> pairs.sortByKey().collect //         key    
res54: Array[(Int, Int)] = Array((15,15), (157,157), (212,212), (224,224), (242,242), (246,246), (293,293), (373,373), (488,488), (513,513))

3、次にカスタムタイプを並べ替えます.自分でOrderingを実現する必要があります.
Dogを提供するOrderingを表示しないと、暗黙的な値がないとエラーが表示されます.
case class Dog(var age:Int)
val dogs = List(Dog(3), Dog(1), Dog(6), Dog(8))
scala> val dogsRDD = sc.parallelize(dogs)
dogsRDD: org.apache.spark.rdd.RDD[Dog] = ParallelCollectionRDD[97] at parallelize at :35

scala> dogsRDD.map((_,1)).sortBy(x=>x._1)
:31: error: No implicit Ordering defined for Dog.//  :  Dog Ordering   
Error occurred in an application involving default arguments.
       dogsRDD.map((_,1)).sortBy(x=>x._1)

暗黙的な値を指定すると、
implicit object DogOrdering extends Ordering[Dog] {
  override def compare(e1:Dog, e2:Dog): Int = {
   (e1.age-e2.age).toInt
  }
}

再ソート:
scala> dogsRDD.map((_,1)).sortBy(x=>x._1).collect
res6: Array[(Dog, Int)] = Array((Dog(1),1), (Dog(3),1), (Dog(6),1), (Dog(8),1))
は正しく並べ替えられます!
結論:ScalaシステムにデフォルトのOrdering値を提供する場合、与えられた暗黙的な値を表示しないことができますが、存在しない場合は、役割ドメインに表示する必要があります.