第3課:SparkStreaming徹底理解三板斧の三:SparkStreamingの運行メカニズムとアーキテクチャの進級のJobと許容誤差を解読する


今期の内容:
  • Spark Streaming Jobアーキテクチャと動作メカニズムの復号化
  • Spark Streamingフォールトトレランスアーキテクチャおよび動作メカニズムの復号化
  • SparkStreamingのJobの全体的なアーキテクチャと動作メカニズムを理解することはSparkStreamingに精通する上で極めて重要である.一般的なSparkアプリケーションではRDDのaction操作がJobの実行をトリガすることを知っている.では、SparkStreamingにとってJobはどのように動いているのでしょうか.SparkStreamingプログラムを作成するとき、BatchDurationを設定しました.JobはBatchDuration時間ごとに自動的にトリガーされます.この機能はSparkStreamingフレームワークがタイマーを提供しているに違いありません.時間が来ると、作成したプログラムをSparkに提出し、Spark jobで実行します.
    この中には2つのJobの概念が含まれています.
  • BatchIntervalごとに具体的なJobが生成されますが、ここのJobはSpark Coreで指すJobではありません.DStreamGraphに基づいて生成されたRDDのDAGにすぎません.Javaの観点から言えば、Runnableインタフェースのインスタンスに相当します.この場合、Jobを実行するにはJobSchedulerに提出する必要があります.JobSchedulerではスレッドプールを介して別のスレッドを見つけてJobをクラスタ実行にコミットします(実際にはスレッド内のRDDベースのActionが実際のジョブの実行をトリガします)、なぜスレッドプールを使用しますか?a)、ジョブは絶えず生成されるので、効率を高めるためにスレッドプールが必要です.これはExecutorでスレッドプールを介してTaskを実行するのと同じである.b)は,JobのFAIR公平スケジューリング方式が設定されている可能性があり,この場合もマルチスレッドのサポートが必要である.
  • 上にJobが提出したSpark Jobそのもの.この時点から見ると、今回のジョブとSpark coreのジョブには何の違いもありません.

  • 次にjobが実行するプロセスを見てみましょう.
    1.まずSparkConfをインスタンス化し、運転期間パラメータを設定します.
    val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")
    2.StreamingContextをインスタンス化し、batchDuration間隔を設定してJob生成の周波数を制御し、Spark Streaming実行のエントリを作成します.
    val ssc = new StreamingContext(conf,Seconds(20))
    3.StreamingContextをインスタンス化する過程で、JobSchedulerとJobGeneratorをインスタンス化する.
    StreamingContext.scalaの183行目
    private[streaming] val scheduler = new JobScheduler(this)
    JobScheduler.scalaの50行目
    private val jobGenerator = new JobGenerator(this)
    4.StreamingContextはstartメソッドを呼び出します.
    def start(): Unit = synchronized {
      state match {
        case INITIALIZED =>
          startSite.set(DStream.getCreationSite())
          StreamingContext.ACTIVATION_LOCK.synchronized {
            StreamingContext.assertNoOtherContextIsActive()
            try {
              validate()
    
              // Start the streaming scheduler in a new thread, so that thread local properties
              // like call sites and job groups can be reset without affecting those of the
              // current thread.
              ThreadUtils.runInNewThread("streaming-start") {
                sparkContext.setCallSite(startSite.get)
                sparkContext.clearJobGroup()
                sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
                scheduler.start()
              }
              state = StreamingContextState.ACTIVE
            } catch {
              case NonFatal(e) =>
                logError("Error starting the context, marking it as stopped", e)
                scheduler.stop(false)
                state = StreamingContextState.STOPPED
                throw e
            }
            StreamingContext.setActiveContext(this)
          }
          shutdownHookRef = ShutdownHookManager.addShutdownHook(
            StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
          // Registering Streaming Metrics at the start of the StreamingContext
          assert(env.metricsSystem != null)
          env.metricsSystem.registerSource(streamingSource)
          uiTab.foreach(_.attach())
          logInfo("StreamingContext started")
        case ACTIVE =>
          logWarning("StreamingContext has already been started")
        case STOPPED =>
          throw new IllegalStateException("StreamingContext has already been stopped")
      }
    }
    5.StreamingContext.start()内でJobSchedulerのStartメソッドを起動します.
    scheduler.start()
    JobScheduler.start()の内部でEventLoopをインスタンス化し、EventLoop.start()を実行してメッセージループを実行します.
    JobScheduler.start()の内部にReceiverTackerを構築し、JobGeneratorとReceiverTackerのstartメソッドを呼び出します.
    def start(): Unit = synchronized {
      if (eventLoop != null) return // scheduler has already been started
    
      logDebug("Starting JobScheduler")
      eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
        override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    
        override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
      }
      eventLoop.start()
    
      // attach rate controllers of input streams to receive batch completion updates
      for {
        inputDStream  
      

    6.JobGenerator batchDuration Job

    /** Generate jobs and perform checkpoint for the given `time`.  */
    private def generateJobs(time: Time) {
      // Set the SparkEnv in this thread, so that job generation code can access the environment
      // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
      // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
      SparkEnv.set(ssc.env)
      Try {
        jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
        graph.generateJobs(time) // generate jobs using allocated block
      } match {
        case Success(jobs) =>
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) =>
          jobScheduler.reportError("Error generating jobs for time " + time, e)
      }
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }
    7.ReceiverTracker まずSpark ClusterでReceiverを ( はExecutorでReceiverSupervisorを に )し、Receiverでデータを するとReceiverSupervisorでExecutorに してDriverのReceiverTrackerにデータのMetadata を し、ReceiverTracker では、ReceivedBlockTrackerによって したメタデータ が されます.
    /** Start the endpoint and receiver execution thread. */
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }
    
      if (!receiverInputStreams.isEmpty) {
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
        if (!skipReceiverLaunch) launchReceivers()
        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }
    .Spark Streamingフォールトトレランス :
    DStreamとRDDの は が つにつれてRDDが えず し,DStreamに する は でRDDを することであることを っている.したがって、ある では、Spark StreamingのDStreamベースのフォールトトレランスメカニズムは、 には、 されるRDDごとに されるフォールトトレランスメカニズムであり、これもSpark Streamingの れた である.
    Spark Streamingのフォールトトレランスは、 の2つの を します.
  • Driver の にはCheckpointを し、Driver の を し、 にはCheckpointを み してDriver を することができる.
  • には、ジョブの に するたびのリカバリは、Receiverの したリカバリを し、RDD に したリカバリも しなければならない.Receiverはwalログを く を することができます.RDDのフォールトトレランスはspark coreが まれつき したもので,RDDの に づいて,そのフォールトトレランス は に2つである:
  • 01.checkpointに づく;
         ステージ では、 でshuffle が し、lineageチェーンが で である 、checkpointが になります.
    02.lineage( )ベースのフォールトトレランス:
    に、sparkは なデータセットに して ポイントを うコストが いため、 を します.RDDの を すると、 ステージ は い であり、この 、lineageフォールトトレランスに づいており、 で である.
    まとめ:stage はlineage、stage はcheckpoint.
    コメント:
    1、DTビッグデータドリーム ウィーチャット DT_Spark 2、IMF 8 ビッグデータ YY チャンネル :689175803、 : http://www.weibo.com/ilovepains