第3課:SparkStreaming徹底理解三板斧の三:SparkStreamingの運行メカニズムとアーキテクチャの進級のJobと許容誤差を解読する
7048 ワード
今期の内容: Spark Streaming Jobアーキテクチャと動作メカニズムの復号化 Spark Streamingフォールトトレランスアーキテクチャおよび動作メカニズムの復号化 SparkStreamingのJobの全体的なアーキテクチャと動作メカニズムを理解することはSparkStreamingに精通する上で極めて重要である.一般的なSparkアプリケーションではRDDのaction操作がJobの実行をトリガすることを知っている.では、SparkStreamingにとってJobはどのように動いているのでしょうか.SparkStreamingプログラムを作成するとき、BatchDurationを設定しました.JobはBatchDuration時間ごとに自動的にトリガーされます.この機能はSparkStreamingフレームワークがタイマーを提供しているに違いありません.時間が来ると、作成したプログラムをSparkに提出し、Spark jobで実行します.
この中には2つのJobの概念が含まれています. BatchIntervalごとに具体的なJobが生成されますが、ここのJobはSpark Coreで指すJobではありません.DStreamGraphに基づいて生成されたRDDのDAGにすぎません.Javaの観点から言えば、Runnableインタフェースのインスタンスに相当します.この場合、Jobを実行するにはJobSchedulerに提出する必要があります.JobSchedulerではスレッドプールを介して別のスレッドを見つけてJobをクラスタ実行にコミットします(実際にはスレッド内のRDDベースのActionが実際のジョブの実行をトリガします)、なぜスレッドプールを使用しますか?a)、ジョブは絶えず生成されるので、効率を高めるためにスレッドプールが必要です.これはExecutorでスレッドプールを介してTaskを実行するのと同じである.b)は,JobのFAIR公平スケジューリング方式が設定されている可能性があり,この場合もマルチスレッドのサポートが必要である. 上にJobが提出したSpark Jobそのもの.この時点から見ると、今回のジョブとSpark coreのジョブには何の違いもありません.
次にjobが実行するプロセスを見てみましょう.
1.まずSparkConfをインスタンス化し、運転期間パラメータを設定します.
StreamingContext.scalaの183行目
JobScheduler.start()の内部にReceiverTackerを構築し、JobGeneratorとReceiverTackerのstartメソッドを呼び出します.
この中には2つのJobの概念が含まれています.
次にjobが実行するプロセスを見てみましょう.
1.まずSparkConfをインスタンス化し、運転期間パラメータを設定します.
val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")
2.StreamingContextをインスタンス化し、batchDuration間隔を設定してJob生成の周波数を制御し、Spark Streaming実行のエントリを作成します.val ssc = new StreamingContext(conf,Seconds(20))
3.StreamingContextをインスタンス化する過程で、JobSchedulerとJobGeneratorをインスタンス化する.StreamingContext.scalaの183行目
private[streaming] val scheduler = new JobScheduler(this)
JobScheduler.scalaの50行目private val jobGenerator = new JobGenerator(this)
4.StreamingContextはstartメソッドを呼び出します.def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
5.StreamingContext.start()内でJobSchedulerのStartメソッドを起動します.scheduler.start()
JobScheduler.start()の内部でEventLoopをインスタンス化し、EventLoop.start()を実行してメッセージループを実行します.JobScheduler.start()の内部にReceiverTackerを構築し、JobGeneratorとReceiverTackerのstartメソッドを呼び出します.
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream
6.JobGenerator batchDuration Job
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
7.ReceiverTracker まずSpark ClusterでReceiverを ( はExecutorでReceiverSupervisorを に )し、Receiverでデータを するとReceiverSupervisorでExecutorに してDriverのReceiverTrackerにデータのMetadata を し、ReceiverTracker では、ReceivedBlockTrackerによって したメタデータ が されます./** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
.Spark Streamingフォールトトレランス :
DStreamとRDDの は が つにつれてRDDが えず し,DStreamに する は でRDDを することであることを っている.したがって、ある では、Spark StreamingのDStreamベースのフォールトトレランスメカニズムは、 には、 されるRDDごとに されるフォールトトレランスメカニズムであり、これもSpark Streamingの れた である.
Spark Streamingのフォールトトレランスは、 の2つの を します. Driver の にはCheckpointを し、Driver の を し、 にはCheckpointを み してDriver を することができる. には、ジョブの に するたびのリカバリは、Receiverの したリカバリを し、RDD に したリカバリも しなければならない.Receiverはwalログを く を することができます.RDDのフォールトトレランスはspark coreが まれつき したもので,RDDの に づいて,そのフォールトトレランス は に2つである: 01.checkpointに づく;
ステージ では、 でshuffle が し、lineageチェーンが で である 、checkpointが になります.
02.lineage( )ベースのフォールトトレランス:
に、sparkは なデータセットに して ポイントを うコストが いため、 を します.RDDの を すると、 ステージ は い であり、この 、lineageフォールトトレランスに づいており、 で である.
まとめ:stage はlineage、stage はcheckpoint.
コメント:
1、DTビッグデータドリーム ウィーチャット DT_Spark 2、IMF 8 ビッグデータ YY チャンネル :689175803、 : http://www.weibo.com/ilovepains