RDD血縁関係のソースコードの詳細!


一、RDDの依存関係
RDDの依存関係は,幅依存と狭い依存の2種類に分類される.次のように考えることができます.
  • (1)狭い依存性:各parent RDDのpartitionはchild RDDの1つのpartitionによって最も多く使用される.
  • (2)幅依存性:各parent RDD partitionは、複数のchild RDDのpartitionによって使用される.

  • 各child RDDに狭い依存のpartitionの生成動作は並列に行うことができ,広い依存はすべてのparent RDD partition shuffle結果が得られた後に行う必要がある.
    二、org.apache.spark.Dependency.scalaソースコード解析
    Dependencyは抽象クラスです.
    // Denpendency.scala
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }
    NarrowDependencyとShuffleDenpendencyの2つのサブクラスがあり,それぞれ狭い依存性と広い依存性に対応している.
    (1)NarrowDependencyも抽象クラス
    抽象メソッドgetParentsを定義し、child RDDのあるpartition依存parent RDDのすべてのpartitionsを得るためにpartitionIdを入力する.
    // Denpendency.scala
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {  
    /**
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    
      override def rdd: RDD[T] = _rdd
    }
    狭い依存性には、OneToOneDependencyとRangeDependencyの2つの具体的な実装があります.(a)OneToOneDependencyとは、child RDDのpartitionがparent RDDの1つのpartitionにのみ依存し、OneToOneDependencyを生成する演算子としてmap、filter、flatMapなどがある.getParentsの実装は簡単で、partionIdをリストに入れて渡すことです.
    // Denpendency.scala
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
            (b)RangeDependency child RDD partition              parent RDD partition,    union。
    
    // Denpendency.scala
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)  
      extends NarrowDependency[T](rdd) {//inStart  parent RDD     ,outStart  child RDD      
      override def getParents(partitionId: Int): List[Int] = {    
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)//            
        } else {
          Nil
        }
      }
    }
    (2)ShuffleDependency指幅依存
    1つのparent RDDを表すpartitionはchild RDDのpartitionによって複数回使用される.shuffleを経て形成する必要があります.
    // Denpendency.scala
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: product2="" v="" val="" partitioner:="" partitioner="" serializer:="" serializer="SparkEnv.get.serializer," keyordering:="" option="" none="" aggregator:="" c="" mapsidecombine:="" boolean="false)" extends="" dependency="" override="" def="" rdd:="" rdd="" _rdd.asinstanceof="" private="" keyclassname:="" string="reflect.classTag[K].runtimeClass.getName" valueclassname:="" combinerclassname:="" shuffleid:="" int="_rdd.context.newShuffleId()" shufflehandle:="" shufflehandle="_rdd.context.env.shuffleManager.registerShuffle(" shuffleid="" _rdd.partitions.length="" this="" _rdd.sparkcontext.cleaner.foreach=""/>
    shuffleはネットワーク転送に関わるため、シーケンス化serializerが必要であり、ネットワーク転送を減らすためにmapエンド集約が可能であり、mapSideCombineとaggregator制御、keyソートに関するkeyOrdering、再出力データがどのようにパーティション化されるかのpartitionor、class情報もある.Partition間の関係はshuffleで止まっているので、shuffleはstageを区別する根拠です.
    三、二種類の依存の区分
    まず、狭い依存性は、1つのクラスタノード上ですべての親パーティションを流水線で計算することを可能にする.例えば、mapを要素ごとに実行し、filter操作を行う.一方,広い依存性は,まずすべての親パーティションデータを計算し,次いでノード間でShuffleを行う必要があり,これはMapReduceと類似している.第二に、狭い依存性は、RDDパーティションを失った親パーティションを再計算するだけで、異なるノード間で並列に計算できる失効ノードの回復をより効果的に行うことができる.一方,広い依存関係のLineage図では,単一ノードの失効は,このRDDのすべての祖先が部分的なパーティションを失う可能性があるため,全体的な再計算が必要である.