spark 2.2.0ソース読み---spark core包---partial/rdd

16464 ワード

1、本文の目標及びその他の説明:
    この文章は主にpartial、rdd包の下の種類を紹介します.
2、partialパッケージの下のデータ構造説明
private[spark] trait ApproximateEvaluator[U, R] {
  def merge(outputId: Int, taskResult: U): Unit
  def currentResult(): R
}
このインターフェースの2つの方法は主に異なるtaskを逐次結合した後の結果に用いられる.タスクが終了するごとに、mergeメソッドを呼び出します.
private[spark] class ApproximateActionListener[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long)
  extends JobListener {
規定の時間内に結果を返します.この結果はすべてのパーティションの結果かもしれません.パーティションを一部の規定時間だけ実行した結果かもしれません.
class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val 
high: Double) {
       mean/confidence/low/high,  equals     ,           true
private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
  extends ApproximateEvaluator[Long, BoundedDouble] {
要素の個数を返します.
private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: 
Double)  extends ApproximateEvaluator[OpenHashMap[T, Long], Map[T, BoundedDouble]] {
 違うkeyを通じてアキュムレータを行う.
private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
  extends ApproximateEvaluator[StatCounter, BoundedDouble] {
結果の平均を返します.
private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
  extends ApproximateEvaluator[StatCounter, BoundedDouble] {
要素の合計を返します.
3、rddパッケージの下のデータ構造説明
本カバンの下のrddは具体的に多すぎて、しかもモードが統一されているので、ここでは一つか二つだけ研究すればいいです.
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
  val index = idx
}
BlockRDDのパーティションタイプ.blockIdは、親ブロック、現在のパーティションで参照されているパーティションブロックを表します.IDxは現在のblockrddのインデックスを表しています.
番号を付ける
private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil) {
 @transient val blockIds: Array[BlockId]         ,      RDD   。   rdd
            :getPartitions  /  compute / getPreferredLocation
override def getPartitions: Array[Partition] = {
  assertValid()
  (0 until blockIds.length).map { i =>
    new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
  }.toArray
}      rdd    ,     。
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  assertValid()
  val blockManager = SparkEnv.get.blockManager
  val blockId = split.asInstanceOf[BlockRDDPartition].blockId
  blockManager.get[T](blockId) match {
    case Some(block) => block.data.asInstanceOf[Iterator[T]]
    case None =>
      throw new Exception(s"Could not compute split, block $blockId of RDD $id not found")
  }
} split    ,context              
 
  
override def getPreferredLocations(split: Partition): Seq[String] = {
  assertValid()
  _locations(split.asInstanceOf[BlockRDDPartition].blockId)
}         ,            。
 
  
private[spark] case class NarrowCoGroupSplitDep(
    @transient rdd: RDD[_],
    @transient splitIndex: Int,
    var split: Partition
  ) extends Serializable {

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent split at the time of task serialization
    split = rdd.partitions(splitIndex)
    oos.defaultWriteObject()
  }
}         ,        。
private[spark] class CoGroupPartition(
    override val index: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])
  extends Partition with Serializable {
  override def hashCode(): Int = index
  override def equals(other: Any): Boolean = super.equals(other)
} narrowDeps        。    CoGroupedRDD   
 
  
class CoGroupedRDD[K: ClassTag](
    @transient var rdds: Seq[RDD[_ <: product2="" style="color:#20999d;">K, _]]],
    part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
 
  
 
  本RDD就是将 
  cogroup  
  算子产生的RDD。 
  
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: product2="" style="color:#20999d;">K, _]]], part, serializer)
    }
  }
}
は本RDDに依存する血統関係を取得しました.
override def getPartitions: Array[Partition] = {
  val array = new Array[Partition](part.numPartitions)
  for (i 0 until array.length) {
    // Each CoGroupPartition will have a dependency per contributing RDD
    array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
      // Assume each RDD contributed a single dependency, and get it
      dependencies(j) match {
        case s: ShuffleDependency[_, _, _] =>
          None
        case _ =>
          Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
      }
    }.toArray)
  }
  array
}
は現在のCoGroupRDDのために自分のpartitionsを生成する.
override def compute(s: Partition, context: TaskContext): 
Iterator[(K, Array[Iterable[_]])] = {
は、sパーティションのデータを計算する.
 
  

class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
本クラスの中のいくつかの機能はsum/mean/分散、標準偏差などがあります.
 
  
class OrderedRDDFunctions[K : Ordering : ClassTag,
                          V: ClassTag,
                          P <: product2="" style="color:#20999d;">K, V] : ClassTag] @DeveloperApi() (
    self: RDD[P])
  extends Logging with Serializable {
RDDの並べ替えクラスも、暗黙的な変換によって強化されたRDDの機能であり、key valueがこのような形式のデータに適合する.
 
  
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable {
は、特にkey valueというデータの状況を処理するために用いられ、演算子が提供され、暗黙的な変換によってrddによって使用される.
 
  
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val 
rdd: RDD[T])  extends RDDCheckpointData[T](rdd) with Logging {
は、hdfsなどの外部記憶システムにデータを書き込む.