RDD血縁関係のソースコードの詳細!
一、RDDの依存関係
RDDの依存関係は,幅依存と狭い依存の2種類に分類される.次のように考えることができます.(1)狭い依存性:各parent RDDのpartitionはchild RDDの1つのpartitionによって最も多く使用される. (2)幅依存性:各parent RDD partitionは、複数のchild RDDのpartitionによって使用される.
各child RDDに狭い依存のpartitionの生成動作は並列に行うことができ,広い依存はすべてのparent RDD partition shuffle結果が得られた後に行う必要がある.
二、org.apache.spark.Dependency.scalaソースコード解析
Dependencyは抽象クラスです.
(1)NarrowDependencyも抽象クラス
抽象メソッドgetParentsを定義し、child RDDのあるpartition依存parent RDDのすべてのpartitionsを得るためにpartitionIdを入力する.
1つのparent RDDを表すpartitionはchild RDDのpartitionによって複数回使用される.shuffleを経て形成する必要があります.
三、二種類の依存の区分
まず、狭い依存性は、1つのクラスタノード上ですべての親パーティションを流水線で計算することを可能にする.例えば、mapを要素ごとに実行し、filter操作を行う.一方,広い依存性は,まずすべての親パーティションデータを計算し,次いでノード間でShuffleを行う必要があり,これはMapReduceと類似している.第二に、狭い依存性は、RDDパーティションを失った親パーティションを再計算するだけで、異なるノード間で並列に計算できる失効ノードの回復をより効果的に行うことができる.一方,広い依存関係のLineage図では,単一ノードの失効は,このRDDのすべての祖先が部分的なパーティションを失う可能性があるため,全体的な再計算が必要である.
RDDの依存関係は,幅依存と狭い依存の2種類に分類される.次のように考えることができます.
各child RDDに狭い依存のpartitionの生成動作は並列に行うことができ,広い依存はすべてのparent RDD partition shuffle結果が得られた後に行う必要がある.
二、org.apache.spark.Dependency.scalaソースコード解析
Dependencyは抽象クラスです.
// Denpendency.scala
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
NarrowDependencyとShuffleDenpendencyの2つのサブクラスがあり,それぞれ狭い依存性と広い依存性に対応している.(1)NarrowDependencyも抽象クラス
抽象メソッドgetParentsを定義し、child RDDのあるpartition依存parent RDDのすべてのpartitionsを得るためにpartitionIdを入力する.
// Denpendency.scala
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
狭い依存性には、OneToOneDependencyとRangeDependencyの2つの具体的な実装があります.(a)OneToOneDependencyとは、child RDDのpartitionがparent RDDの1つのpartitionにのみ依存し、OneToOneDependencyを生成する演算子としてmap、filter、flatMapなどがある.getParentsの実装は簡単で、partionIdをリストに入れて渡すことです.// Denpendency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
(b)RangeDependency child RDD partition parent RDD partition, union。
// Denpendency.scala
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {//inStart parent RDD ,outStart child RDD
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)//
} else {
Nil
}
}
}
(2)ShuffleDependency指幅依存1つのparent RDDを表すpartitionはchild RDDのpartitionによって複数回使用される.shuffleを経て形成する必要があります.
// Denpendency.scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: product2="" v="" val="" partitioner:="" partitioner="" serializer:="" serializer="SparkEnv.get.serializer," keyordering:="" option="" none="" aggregator:="" c="" mapsidecombine:="" boolean="false)" extends="" dependency="" override="" def="" rdd:="" rdd="" _rdd.asinstanceof="" private="" keyclassname:="" string="reflect.classTag[K].runtimeClass.getName" valueclassname:="" combinerclassname:="" shuffleid:="" int="_rdd.context.newShuffleId()" shufflehandle:="" shufflehandle="_rdd.context.env.shuffleManager.registerShuffle(" shuffleid="" _rdd.partitions.length="" this="" _rdd.sparkcontext.cleaner.foreach=""/>
shuffleはネットワーク転送に関わるため、シーケンス化serializerが必要であり、ネットワーク転送を減らすためにmapエンド集約が可能であり、mapSideCombineとaggregator制御、keyソートに関するkeyOrdering、再出力データがどのようにパーティション化されるかのpartitionor、class情報もある.Partition間の関係はshuffleで止まっているので、shuffleはstageを区別する根拠です.三、二種類の依存の区分
まず、狭い依存性は、1つのクラスタノード上ですべての親パーティションを流水線で計算することを可能にする.例えば、mapを要素ごとに実行し、filter操作を行う.一方,広い依存性は,まずすべての親パーティションデータを計算し,次いでノード間でShuffleを行う必要があり,これはMapReduceと類似している.第二に、狭い依存性は、RDDパーティションを失った親パーティションを再計算するだけで、異なるノード間で並列に計算できる失効ノードの回復をより効果的に行うことができる.一方,広い依存関係のLineage図では,単一ノードの失効は,このRDDのすべての祖先が部分的なパーティションを失う可能性があるため,全体的な再計算が必要である.