Spark Streaming(2) - JobScheduler、JobGenerator
8087 ワード
本論文はSpark 2.11に基づく
1.はじめに
Spark Streaming(1)では、JobSchedulerがJobGeneratorを使用してDStream DAGに基づいて一定時間ごとにRDD DAGを作成し、jobをコミットすることができると述べており、ここでは主にJobSchedulerの詳細について説明する.
2. JobScheduler
JobSchedulerは、StreamingContextがstartを呼び出すと起動します.起動シーケンスは次のとおりです.
JobSchedulerには次のメンバーがいます. jobSets. job生成時間はjobsへのマッピングであり、JobGeneratorはDStreamGraphを呼び出して保有するDStream DAGごとにjobを生成してJobGeneratorに返し、JobGeneratorは時間および生成したjobsをJobscheudlerにフィードバックし、jobSetsに保存する.JobGeneratorはjobをコミットしていません.jobはJobScheudlerによってコミットされています. numConcurrentJobsは、同時に実行できるjobの数を制御します. jobExecutorスレッドプールnumConccurrentJobsによってスレッド数を制御し、jobExecutorにjobをコミットして結果を待つ.待機結果はブロック操作であるため、1つのスレッドは同時に1つのjob しかコミットできない. jobGenerator JobScheduler依頼job を生成する receiverTracker,JobSchedulerが起動し,Receiverから報告されたデータbatch情報を受信する.
3.JobGenerator生成job
JobSchedulerがJobGeneratorにjobの生成を依頼したことについて説明します.次はJobGeneratorのコアメンバーです. timerタイマ、JobGeneratorはタイミング的にjobを生成し、時間間隔batchDurationはStreamingContextの作成が転送され、このtimerはtimeDuration時間網eventLoopからjobを生成するメッセージを送信する. eventLoopは常に実行され、メッセージを受信し、処理を行う.受信メッセージのタイプは、 です. GenerateJobs、DSteamGraphを使用してjob を生成 DoCheckpoint、新しいjobを提出してcheckpoint をします ClearCheckpointData,DoCheckpointはいずれもjob完了後に情報を明らかにする である.
生成job timerタイマbatchDurationごとにGenerateJobイベント生成jobをeventLoopに送信します.次に、eventLoop時間のメインサイクルでGenerateJobイベント呼び出しを処理します.
次はJobGeneratorのgenerateJobs receiverTracker.allocateBlocksToBatch(time)は、現在時刻timeに基づいて、報告するデータからデータブロックを生成し、その後DStreamに基づいてRDDを生成するデータは、timeに基づいて今回生成するデータブロック を検索する. graph.generateJobs生成jobs jobScheduler.submitJobSet,フィードバックJobscheudler待ち人物スケジューリング eventLoop.post,job作成checkpoint ステップ2の作成でjobを作成するには、次の呼び出しシーケンスがあります.
timeとjobFuncの関数を使用してJobを作成し、jobFuncはスケジューリング時に実行されます.
4.JobSchedulerスケジューリングjob
3ではJobGeneratorがjobsを生成し、生成したjobをJobSchedulerにフィードバックすることに言及し、2ではJobSchedulerがjobExecutorを使用してjobをスケジューリングすることについて述べた.
次に、JobSchedulerのsubmitJobSetメソッドを示します.
上記のコードの
JobHandlerはRunnableインタフェースを実現し、そのrun方法は以下の通りです.
Job#runメソッドを呼び出し、runメソッドでjobFuncを実行してjobのコミットを完了します.
job並列度の制御JobSchedulerのメンバーnumConcurrentJobsは同時にどれだけのstream jobが実行できるかを制御し、numConcurrentJobsは
1.はじめに
Spark Streaming(1)では、JobSchedulerがJobGeneratorを使用してDStream DAGに基づいて一定時間ごとにRDD DAGを作成し、jobをコミットすることができると述べており、ここでは主にJobSchedulerの詳細について説明する.
2. JobScheduler
JobSchedulerは、StreamingContextがstartを呼び出すと起動します.起動シーケンスは次のとおりです.
StreamingContext#start
->JobScheduler#start
-> ReceiverTracker#start
->JobGenerator#start
JobSchedulerには次のメンバーがいます.
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)
// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
// A tracker to track all the input stream information as well as processed record number
3.JobGenerator生成job
JobSchedulerがJobGeneratorにjobの生成を依頼したことについて説明します.次はJobGeneratorのコアメンバーです.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
private var eventLoop: EventLoop[JobGeneratorEvent] = null
// last batch whose completion,checkpointing and metadata cleanup has been completed
生成job timerタイマbatchDurationごとにGenerateJobイベント生成jobをeventLoopに送信します.次に、eventLoop時間のメインサイクルでGenerateJobイベント呼び出しを処理します.
eventLoop#processEvent
--> jobGenerator#generateJobs
次はJobGeneratorのgenerateJobs
private def generateJobs(time: Time) {
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
// jobs JobScheudler,
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
DStreamGraph#generateJobs
->DStream#generateJob
//DStream#generateJob
private[streaming] def generateJob(time: Time): Option[Job] = {
// DStream RDD
getOrCompute(time) match {
case Some(rdd) =>
// , RDD job
// JobScheduler jobExecutor job ,
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
case None => None
}
}
timeとjobFuncの関数を使用してJobを作成し、jobFuncはスケジューリング時に実行されます.
4.JobSchedulerスケジューリングjob
3ではJobGeneratorがjobsを生成し、生成したjobをJobSchedulerにフィードバックすることに言及し、2ではJobSchedulerがjobExecutorを使用してjobをスケジューリングすることについて述べた.
次に、JobSchedulerのsubmitJobSetメソッドを示します.
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
上記のコードの
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
は、JobGeneratorから渡された各jobをJobHandlerにパッケージし、jobExecutorスレッドプールで実行をスケジュールします.JobHandlerはRunnableインタフェースを実現し、そのrun方法は以下の通りです.
def run() {
val oldProps = ssc.sparkContext.getLocalProperties
try {
ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
val formattedTime = UIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
ssc.sc.setJobDescription(
s"""Streaming job from $batchLinkText""")
ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
// We need to assign `eventLoop` to a temp variable. Otherwise, because
// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
// it's possible that when `post` is called, `eventLoop` happens to null.
var _eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobStarted(job, clock.getTimeMillis()))
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
}
} else {
// JobScheduler has been stopped.
}
} finally {
ssc.sparkContext.setLocalProperties(oldProps)
}
}
Job#runメソッドを呼び出し、runメソッドでjobFuncを実行してjobのコミットを完了します.
job並列度の制御JobSchedulerのメンバーnumConcurrentJobsは同時にどれだけのstream jobが実行できるかを制御し、numConcurrentJobsは
spark.streaming.concurrentJobs
の構成項目で取得し、デフォルトは1である.numCOncurrentJobsは、jobExecutorスレッドプール内のスレッドの数を制御し、同時に実行されるjobHandlerの数を制御する(一方、1つのjobHandlerは1つのjobをカプセル化する).