Spark Schedulerモジュール(上)

13685 ワード

Sparkソースコードを読んでいるうちに、シングルステップのデバッグはプログラムを理解するのにあまり良くないことが分かりました.このようなマルチスレッドの分散システムは、より良いリーディングソースコードの方法は、モジュールによってそれぞれ理解される.
 
パッケージorg.apache.sparkの下には、deploy、storge、shuffle、schedulerなど、次のクラスのカバンがたくさんあります.これは一つのシステムモジュールです.本稿では主にschedulerモジュールを紹介します.
 
ブログhttp://jerryshao.me/architecture/2013/04/21/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B-scheduler%E 6%A 8%A 5%9 D%97/ Spark-0.7バージョンのSchedulerモジュールを説明します.このブログはかなり参考になりましたが、対応コードはSpark-1.5バージョンです.
 
また、本文を読む前に、Sparkの古典的な論文を読んだほうがいいです.この四つの用語はRDD、狭い依存、幅依存、stageです.
注:本文はソースコードを貼り付ける過程で、主幹の論理に関する部分だけを選択しました.
 
==========================本文の始まり==========================================================================
 
近代的な分散式計算システムは、古典的なYARNを例にとって、資源の統一管理プラットフォームを実現し、Spark起動時にYARNに一定の計算リソース(CPUとメモリ)を申請し、自分でこれらの底の計算リソースを管理します.また、Sparkは、上位ユーザプログラムのタスクスケジュールを担当し、ユーザープログラムを個々の小さなタスクに分解して実行する.本稿では主にSparkの上位タスクスケジュールを分析する.(ユーザーに近いので分かりやすい)
 
RDD.co untを追跡すると、実際の実行文はSparkConttext.run Jobであり、下に追跡すると最終的に呼び出される方法はdagScheduler.run Jobであることが分かります.そしてDAGS chedulerはjobをstageに変えてstageをtaskに変換して、最後にtask Scheduler.submitTaskを呼び出してタスクを実行ユニットに提出しました.ここで一番重要な二つの種類は DAGS chedulerとTask Scheduler.
 
DAGS cheduler
SparkContect起動時にも作成して起動しました.taskSchedulerと _dagScheduler. 
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
SparkContect.creat TaskSchedulerの方法は、masterによって異なります. _taskScheduler、例えばmaster=localはクラスTask SchedulerImplで、master=yarn-clusterはorg.apache.spark.scheduler.cluster.YarnCusterSchedulerに対応します.TaskySchedulerは資源スケジューラ(YARN,Mesos,Local)で動作する必要があるので、それぞれ実現する必要があります.DAGS chedulerはSpark自身のロジックであり、一つだけ実現すればいいです.このような設計上の分離も私達が学ぶべきです.
 
DAGS chedulerの重要な属性に注目する必要があります.
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
BlockingQueを使って実現されたメッセージ・キューです.DAGS cheduler EventProcess Loopにおける方法 オンリサイクは、ユーザからのDAGS chedulerEventタイプのイベントを非同期的に受け入れ、それぞれ処理することができる.イベントの種類は以下の通りです.
JobSubmitted
CompletionEvent
StageCancelled
JobCancelled
JobからStageまでの完全な流れ
SparkConttext.runJobがDAGS cheduler.runJobを呼び出した時に、着信するパラメータはRDDです.DAGS cheduler.run Jobは何もしていません.またRDDを伝えました. DAGS cheduler.submitJob:
def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    val jobId = nextJobId.getAndIncrement()
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
submitJob()は独立したjobIdを作成し、転送されたwaiterを作成し、イベントJobSubmittedをeventProcess Loopに送信しました.上記で述べたように、これは非同期のメッセージ・キューである.そこで関数の调节者(DAGS cheduler.run Job)はウォiterを使ってメッセージを待てばいいです.
 
質に入れる  eventProcess Loop:DAGS cheduler Event Process Loopイベントを受け入れる JobSubmittedを呼び出します. DAGS cheduler.handlee JobSubmitted
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    activeJobs += job
    finalStage.resultOfJob = Some(job)
    submitStage(finalStage)
  }
  submitWaitingStages()
}
まずfinal Stageを作成してsumbiitStageに提出します.submit WaitingStagsは提出前に失敗しましたが、現在は条件を満たす作業です.ここの核心はsubmitStageです.
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
ここのparent stageは、rddの依存関係を経て再帰的に得られたものである.幅依存(Shuffle Dependency)に対しては、Sparkは、final StageのmissingPartとして新たなmapStageを生成し、狭い依存(Narrow Dependency)に対して、Sparkは新しいstageを生成しない.ここでのstageの区分は、論文のあり方を実現したものです.この区分方法は、急速な回復が利点である.もし一つのrddのpartitionがなくなったら、このpartitionは狭い依存性だけであれば、そのparent rddも対応する一つのpartitionを計算するだけでデータ回復を実現できる.
このようなstageの分割戦略こそ、いわゆるDAG図である.ステージのDAGが発生した後、ツリー構造によって、トポロジーが整然としていて、一つ一つのステージが実行されます.つまり、親ステージが計算された後、サブステージの計算ができます.
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    //       stage,      task       ,    
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // ..
  }

  //    stage     task      partition   
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.internalAccumulators)
        }
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        partitionsToCompute.map { id =>
          val p: Int = job.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
  } catch {
    //..
  }

  //     stage     tasks     taskSchduler  
  if (tasks.size > 0) {
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
}
これで、jobはDAGS chedulerでstage DAG図の構築を完成しました.stageはtaskグループに変換して、最後にtask Schedulerに提出します.taskが実行されると、DAGS cheduler.handleTasCompletionが呼び出されます.
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId
  event.reason match {
    case Success =>
      task match {
          case rt: ResultTask[_, _] =>
          ...
          case smt: ShuffleMapTask =>    
          ...
      }

    case Resubmitted =>
      ...
    case TaskResultLost =>
      ...
    case other =>
      ...
  }
}
いろいろな状況を経て、それぞれ処理します.
RDDの計算
RDDの計算はtaskで行われ、狭い依存性と広い依存性によって、ResultTaskとShuffleMapTaskに分けられ、それぞれ具体的な実行過程を見てみます.
// ResultTask
override
def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) func(context, rdd.iterator(partition, context)) }
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
      ...
    }
  }
ResultTaskとShuffleMapTaskはいずれもSparkEnd.get.closureSerialzerを呼び出してtask Binaryをアンチプログレッシブ操作し、RDD.iteratorを呼び出してRDDを計算し、変換します.異なる点は、ResultTaskの後にFnc()の計算結果を呼び出し、ShuffleMapTaskは結果をblockManagerに保存してShuffleに使うことです.
 
これにより、DAGS chedulerの実行過程を大まかに分析した.次の編では、Task Schedulerについて話します.