spark 2.2.0ソース読み---spark core包---partial/rdd
16464 ワード
1、本文の目標及びその他の説明:
この文章は主にpartial、rdd包の下の種類を紹介します.
2、partialパッケージの下のデータ構造説明
3、rddパッケージの下のデータ構造説明
本カバンの下のrddは具体的に多すぎて、しかもモードが統一されているので、ここでは一つか二つだけ研究すればいいです.
番号を付ける
この文章は主にpartial、rdd包の下の種類を紹介します.
2、partialパッケージの下のデータ構造説明
private[spark] trait ApproximateEvaluator[U, R] {
def merge(outputId: Int, taskResult: U): Unit
def currentResult(): R
}
このインターフェースの2つの方法は主に異なるtaskを逐次結合した後の結果に用いられる.タスクが終了するごとに、mergeメソッドを呼び出します.private[spark] class ApproximateActionListener[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
timeout: Long)
extends JobListener {
規定の時間内に結果を返します.この結果はすべてのパーティションの結果かもしれません.パーティションを一部の規定時間だけ実行した結果かもしれません.class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val
high: Double) {
mean/confidence/low/high, equals , true
private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[Long, BoundedDouble] {
要素の個数を返します.private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence:
Double) extends ApproximateEvaluator[OpenHashMap[T, Long], Map[T, BoundedDouble]] {
違うkeyを通じてアキュムレータを行う.private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[StatCounter, BoundedDouble] {
結果の平均を返します.private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[StatCounter, BoundedDouble] {
要素の合計を返します.3、rddパッケージの下のデータ構造説明
本カバンの下のrddは具体的に多すぎて、しかもモードが統一されているので、ここでは一つか二つだけ研究すればいいです.
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
val index = idx
}
BlockRDDのパーティションタイプ.blockIdは、親ブロック、現在のパーティションで参照されているパーティションブロックを表します.IDxは現在のblockrddのインデックスを表しています.番号を付ける
private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient val blockIds: Array[BlockId] , RDD 。 rdd
:getPartitions / compute / getPreferredLocation
override def getPartitions: Array[Partition] = {
assertValid()
(0 until blockIds.length).map { i =>
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
}.toArray
} rdd , 。
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
assertValid()
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
blockManager.get[T](blockId) match {
case Some(block) => block.data.asInstanceOf[Iterator[T]]
case None =>
throw new Exception(s"Could not compute split, block $blockId of RDD $id not found")
}
} split ,context
override def getPreferredLocations(split: Partition): Seq[String] = { assertValid() _locations(split.asInstanceOf[BlockRDDPartition].blockId) } , 。
private[spark] case class NarrowCoGroupSplitDep( @transient rdd: RDD[_], @transient splitIndex: Int, var split: Partition ) extends Serializable { @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization split = rdd.partitions(splitIndex) oos.defaultWriteObject() } } , 。
private[spark] class CoGroupPartition( override val index: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]]) extends Partition with Serializable { override def hashCode(): Int = index override def equals(other: Any): Boolean = super.equals(other) } narrowDeps 。 CoGroupedRDD
class CoGroupedRDD[K: ClassTag]( @transient var rdds: Seq[RDD[_ <: product2="" style="color:#20999d;">K, _]]], part: Partitioner) extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
本RDD就是将 cogroup 算子产生的RDD。は本RDDに依存する血統関係を取得しました.override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd: RDD[_] => if (rdd.partitioner == Some(part)) { logDebug("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logDebug("Adding shuffle dependency with " + rdd) new ShuffleDependency[K, Any, CoGroupCombiner]( rdd.asInstanceOf[RDD[_ <: product2="" style="color:#20999d;">K, _]]], part, serializer) } } }
は現在のCoGroupRDDのために自分のpartitionsを生成する.override def getPartitions: Array[Partition] = { val array = new Array[Partition](part.numPartitions) for (i 0 until array.length) { // Each CoGroupPartition will have a dependency per contributing RDD array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) => // Assume each RDD contributed a single dependency, and get it dependencies(j) match { case s: ShuffleDependency[_, _, _] => None case _ => Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))) } }.toArray) } array }
override def compute(s: Partition, context: TaskContext):
は、sパーティションのデータを計算する.Iterator[(K, Array[Iterable[_]])] = {
本クラスの中のいくつかの機能はsum/mean/分散、標準偏差などがあります.class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
RDDの並べ替えクラスも、暗黙的な変換によって強化されたRDDの機能であり、key valueがこのような形式のデータに適合する.class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag, P <: product2="" style="color:#20999d;">K, V] : ClassTag] @DeveloperApi() ( self: RDD[P]) extends Logging with Serializable {
は、特にkey valueというデータの状況を処理するために用いられ、演算子が提供され、暗黙的な変換によってrddによって使用される.class PairRDDFunctions[K, V](self: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) extends Logging with Serializable {
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val
は、hdfsなどの外部記憶システムにデータを書き込む.rdd: RDD[T]) extends RDDCheckpointData[T](rdd) with Logging {