Tracing through the source code: how the executor writes data to the BlockManager, and how data is read back from the BlockManager.

In earlier posts we looked at how a Job is submitted and ultimately run, and at how the BlockManager works. Next, let's trace how Spark writes data to and reads data from the BlockManager.

Each computation runs in a task on the executor of the corresponding machine, and after the computation finishes, the write is also initiated from the executor side. From the earlier analysis we know that a Task ultimately runs inside the executor's TaskRunner. On the write side, once the computation completes, if the resultSize is larger than what Akka can transmit in one frame, the result is stored in a block, and the driver-side TaskScheduler later retrieves the corresponding block from the executor-side BlockManager. In the executor's TaskRunner, the result is stored by this code:

val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
      s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(
      s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
    serializedDirectResult
  }
}

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
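To make the branching concrete, here is a minimal, self-contained sketch of the same three-way decision. ResultRouting, route, and the threshold values are illustrative stand-ins, not Spark's actual names or defaults.

object ResultRouting {
  sealed trait Routed
  case object DroppedTooLarge extends Routed // result > spark.driver.maxResultSize
  case object ViaBlockManager extends Routed // result too big for one Akka frame
  case object DirectToDriver  extends Routed // small enough to send inline

  def route(resultSize: Long, maxResultSize: Long, frameBudget: Long): Routed =
    if (maxResultSize > 0 && resultSize > maxResultSize) DroppedTooLarge
    else if (resultSize >= frameBudget) ViaBlockManager
    else DirectToDriver

  def main(args: Array[String]): Unit = {
    val maxResult = 1L << 30  // pretend spark.driver.maxResultSize = 1g
    val frame     = 10L << 20 // pretend a 10 MB frame budget
    println(route(1L << 10, maxResult, frame))  // DirectToDriver
    println(route(64L << 20, maxResult, frame)) // ViaBlockManager
    println(route(2L << 30, maxResult, frame))  // DroppedTooLarge
  }
}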

If the result is small, it is returned to the driver directly; if it is large, it is put into the corresponding block by

env.blockManager.putBytes(
  blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

and waits there for the driver to come and fetch it. This calls the BlockManager's putBytes method directly (the env here is on the executor side, so the BlockManager is the executor-side one as well). Let's look at putBytes:

  def putBytes(
      blockId: BlockId,
      bytes: ByteBuffer,
      level: StorageLevel,
      tellMaster: Boolean = true,
      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
    require(bytes != null, "Bytes is null")
    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
  }

putBytes directly calls the doPut method. The method is long, so let's take it slowly and pick out the key points:

private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None)
  : Seq[(BlockId, BlockStatus)] = {

  ...

  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  // Register a BlockInfo for this block atomically; if one already exists
  // and is marked ready, the block is already stored on this node and we
  // return early.
  val putBlockInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      // TODO: So the block info exists - but previous attempt to load it (?) failed.
      // What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  ...

  putBlockInfo.synchronized {

    ...

    // Pick the store according to the storage level: memoryStore, diskStore,
    // or externalBlockStore.
    val (returnValues, blockStore: BlockStore) = {
      if (putLevel.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        (true, memoryStore)
      } else if (putLevel.useOffHeap) {
        // Use external block store
        (false, externalBlockStore)
      } else if (putLevel.useDisk) {
        // Don't get back the bytes from put unless we replicate them
        (putLevel.replication > 1, diskStore)
      } else {
        assert(putLevel == StorageLevel.NONE)
        throw new BlockException(
          blockId, s"Attempted to put block $blockId without specifying storage level!")
      }
    }

    ...

    // Hand the data to the chosen store; for memory-backed levels it goes
    // to the MemoryStore first.
    val result = data match {
      case IteratorValues(iterator) =>
        blockStore.putIterator(blockId, iterator, putLevel, returnValues)
      case ArrayValues(array) =>
        blockStore.putArray(blockId, array, putLevel, returnValues)
      case ByteBufferValues(bytes) =>
        bytes.rewind()
        blockStore.putBytes(blockId, bytes, putLevel)
    }

    ...

    val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
    if (putBlockStatus.storageLevel != StorageLevel.NONE) {
      // Now that the block is in either the memory, externalBlockStore, or disk store,
      // let other threads read it, and tell the master about it.
      marked = true
      putBlockInfo.markReady(size)
      if (tellMaster) {
        reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
      }
      updatedBlocks += ((blockId, putBlockStatus))
    }

    ...

  }

  ...

}
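One detail worth pausing on in doPut is the putIfAbsent registration above: it is what makes concurrent puts of the same block safe, because only one writer's BlockInfo wins. A toy version of the idiom, using a plain ConcurrentHashMap rather than Spark's own map type:

import java.util.concurrent.ConcurrentHashMap

object PutIfAbsentIdiom {
  final class Info // stand-in for BlockInfo

  private val infos = new ConcurrentHashMap[String, Info]()

  // Returns the Info that "won" the race: either our fresh one, or the one
  // an earlier (possibly concurrent) writer already registered.
  def register(blockId: String): Info = {
    val fresh = new Info
    val old = infos.putIfAbsent(blockId, fresh)
    if (old != null) old else fresh
  }

  def main(args: Array[String]): Unit = {
    val first = register("taskresult_42")
    val second = register("taskresult_42")
    println(first eq second) // true: the second caller sees the first Info
  }
}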

Back in doPut, you can see it dispatch to the chosen store's putBytes (or putIterator / putArray). So next, let's look at MemoryStore's putBytes method:

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()

  // For serialized levels, deserialized is false; for reference the level is
  // defined as: val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  if (level.deserialized) {
    // Deserialized level: turn the bytes back into objects and store them as values.
    val values = blockManager.dataDeserialize(blockId, bytes)
    putIterator(blockId, values, level, returnValues = true)
  } else {
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    // Serialized level: try to put the raw bytes straight into memory.
    tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
    PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
  }
}
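The deserialized flag driving that branch comes straight from the StorageLevel constants. A quick, runnable way to see which branch a given level takes, assuming Spark 1.x on the classpath:

import org.apache.spark.storage.StorageLevel

object LevelFlags {
  def main(args: Array[String]): Unit = {
    // Flag layout: StorageLevel(useDisk, useMemory, useOffHeap, deserialized[, replication])
    Seq(StorageLevel.MEMORY_ONLY,
        StorageLevel.MEMORY_ONLY_SER,
        StorageLevel.MEMORY_AND_DISK_SER).foreach { level =>
      // deserialized == true  -> putIterator (stored as Java objects)
      // deserialized == false -> tryToPut   (stored as serialized bytes)
      println(s"$level: deserialized=${level.deserialized}, useDisk=${level.useDisk}")
    }
  }
}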


When a serialized in-memory level (such as MEMORY_ONLY_SER) is selected, the tryToPut method is executed to store the data. Let's look at this method:

private def tryToPut(
    blockId: BlockId,
    value: Any,
    size: Long,
    deserialized: Boolean,
    droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
  tryToPut(blockId, () => value, size, deserialized, droppedBlocks)
}

private def tryToPut(
    blockId: BlockId,
    value: () => Any,
    size: Long,
    deserialized: Boolean,
    droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {

  /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
   * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
   * been released, it must be ensured that those to-be-dropped blocks are not double counted
   * for freeing up more space for another block that needs to be put. Only then the actually
   * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

  memoryManager.synchronized {
    // Note: if we have previously unrolled this block successfully, then pending unroll
    // memory should be non-zero. This is the amount that we already reserved during the
    // unrolling process. In this case, we can just reuse this space to cache our block.
    // The synchronization on `memoryManager` here guarantees that the release and acquire
    // happen atomically. This relies on the assumption that all memory acquisitions are
    // synchronized on the same lock.
    releasePendingUnrollMemoryForThisTask()

    // Ask the memory manager for enough storage memory to hold the block.
    val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
    if (enoughMemory) {
      // We acquired enough memory for the block, so go ahead and put it:
      // wrap the data in a MemoryEntry and record it in the entries map.
      val entry = new MemoryEntry(value(), size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      // (For serialized levels such as MEMORY_ONLY_SER, deserialized is false,
      // so the Right branch below is taken.)
      lazy val data = if (deserialized) {
        Left(value().asInstanceOf[Array[Any]])
      } else {
        Right(value().asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
    enoughMemory
  }
}
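A note on the entries map the block lands in: in Spark 1.x the MemoryStore backs it with a java.util.LinkedHashMap constructed in access order, which is what makes least-recently-used eviction possible when memory runs short. A toy model with a simplified MemoryEntry:

import java.util.LinkedHashMap

object ToyEntries {
  final case class MemoryEntry(value: AnyRef, size: Long, deserialized: Boolean)

  def main(args: Array[String]): Unit = {
    // initialCapacity = 32, loadFactor = 0.75, accessOrder = true (LRU iteration order)
    val entries = new LinkedHashMap[String, MemoryEntry](32, 0.75f, true)
    entries.put("taskresult_1", MemoryEntry("bytes-1", 64, deserialized = false))
    entries.put("taskresult_2", MemoryEntry("bytes-2", 64, deserialized = false))
    entries.get("taskresult_1") // touch: taskresult_2 is now least recently used
    // The head of the iteration order is the first eviction candidate.
    println(entries.entrySet().iterator().next().getKey) // prints taskresult_2
  }
}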


So far, when there is enough memory, the data ends up in entries. Now let's turn to reading data back from the BlockManager, for example when the driver has to retrieve a result from the executor. TaskSchedulerImpl.statusUpdate is where the driver-side fetch starts; it calls:

taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)

This method is what actually goes and fetches the data, so let's look at it:

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
            case directResult: DirectTaskResult[_] =>
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                return
              }
              // deserialize "value" without holding any lock so that it won't block other threads.
              // We should call it here, so that when it's called again in
              // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
              directResult.value()
              (directResult, serializedData.limit())
            case IndirectTaskResult(blockId, size) =>
              if (!taskSetManager.canFetchMoreResults(size)) {
                // dropped by executor if size is larger than maxResultSize
                sparkEnv.blockManager.master.removeBlock(blockId)
                return
              }
              logDebug("Fetching indirect task result for TID %s".format(tid))
              scheduler.handleTaskGettingResult(taskSetManager, tid)
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                /* We won't be able to get the task result if the machine that ran the task failed
                 * between when the task ended and when we tried to fetch the result, or if the
                 * block manager had to flush the result. */
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get)
              sparkEnv.blockManager.master.removeBlock(blockId)
              (deserializedResult, size)
          }

          result.metrics.setResultSize(size)
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch {
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
          // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
          case NonFatal(ex) =>
            logError("Exception while getting task result", ex)
            taskSetManager.abort("Exception while getting task result: %s".format(ex))
        }
      }
    })
  }
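For reference, here is a simplified model of the two result envelopes matched above. The real definitions live in org.apache.spark.scheduler.TaskResult and carry more fields (accumulator updates, metrics), and the real blockId is a TaskResultBlockId rather than a String:

import java.nio.ByteBuffer

sealed trait TaskResult[T]

// Small result: the serialized value rides along with the status update.
final case class DirectTaskResult[T](valueBytes: ByteBuffer) extends TaskResult[T]

// Large result: only a pointer travels; the bytes wait in the executor's
// BlockManager until the driver fetches them and then removes the block.
final case class IndirectTaskResult[T](blockId: String, size: Int) extends TaskResult[T]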

As you can see, when an IndirectTaskResult comes back, the driver fetches the block from the BlockManager by its blockId:

val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)

The blockManager here is the driver-side BlockManager. Let's look at getRemoteBytes:

  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
    logDebug(s"Getting remote block $blockId as bytes")
    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
  }

getRemoteBytes directly calls doGetRemote:

  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    val locations = Random.shuffle(master.getLocations(blockId))
    var numFetchFailures = 0
    for (loc <- locations) {
      logDebug(s"Getting remote block $blockId from $loc")
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
      } catch {
        case NonFatal(e) =>
          numFetchFailures += 1
          if (numFetchFailures == locations.size) {
            // An exception is thrown while fetching this block from all locations
            throw new BlockFetchException(s"Failed to fetch block from" +
              s" ${locations.size} locations. Most recent failure cause:", e)
          } else {
            // This location failed, so we retry fetch from a different one by returning null here
            logWarning(s"Failed to fetch remote block $blockId " +
              s"from $loc (failed attempt $numFetchFailures)", e)
            null
          }
      }

      if (data != null) {
        if (asBlockResult) {
          return Some(new BlockResult(
            dataDeserialize(blockId, data),
            DataReadMethod.Network,
            data.limit()))
        } else {
          return Some(data)
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }
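Before unpacking that method, note the failover shape of the loop: a failure at one location only fails the whole fetch once every known replica has failed. A self-contained toy version of the same logic; fetchFirst and fetchOne are illustrative names, not Spark's:

import scala.util.{Failure, Random, Success, Try}

object FetchWithFailover {
  def fetchFirst[A](locations: Seq[String])(fetchOne: String => A): Option[A] = {
    val shuffled = Random.shuffle(locations)
    var failures = 0
    for (loc <- shuffled) {
      Try(fetchOne(loc)) match {
        case Success(data) => return Some(data)
        case Failure(e) =>
          failures += 1
          // Give up only after every location has failed, like doGetRemote.
          if (failures == shuffled.size)
            throw new RuntimeException(s"Failed to fetch from all $failures locations", e)
      }
    }
    None // no locations known for this block
  }

  def main(args: Array[String]): Unit = {
    val result = fetchFirst(Seq("hostA", "hostB")) { loc =>
      if (loc == "hostA") sys.error("hostA is down") else s"bytes from $loc"
    }
    println(result) // Some(bytes from hostB), whichever order the shuffle picks
  }
}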

Unpacking it: first,

val locations = Random.shuffle(master.getLocations(blockId))

obtains every location of the blockId from the master, and then the block is fetched from those locations one by one with:

blockTransferService.fetchBlockSync(
  loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

This blockTransferService is created together with the BlockManager and initialized along with it. Here the data is fetched back from the executor side using NIO. That is the rough path a task result takes from being stored to being retrieved, traced with the in-memory case as the example; the other storage levels can be understood by following the same flow.
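Finally, to see the whole path from a user program, a hedged end-to-end sketch: a job whose per-task result exceeds the Akka frame size should be forced through the IndirectTaskResult / BlockManager route traced above. The configuration keys are Spark 1.x; the sizes are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object IndirectResultDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("indirect-result-demo")
      .set("spark.akka.frameSize", "10")       // 10 MB frame (Spark 1.x key)
      .set("spark.driver.maxResultSize", "1g") // results above this are dropped
    val sc = new SparkContext(conf)

    // Each task returns ~16 MB, above the frame size but below maxResultSize,
    // so results should travel via BlockManager rather than inline with the
    // status update.
    val big = sc.parallelize(1 to 4, 4).map(_ => new Array[Byte](16 * 1024 * 1024))
    println(big.collect().map(_.length.toLong).sum + " bytes collected")
    sc.stop()
  }
}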