ソースコードはexecutorがblockmanagerにデータを書く方法と、blockmanagerからデータを読む方法を追跡します.
16060 ワード
詳細
前にJobがどのようにsubmitと最後のrunを見て、blockmanagerがどのように働いているのかを見て、次にsparkがどのようにblockManagerからデータを読み書きしているのかを見ました.
まず各計算は対応する機器のexecutorのtask上で実行されているが,計算が完了した後もexecutor側から書き込みが開始されており,先の文書の解析によれば,最後にTaskがexecutorのTaskRunnerで実行されていることが分かるが,ここではデータ操作側で計算が完了した後にresultSizeがAkkaで伝送可能なsizeより大きいとblockに格納され,その後、Driver側のtaskschedulerによってexecutor側のblockmanagerから対応するblockの情報を取得し、executorのtaskrunnerではこのコードによってデータを格納します.
データが小さい場合はそのままDriverに戻ってしまい、データが大きい場合は
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
対応するblockの中に入れて、Driverが取りに来るのを待っています.ここではblockManagerのputBytesメソッド(このenvはslave側なのでblockmanagerもslave側なので)を直接呼び出し、putbytesを見てみましょう.
直接doPutメソッドを呼び出しましたが、このメソッドは長いので、具体的にはゆっくり見て、いくつかのポイントを選んで書いてください.
コードには様々なstoreを呼び出したputBytesメソッド(またはputIterator,putArray)が表示されます.
では、memorystoreを見てみましょう.putBytesの方法を見てみましょう.
MemoryOnlyを選択するとtryToPutメソッドを実行してデータを格納し、このメソッドを見てみましょう.
これまで、メモリが十分であればentriesにこのdataを入れていましたが、次にblockmanagerからデータを取得するには、例えばDriver側がexecutor側からデータを取り戻す必要があります.taskschedulerimplのstatusUpdateはDriver側からデータを取得する方法で、次のように呼び出されています.
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
この方法は実際にデータを取りに行きますので、見てみましょう.
見ました.IndirectTaskResultに戻ったら、blockIDに基づいてblockManagerに取りに行きます.
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
ここのblockManagerはDriver側のblockmanagerであるべきで、getRemoteBytesメソッドを見てみましょう.
直接doGetRemoteを呼び出します.
まず、次のようにします.
val locations = Random.shuffle(master.getLocations(blockId))
master側からblockIdのすべてのlocationを取得し、1つ1つのlocation側から取り戻し、取り戻す方法は:
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
このblockTransferServiceは、BlockManagerを作成するときに一緒に作成し、BlockManagerと一緒に初期化したものです.
ここではNIO方式でexecutor側からデータを取り戻しました.全体の結果を保存して結果を取る大まかな経路はこのようにして、MemoryOnlyを持って1つの列を作って、その他の方式はこの流れによって1回追跡してきっと分かります.
前にJobがどのようにsubmitと最後のrunを見て、blockmanagerがどのように働いているのかを見て、次にsparkがどのようにblockManagerからデータを読み書きしているのかを見ました.
まず各計算は対応する機器のexecutorのtask上で実行されているが,計算が完了した後もexecutor側から書き込みが開始されており,先の文書の解析によれば,最後にTaskがexecutorのTaskRunnerで実行されていることが分かるが,ここではデータ操作側で計算が完了した後にresultSizeがAkkaで伝送可能なsizeより大きいとblockに格納され,その後、Driver側のtaskschedulerによってexecutor側のblockmanagerから対応するblockの情報を取得し、executorのtaskrunnerではこのコードによってデータを格納します.
val serializedResult: ByteBuffer = {
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
データが小さい場合はそのままDriverに戻ってしまい、データが大きい場合は
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
対応するblockの中に入れて、Driverが取りに来るのを待っています.ここではblockManagerのputBytesメソッド(このenvはslave側なのでblockmanagerもslave側なので)を直接呼び出し、putbytesを見てみましょう.
def putBytes(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
直接doPutメソッドを呼び出しましたが、このメソッドは長いので、具体的にはゆっくり見て、いくつかのポイントを選んで書いてください.
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {
...
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
// BlockInfo ,
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
return updatedBlocks
}
// TODO: So the block info exists - but previous attempt to load it (?) failed.
// What do we do now ? Retry on it ?
oldBlockOpt.get
} else {
tinfo
}
}
...
putBlockInfo.synchronized {
...
// , memorystore diskstore externalBlockStore
val (returnValues, blockStore: BlockStore) = {
if (putLevel.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// Use external block store
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
} else {
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
...
// , memory
val result = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
...
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
updatedBlocks += ((blockId, putBlockStatus))
}
...
}
...
}
コードには様々なstoreを呼び出したputBytesメソッド(またはputIterator,putArray)が表示されます.
では、memorystoreを見てみましょう.putBytesの方法を見てみましょう.
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
// memoryonly, false, StorageLevel :
//val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
if (level.deserialized) {
val values = blockManager.dataDeserialize(blockId, bytes)
putIterator(blockId, values, level, returnValues = true)
} else {
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
//
tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
}
}
MemoryOnlyを選択するとtryToPutメソッドを実行してデータを格納し、このメソッドを見てみましょう.
private def tryToPut(
blockId: BlockId,
value: Any,
size: Long,
deserialized: Boolean,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
tryToPut(blockId, () => value, size, deserialized, droppedBlocks)
}
private def tryToPut(
blockId: BlockId,
value: () => Any,
size: Long,
deserialized: Boolean,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
/* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
* to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
* been released, it must be ensured that those to-be-dropped blocks are not double counted
* for freeing up more space for another block that needs to be put. Only then the actually
* dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
memoryManager.synchronized {
// Note: if we have previously unrolled this block successfully, then pending unroll
// memory should be non-zero. This is the amount that we already reserved during the
// unrolling process. In this case, we can just reuse this space to cache our block.
// The synchronization on `memoryManager` here guarantees that the release and acquire
// happen atomically. This relies on the assumption that all memory acquisitions are
// synchronized on the same lock.
releasePendingUnrollMemoryForThisTask()
//
val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
if (enoughMemory) {
// We acquired enough memory for the block, so go ahead and put it
// , , entries
val entry = new MemoryEntry(value(), size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
}
val valuesOrBytes = if (deserialized) "values" else "bytes"
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
// , , MemoryOnly deserialized false
lazy val data = if (deserialized) {
Left(value().asInstanceOf[Array[Any]])
} else {
deserialized
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
enoughMemory
}
}
これまで、メモリが十分であればentriesにこのdataを入れていましたが、次にblockmanagerからデータを取得するには、例えばDriver側がexecutor側からデータを取り戻す必要があります.taskschedulerimplのstatusUpdateはDriver側からデータを取得する方法で、次のように呼び出されています.
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
この方法は実際にデータを取りに行きますので、見てみましょう.
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] =>
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
// deserialize "value" without holding any lock so that it won't block other threads.
// We should call it here, so that when it's called again in
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
directResult.value()
(directResult, serializedData.limit())
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
// dropped by executor if size is larger than maxResultSize
sparkEnv.blockManager.master.removeBlock(blockId)
return
}
logDebug("Fetching indirect task result for TID %s".format(tid))
scheduler.handleTaskGettingResult(taskSetManager, tid)
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
if (!serializedTaskResult.isDefined) {
/* We won't be able to get the task result if the machine that ran the task failed
* between when the task ended and when we tried to fetch the result, or if the
* block manager had to flush the result. */
scheduler.handleFailedTask(
taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
return
}
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
sparkEnv.blockManager.master.removeBlock(blockId)
(deserializedResult, size)
}
result.metrics.setResultSize(size)
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
// Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
case NonFatal(ex) =>
logError("Exception while getting task result", ex)
taskSetManager.abort("Exception while getting task result: %s".format(ex))
}
}
})
}
見ました.IndirectTaskResultに戻ったら、blockIDに基づいてblockManagerに取りに行きます.
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
ここのblockManagerはDriver側のblockmanagerであるべきで、getRemoteBytesメソッドを見てみましょう.
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting remote block $blockId as bytes")
doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
直接doGetRemoteを呼び出します.
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
var numFetchFailures = 0
for (loc
numFetchFailures += 1
if (numFetchFailures == locations.size) {
// An exception is thrown while fetching this block from all locations
throw new BlockFetchException(s"Failed to fetch block from" +
s" ${locations.size} locations. Most recent failure cause:", e)
} else {
// This location failed, so we retry fetch from a different one by returning null here
logWarning(s"Failed to fetch remote block $blockId " +
s"from $loc (failed attempt $numFetchFailures)", e)
null
}
}
if (data != null) {
if (asBlockResult) {
return Some(new BlockResult(
dataDeserialize(blockId, data),
DataReadMethod.Network,
data.limit()))
} else {
return Some(data)
}
}
logDebug(s"The value of block $blockId is null")
}
logDebug(s"Block $blockId not found")
None
}
まず、次のようにします.
val locations = Random.shuffle(master.getLocations(blockId))
master側からblockIdのすべてのlocationを取得し、1つ1つのlocation側から取り戻し、取り戻す方法は:
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
このblockTransferServiceは、BlockManagerを作成するときに一緒に作成し、BlockManagerと一緒に初期化したものです.
ここではNIO方式でexecutor側からデータを取り戻しました.全体の結果を保存して結果を取る大まかな経路はこのようにして、MemoryOnlyを持って1つの列を作って、その他の方式はこの流れによって1回追跡してきっと分かります.