SparkStreaming-データ受信BlockGeneratorソース分析

10311 ワード

データ受信ソース分析


 前回のブログでは、Receiverのデータ受信は主にBlockGeneratorによって受信され、保存されていると分析しています.次に、ソースコードについて、前のプロセスと照らし合わせて分析します. まず、BlockGeneratorを作成するときに初期化されるいくつかの重要なコンポーネントです.以下に示します.
  // blockInterval , 200ms, block 
  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  //  200ms, updateCurrentBuffer
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  // blocksForPushing , 10
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  // blocksForPushing 
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  // blockPushingThread , , keepPushingBlocks() 
  //  , blocksForPushing block
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  //  currentBuffer, 
  @volatile private var currentBuffer = new ArrayBuffer[Any]

 上にはいくつかの比較的重要なパラメータが以下のように解釈されている: blockInterval: 200ms, block ;  blockIntervalTimer: , currentBuffer Block ;   blockQueueSize: block blocksForPushing: block blockPushingThread: , Block , BlockManager 。currentBuffer: , 。
BlockGeneratorのstart()メソッド
//  BlockGenerator, 
  //  blockIntervalTimer, currentBuffer , block
  //  blockPushingThread, blocksForPushing block, pushArrayBuffer() 
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }

 上記のコードから明らかなように、startメソッドではタイマとスレッドを起動するだけで、タイマを起動すると200 msおきにcurrentBufferのデータを取り出し、Blockを生成します.blockPushingThreadスレッドは、キュー内のデータをBlockManangerにプッシュします.タイマー起動時に呼び出される関数updateCurrentBufferを見てみましょう.
blockIntervalTimerのupdateCurrentBuffer
/** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          //  currentBuffer newBlockBuffer, currentBuffer
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          //  blockId, 
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          //  
          listener.onGenerateBlock(blockId)
          //  block
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      //  block blocksForPushing 
      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }

 上記のコードからsynchronizedキーワードを追加し、書き込みの同時問題を防止していることがわかります.まずcurrentBufferのデータをnewBlockBufferにコピーし、次にcurrentBufferを再作成します.これは、以前のデータと同様に空になります.次に、時間に基づいて一意のblockIdを生成し、Blockを作成し、作成したblockをblocksForPushingキューに追加します.次にblockPushingThreadスレッドの実行ロジックを見てみましょう.keepPushingBlocksが呼び出されます.
blockPushingThreadのkeepPushingBlocks
private def keepPushingBlocks() {
    logInfo("Started block pushing thread")
	//  BlockGenerator 
    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      //  block , blocksForPushing block
      while (areBlocksBeingGenerated) {
        //  blocksForPushing ,poll block
        //  , 10ms 
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          //  block, pushBlock
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }


 上記のコードから分かるように、BlockGeneratorが停止しない限り、Blockが発生し続け、ここではblocksForPushingキューからBlockを取り出してプッシュし続けます.ここのblocksForPushingはブロックキューで、デフォルトのブロック時間は10 msです. blocksForPushingキューから取り出したblockはプッシュされ、プッシュはBlockGeneratorListenerのonPushBlockによってプッシュされ、onPushBlock()メソッドではpushArrayBufferが呼び出され、プッシュされたBlockが呼び出され、onPushBlock()メソッドではpushAndReportBlock()メソッドが最終的に呼び出され、このメソッドを分析する.
ReceiverSupervisorImplのpushAndReportBlockプッシュblock
def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    //  BlockId
    val blockId = blockIdOption.getOrElse(nextBlockId)
    //  
    val time = System.currentTimeMillis
    //  receivedBlockHandler, storeBlock , block BlockManager 
    //  
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    //  block 
    val numRecords = blockStoreResult.numRecords
    //  ReceivedBlockInfo , streamId   block store 
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    //  ReceiverTrackerEndPoint, ReceiverTracker AddBlock 
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }

 この方法は主に2つの機能を含み、1つはreceivedBlockHandlerのstoreBlockを呼び出してBlockをBlockManagerに保存する(またはプリライトログに書き込む)ことである.もう1つは,保存したBlock情報をReceivedBlockInfoとしてカプセル化し,ReceiverTrackerに送信することである.まず、最初の分析を行います.ブロックを格納するコンポーネントreceivedBlockHandlerは、プリライト・ログ機能をオンにするかどうかに応じて、次のように異なるreceivedBlockHandlerを作成します.
private val receivedBlockHandler: ReceivedBlockHandler = {
    //  , false(  spark.streaming.receiver.writeAheadLog.enable)
    //  true, ReceivedBlockHandler WriteAheadLogBasedBlockHandler,
    //  , BlockManagerBasedBlockHandler
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }

  它会判断是否开启了预写日志,通过读取 spark.streaming.receiver.writeAheadLog.enable 这个参数是否被设置为true.オンの場合はWriteAheadLogBasedBlockHandlerを作成し、そうでない場合はBlockManagerBasedBlockHandlerを作成します.次に、WriteAheadLogBasedBlockHandlerについて、storeBlockメソッドを分析します.
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
    var numRecords = None: Option[Long]
    //  Block 
    val serializedBlock = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
        numRecords = countIterator.count
        serializedBlock
      case ByteBufferBlock(byteBuffer) =>
        byteBuffer
      case _ =>
        throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
    }

    //  BlockManager , ,  _SER   _2 
    //  , executor BlockManager , 
    val storeInBlockManagerFuture = Future {
      val putResult =
        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
      if (!putResult.map { _._1 }.contains(blockId)) {
        throw new SparkException(
          s"Could not store $blockId to block manager with storage level $storageLevel")
      }
    }

    //  Block , Future 
    val storeInWriteAheadLogFuture = Future {
      writeAheadLog.write(serializedBlock, clock.getTimeMillis())
    }

    //  , , 
    val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
    val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
    WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
  }

 上記のコードから分かるように、主に2つのステップに分けられます.まず、Blockのデータをシーケンス化し、BlockManagerに格納します.ここで、 _SER _2 は、他のExecutorのBlockManagerにシーケンス化してコピーします. Worker executor BlockManager ;次にBlockのデータをプリライトログ(一般的にHDFSファイル)に書き込む. 上記から、書き込みログのフォールトトレランス対策は主に2つあります. Worker executor ( _SER _2); 。 , ( )。 次に、2つ目を分析し、ReceivedBlockInfo情報をReceiverTrackerに送信します.これは簡単に言えば、ReceiverTrackerはAddBlockのメッセージを受信した後、プリライトログを開くかどうかを判断し、プリライトログを開くとBlockの情報をプリライトログに書き込む必要があり、そうでなければキャッシュに保存する. 上記のデータ受信と記憶機能は、BlockGeneratorコンポーネントに基づいて受信したデータをキャッシュ、カプセル化、プッシュし、最終的にBlockManager(およびプリライトログ)にプッシュします.このうち、主に1つのタイマーblockIntervalTimerに依存し、200 msおきにcurrentBufferからすべてのデータを取り出し、1つのblockにカプセル化し、blocksForPushingキューに入れる.次にblockPushingThreadは、blocksForPushingキューからblockを取り出してプッシュし続け、これはブロックキューのブロック時間のデフォルトが10 msである.そして、BlockGeneratorListenerのonPushBlock()を介して(最終的にはpushArrayBufferが呼び出される)、BlockManagerにデータをプッシュし(書き込みログが開いていれば書き込みログにも書き込まれる)、AddBlockメッセージをReceiverTrackerに送信してBlockの登録を行う.