SparkStreaming-データ受信BlockGeneratorソース分析
データ受信ソース分析
前回のブログでは、Receiverのデータ受信は主にBlockGeneratorによって受信され、保存されていると分析しています.次に、ソースコードについて、前のプロセスと照らし合わせて分析します. まず、BlockGeneratorを作成するときに初期化されるいくつかの重要なコンポーネントです.以下に示します.
// blockInterval , 200ms, block
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
// 200ms, updateCurrentBuffer
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
// blocksForPushing , 10
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
// blocksForPushing
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
// blockPushingThread , , keepPushingBlocks()
// , blocksForPushing block
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
// currentBuffer,
@volatile private var currentBuffer = new ArrayBuffer[Any]
上にはいくつかの比較的重要なパラメータが以下のように解釈されている:
blockInterval: 200ms, block
; blockIntervalTimer: , currentBuffer Block
; blockQueueSize: block
blocksForPushing: block
blockPushingThread: , Block , BlockManager 。
currentBuffer: , 。
BlockGeneratorのstart()メソッド
// BlockGenerator,
// blockIntervalTimer, currentBuffer , block
// blockPushingThread, blocksForPushing block, pushArrayBuffer()
def start(): Unit = synchronized {
if (state == Initialized) {
state = Active
blockIntervalTimer.start()
blockPushingThread.start()
logInfo("Started BlockGenerator")
} else {
throw new SparkException(
s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
}
}
上記のコードから明らかなように、startメソッドではタイマとスレッドを起動するだけで、タイマを起動すると200 msおきにcurrentBufferのデータを取り出し、Blockを生成します.blockPushingThreadスレッドは、キュー内のデータをBlockManangerにプッシュします.タイマー起動時に呼び出される関数updateCurrentBufferを見てみましょう.
blockIntervalTimerのupdateCurrentBuffer
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
try {
var newBlock: Block = null
synchronized {
if (currentBuffer.nonEmpty) {
// currentBuffer newBlockBuffer, currentBuffer
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
// blockId,
val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
//
listener.onGenerateBlock(blockId)
// block
newBlock = new Block(blockId, newBlockBuffer)
}
}
// block blocksForPushing
if (newBlock != null) {
blocksForPushing.put(newBlock) // put is blocking when queue is full
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
case e: Exception =>
reportError("Error in block updating thread", e)
}
}
上記のコードからsynchronizedキーワードを追加し、書き込みの同時問題を防止していることがわかります.まずcurrentBufferのデータをnewBlockBufferにコピーし、次にcurrentBufferを再作成します.これは、以前のデータと同様に空になります.次に、時間に基づいて一意のblockIdを生成し、Blockを作成し、作成したblockをblocksForPushingキューに追加します.次にblockPushingThreadスレッドの実行ロジックを見てみましょう.keepPushingBlocksが呼び出されます.
blockPushingThreadのkeepPushingBlocks
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
// BlockGenerator
def areBlocksBeingGenerated: Boolean = synchronized {
state != StoppedGeneratingBlocks
}
try {
// block , blocksForPushing block
while (areBlocksBeingGenerated) {
// blocksForPushing ,poll block
// , 10ms
Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
// block, pushBlock
case Some(block) => pushBlock(block)
case None =>
}
}
// At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
while (!blocksForPushing.isEmpty) {
val block = blocksForPushing.take()
logDebug(s"Pushing block $block")
pushBlock(block)
logInfo("Blocks left to push " + blocksForPushing.size())
}
logInfo("Stopped block pushing thread")
} catch {
case ie: InterruptedException =>
logInfo("Block pushing thread was interrupted")
case e: Exception =>
reportError("Error in block pushing thread", e)
}
}
上記のコードから分かるように、BlockGeneratorが停止しない限り、Blockが発生し続け、ここではblocksForPushingキューからBlockを取り出してプッシュし続けます.ここのblocksForPushingはブロックキューで、デフォルトのブロック時間は10 msです. blocksForPushingキューから取り出したblockはプッシュされ、プッシュはBlockGeneratorListenerのonPushBlockによってプッシュされ、onPushBlock()メソッドではpushArrayBufferが呼び出され、プッシュされたBlockが呼び出され、onPushBlock()メソッドではpushAndReportBlock()メソッドが最終的に呼び出され、このメソッドを分析する.
ReceiverSupervisorImplのpushAndReportBlockプッシュblock
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
// BlockId
val blockId = blockIdOption.getOrElse(nextBlockId)
//
val time = System.currentTimeMillis
// receivedBlockHandler, storeBlock , block BlockManager
//
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
// block
val numRecords = blockStoreResult.numRecords
// ReceivedBlockInfo , streamId block store
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
// ReceiverTrackerEndPoint, ReceiverTracker AddBlock
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
この方法は主に2つの機能を含み、1つはreceivedBlockHandlerのstoreBlockを呼び出してBlockをBlockManagerに保存する(またはプリライトログに書き込む)ことである.もう1つは,保存したBlock情報をReceivedBlockInfoとしてカプセル化し,ReceiverTrackerに送信することである.まず、最初の分析を行います.ブロックを格納するコンポーネントreceivedBlockHandlerは、プリライト・ログ機能をオンにするかどうかに応じて、次のように異なるreceivedBlockHandlerを作成します.
private val receivedBlockHandler: ReceivedBlockHandler = {
// , false( spark.streaming.receiver.writeAheadLog.enable)
// true, ReceivedBlockHandler WriteAheadLogBasedBlockHandler,
// , BlockManagerBasedBlockHandler
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
它会判断是否开启了预写日志,通过读取
spark.streaming.receiver.writeAheadLog.enable
这个参数是否被设置为true.オンの場合はWriteAheadLogBasedBlockHandlerを作成し、そうでない場合はBlockManagerBasedBlockHandlerを作成します.次に、WriteAheadLogBasedBlockHandlerについて、storeBlockメソッドを分析します.def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
var numRecords = None: Option[Long]
// Block
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
numRecords = countIterator.count
serializedBlock
case ByteBufferBlock(byteBuffer) =>
byteBuffer
case _ =>
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
}
// BlockManager , , _SER _2
// , executor BlockManager ,
val storeInBlockManagerFuture = Future {
val putResult =
blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
}
// Block , Future
val storeInWriteAheadLogFuture = Future {
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
// , ,
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
}
上記のコードから分かるように、主に2つのステップに分けられます.まず、Blockのデータをシーケンス化し、BlockManagerに格納します.ここで、
_SER _2
は、他のExecutorのBlockManagerにシーケンス化してコピーします. Worker executor BlockManager
;次にBlockのデータをプリライトログ(一般的にHDFSファイル)に書き込む. 上記から、書き込みログのフォールトトレランス対策は主に2つあります. Worker executor ( _SER _2); 。 , ( )。
次に、2つ目を分析し、ReceivedBlockInfo情報をReceiverTrackerに送信します.これは簡単に言えば、ReceiverTrackerはAddBlockのメッセージを受信した後、プリライトログを開くかどうかを判断し、プリライトログを開くとBlockの情報をプリライトログに書き込む必要があり、そうでなければキャッシュに保存する. 上記のデータ受信と記憶機能は、BlockGeneratorコンポーネントに基づいて受信したデータをキャッシュ、カプセル化、プッシュし、最終的にBlockManager(およびプリライトログ)にプッシュします.このうち、主に1つのタイマーblockIntervalTimerに依存し、200 msおきにcurrentBufferからすべてのデータを取り出し、1つのblockにカプセル化し、blocksForPushingキューに入れる.次にblockPushingThreadは、blocksForPushingキューからblockを取り出してプッシュし続け、これはブロックキューのブロック時間のデフォルトが10 msである.そして、BlockGeneratorListenerのonPushBlock()を介して(最終的にはpushArrayBufferが呼び出される)、BlockManagerにデータをプッシュし(書き込みログが開いていれば書き込みログにも書き込まれる)、AddBlockメッセージをReceiverTrackerに送信してBlockの登録を行う.