sparkバージョンカスタム20160502判例によってSparkStreamingを徹底的に理解する3

10662 ワード

本期の内容:
  • Spark Streaming Jobアーキテクチャを解読し、実行機構
  • Spark Streamingフォールトトレランスアーキテクチャと運転機構
  • を解読する。
    リアルタイムで処理できないデータはすべて無効なデータです。ストリーム処理時代には、SparkStreamingは強力な魅力を持っており、発展の見通しが広く、Sparkの生態系に加えて、StremingはSQL、MLlibなどの他の強力な枠組みを呼び出すことができます。
    Spark Streamingは、Spark Core上のフロー処理フレームというより、Spark Core上の最も複雑なアプリケーションです。Spark streamingという複雑なアプリケーションを身につけることができれば、他の複雑なアプリケーションは言うまでもありません。ここでSpark Streamingをバージョンとしてカスタマイズする切り込みポイントを選ぶのも大きな流れです。
        この授業はjobとフォールトトレランスの全体構造からSpark Streamingの運行メカニズムを考察します。
    前にすでにあった最も簡単な例を使います。
    // Socket       
    // YY  :  20:00      68917580
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    words.print()
    ssc.start()
    追跡ソースは発見できます。
    Streaming Contectを初期化する時、以下のいくつかのオブジェクトを作成しました。
    // StreamingContext.scala line 183
    private[streaming] val scheduler = new JobScheduler(this)
    JobSchedulerは初期化すると、jobGeneratorを初期化し、receiverTrackerを含む。
    // JobScheduler.scala line 50
    private val jobGenerator = new JobGenerator(this) // line 50
    val clock = jobGenerator.clock
    val listenerBus = new StreamingListenerBus()
    
    // These two are created only when scheduler starts.
    // eventLoop not being null means the scheduler has been started and not stopped
    var receiverTracker: ReceiverTracker = null // 56
    DStreamを作成する部分を見てください。
    // StreamingContext.scala line 327
    def socketTextStream(
        hostname: String,
        port: Int,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
      socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }
    
    // StreamingContext.scala line 345
    def socketStream[T: ClassTag](
        hostname: String,
        port: Int,
        converter: (InputStream) => Iterator[T],
        storageLevel: StorageLevel
      ): ReceiverInputDStream[T] = {
      new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
    }
    // SocketInputDStream.scala line 33
    private[streaming]
    class SocketInputDStream[T: ClassTag](
        ssc_ : StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](ssc_) {
    
      //        
      def getReceiver(): Receiver[T] = {
        new SocketReceiver(host, port, bytesToObjects, storageLevel)
      }
    }
    またssc.startを見ます
    // StreamingContext.scala line 594
    def start(): Unit = synchronized {
      state match {
        case INITIALIZED =>
          startSite.set(DStream.getCreationSite())
          StreamingContext.ACTIVATION_LOCK.synchronized {
            StreamingContext.assertNoOtherContextIsActive()
            try {
              validate()
    
              // Start the streaming scheduler in a new thread, so that thread local properties
              // like call sites and job groups can be reset without affecting those of the
              // current thread.
              ThreadUtils.runInNewThread("streaming-start") {
                sparkContext.setCallSite(startSite.get)
                sparkContext.clearJobGroup()
                sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
                scheduler.start() // line 610
              }
              state = StreamingContextState.ACTIVE
            } catch {
              case NonFatal(e) =>
                logError("Error starting the context, marking it as stopped", e)
                scheduler.stop(false)
                state = StreamingContextState.STOPPED
                throw e
            }
            StreamingContext.setActiveContext(this)
          }
          shutdownHookRef = ShutdownHookManager.addShutdownHook(
            StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
          // Registering Streaming Metrics at the start of the StreamingContext
          assert(env.metricsSystem != null)
          env.metricsSystem.registerSource(streamingSource)
          uiTab.foreach(_.attach())
          logInfo("StreamingContext started")
        case ACTIVE =>
          logWarning("StreamingContext has already been started")
        case STOPPED =>
          throw new IllegalStateException("StreamingContext has already been stopped")
      }
    }
    610行目は、scheduler.startを呼び出し、schedulerは、初期化前に生成されたJobSchedulerである。
    // JobScheduler.scala line 62
    def start(): Unit = synchronized {
      if (eventLoop != null) return // scheduler has already been started
    
      logDebug("Starting JobScheduler")
      eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
        override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    
        override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
      }
      eventLoop.start()
    
      // attach rate controllers of input streams to receive batch completion updates
      for {
        inputDStream <- ssc.graph.getInputStreams
        rateController <- inputDStream.rateController
      } ssc.addStreamingListener(rateController)
    
      listenerBus.start(ssc.sparkContext)
      receiverTracker = new ReceiverTracker(ssc) // line 80
      inputInfoTracker = new InputInfoTracker(ssc)
      receiverTracker.start()
      jobGenerator.start()
      logInfo("Started JobScheduler")
    }
    80行を見てください。receiverTrackerを初期化します。
    // ReceiverTracker.scala line 101
    private[streaming]
    class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
      private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
      private val receiverInputStreamIds = receiverInputStreams.map { _.id }
      private val receivedBlockTracker = new ReceivedBlockTracker(
        ssc.sparkContext.conf,
        ssc.sparkContext.hadoopConfiguration,
        receiverInputStreamIds,
        ssc.scheduler.clock,
        ssc.isCheckpointPresent,
        Option(ssc.checkpointDir)
      )
    receiver Tracker.startとjobGenerator.starを呼び出します。
    // ReceiverTracker.scala line 148
    /** Start the endpoint and receiver execution thread. */
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }
    
      if (!receiverInputStreams.isEmpty) {
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
        if (!skipReceiverLaunch) launchReceivers() // line 157
        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }
    launchReceivers()
    // ReceiverTracker.scala line 413
    private def launchReceivers(): Unit = {
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver() //     SocketInputDStream.getReceiver(),    SocketReceiver , SocketInputDStream.scala line 34
        rcvr.setReceiverId(nis.id)
        rcvr
      })
    
      runDummySparkJob()
    
      logInfo("Starting " + receivers.length + " receivers")
      endpoint.send(StartAllReceivers(receivers)) // line 423
    }
    StartAllReceiversはどのように消費されているか見てください。
    // ReceiverTracker.scala line 448
    // Local messages
    case StartAllReceivers(receivers) =>
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) //       
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        startReceiver(receiver, executors) //      ,       ,            
      }
    JobScheduler.scala line 83に戻って、jobGenerator.start
    // JobGenerator.scala line 79
    def start(): Unit = synchronized {
      if (eventLoop != null) return // generator has already been started
    
      // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
      // See SPARK-10125
      checkpointWriter
    
      eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
        override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    
        override protected def onError(e: Throwable): Unit = {
          jobScheduler.reportError("Error in job generator", e)
        }
      }
      eventLoop.start()
    
      if (ssc.isCheckpointPresent) {
        restart()
      } else {
        startFirstTime()
      }
    }
    このメッセージ受信とJobジェネレータは起動されました。
    Streaming Contectでstartメソッドを呼び出す内部は、実はJobSchedulerのStart方法を起動し、メッセージサイクルを行い、JobSchedulerのstart内部にJobGeneratorとReceiverTackerを構築し、JobGeneratorとReceiverTackerのstart方法を呼び出します。
    1.JobGenerator起動後は常にbatDurationによってJobを生成します。
    2.Receiver Tracker起動後、まずSpark CusterでReceiverを起動し(実はExectorでReceiver Supervisorを起動します)、Receiverがデータを受け取ったら、Receiver Supervisorを通じてExectorに格納し、データのMetadata情報をDriverのReceiver Trackerに送信します。Receiver Tracker内部では、ReceivedBlockTrackerによって受信したメタデータ情報を管理します。
    各Batch Intervalは具体的なJobを生成します。ここのJobはSpark CoreのJobではありません。DStreamGraphhに基づいて生成されたRDDのDAGだけです。Javaの観点から言えば、Runnableインターフェースの例に相当します。Jobを実行するにはJobSchedulerに提出する必要があります。JobSchedulerにおいて、スレッド池を通じて、個々のスレッドを見つけて、Jobをクラスタに送って実行する(実は、スレッド内でRDDに基づくアクションが本格的なジョブの実行をトリガする)。
    なぜスレッド池を使いますか?
    1.作業は絶えず発生していますので、効率を上げるために、スレッド池が必要です。これはExectorでスレッド池を通じてTaskを実行するのとは異曲同業の妙である。
     2.JobのFAIR公平なスケジュールが設定されている可能性があります。この場合もマルチスレッドのサポートが必要です。
    第二部分:落ち着いて間違った構造の角度の透視Spark Streaming
    DStreamとRDDとの関係は、時間が経つにつれてRDDが発生し、DStreamの操作は、一定の時間でRDDが操作されることを知っています。だから、ある意味、Spark StreamingのDStreamベースのフォールト機構は、実際には毎回形成されるRDDのフォールト機構に分割されており、これもSpark Streamingの優れた点である。
    RDDDは分布弾性データセットとして、その弾性は主に以下の通りである。
    1.自動的にメモリとハードディスクを割り当て、優先的にメモリに基づく
    2.ラインアップによるフォールトトレランス機構
    3.taskは回数を指定して再試行します。
    4.ステージ失敗は自動的にやり直します。
    5.checkpointとpersist多重化
    6.データスケジュールの弾力性:DAG、TASKと資源管理は関係ありません。
    7.データスライスの高弾性
    RDDの特性に基づいて、そのフォールトメカニズムは主に2つです。一つはチェックポイント、もう一つはラインアップによるフォールトです。一般的に、sparkは血統のフォールトを選択します。大規模なデータセットに対して、チェックポイントのコストが高いからです。しかし、場合によっては、LINEリンクのチェーンが複雑すぎて、長いというよりは、チェックポイントが必要です。
    RDDDの依存関係を考慮して、各ステージ内部は狭い依存であり、この場合は、一般的にLINEのフォールトトレランスに基づいて、便利で効率的である。stageの間では、幅の依存性があり、shuffleの操作が発生しています。この場合は、チェックポイントの方がいいです。まとめて言うと、stageの内部でLINEをしたり、stageの間でcheck pointをしたりします。
    続きは何か深い裏話がありますか?次回の講釈を聞きなさい。
    王家林先生の知識を共有してくれてありがとうございます。
    王家林先生の名刺:
    中国Sparkの第一人者
    王家林先生の知識を共有してくれてありがとうございます。
    新浪微博:http://weibo.com/ilovepains
    微信公衆号:DTSpark
    ブログ:http://blog.sina.com.cn/ilovepains
    携帯電話:18610086859
    QQ:1740415547
    メールボックス:[email protected]
    YY教室:毎日20:00現場授業チャンネル68917580