Sparkソース分析–BlockManagerMaster&Slave

21733 ワード


BlockManagerMaster


BlockManagerMasterActorに対する一連のインタフェースをまとめているだけのクラスです。どのメソッドもtellとaskDriverWithReplyを通じてBlockManagerMasterActorとやり取りしてデータを取得するだけなので、あまり中身のない薄いラッパーと言えます。
/**
 * Thin client that forwards block-manager requests to the driver actor.
 *
 * Only `stop`, `tell` and `askDriverWithReply` are shown with bodies here; the
 * remaining members are listed by signature only in this excerpt.
 *
 * @param driverActor reference to the driver-side actor; set to null by `stop()`
 */
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
  /** Remove a dead executor from the driver actor. This is only called on the driver side. */
  def removeExecutor(execId: String)
  /**
   * Send the driver actor a heart beat from the slave. Returns true if everything works out,
   * false if the driver does not know about the given block manager, which means the block
   * manager should re-register.
   */
  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean
  /** Register the BlockManager's id with the driver. */
  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef)
  def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: String,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long): Boolean
  /** Get locations of the blockId from the driver */
  def getLocations(blockId: String): Seq[BlockManagerId]
  /** Get locations of multiple blockIds from the driver */
  def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]]
  /** Get ids of other nodes in the cluster from the driver */
  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId]
  /**
   * Remove a block from the slaves that have it. This can only be used to remove
   * blocks that the driver knows about.
   */
  def removeBlock(blockId: String)
  /**
   * Remove all blocks belonging to the given RDD.
   */
  def removeRdd(rddId: Int, blocking: Boolean)
  /**
   * Return the memory status for each block manager, in the form of a map from
   * the block manager's id to two long values. The first value is the maximum
   * amount of memory allocated for the block manager, while the second is the
   * amount of remaining memory.
   */
  def getMemoryStatus: Map[BlockManagerId, (Long, Long)]
  def getStorageStatus: Array[StorageStatus]

  /**
   * Stop the driver actor, called only on the Spark driver node.
   * Does nothing when `driverActor` is already null, so repeated calls are harmless.
   */
  def stop() {
    if (driverActor != null) {
      tell(StopBlockManagerMaster)
      driverActor = null
      logInfo("BlockManagerMaster stopped")
    }
  }

  /**
   * Send a message to the driver actor and require a `true` reply.
   *
   * Despite the name this is not fire-and-forget: it goes through
   * `askDriverWithReply` and therefore blocks until the reply arrives.
   *
   * @throws SparkException if the reply is `false` or the ask itself fails
   */
  private def tell(message: Any) {
    if (!askDriverWithReply[Boolean](message)) {
      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
    }
  }

  /**
   * Send a message to the driver actor and get its result within a default timeout, or
   * throw a SparkException if this fails.
   *
   * Makes up to AKKA_RETRY_ATTEMPTS attempts, sleeping AKKA_RETRY_INTERVAL_MS after each
   * failed one. An InterruptedException is rethrown immediately instead of being retried.
   *
   * @param message the message to send to the driver actor
   * @tparam T the type the reply is cast to (unchecked)
   * @return the non-null reply cast to T
   * @throws SparkException if `driverActor` is null or every attempt fails; in the latter
   *                        case the last failure is attached as the cause
   */
  private def askDriverWithReply[T](message: Any): T = {
    // TODO: Consider removing multiple attempts
    if (driverActor == null) {
      throw new SparkException("Error sending message to BlockManager as driverActor is null " +
        "[message = " + message + "]")
    }
    var attempts = 0
    var lastException: Exception = null
    while (attempts < AKKA_RETRY_ATTEMPTS) {
      attempts += 1
      try {
        val future = driverActor.ask(message)(timeout)
        val result = Await.result(future, timeout)
        if (result == null) {
          // Thrown inside the try on purpose: a null reply is treated like any other
          // failed attempt and is retried.
          throw new SparkException("BlockManagerMaster returned null")
        }
        return result.asInstanceOf[T]
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
      }
      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
    }
    throw new SparkException(
      "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
  }
}

 

BlockManagerInfo


BlockManagerMasterActor objectでは主にBlockManagerInfoを定義している。BlockManagerInfoは、あるBlockManager配下のすべてのblockのBlockStatusとハートビート(hb)を管理し、その更新と削除を行う。
なぜこの場所で定義しているのだろうか?
private[spark]
object BlockManagerMasterActor {
  /** Storage level plus the memory and disk sizes recorded for a single block. */
  case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)

  /**
   * Master-side record of one block manager: its remaining memory and the status of
   * every block it has reported.
   *
   * Invariant maintained by the methods below:
   * `_remainingMem == maxMem - sum(memSize of recorded blocks whose level uses memory)`.
   *
   * @param blockManagerId id of the block manager this record describes
   * @param timeMs         initial value for the last-seen timestamp
   * @param maxMem         maximum memory of the block manager; also the initial remaining memory
   * @param slaveActor     actor reference of the slave owning the block manager
   */
  class BlockManagerInfo(
      val blockManagerId: BlockManagerId,
      timeMs: Long,
      val maxMem: Long,
      val slaveActor: ActorRef)
    extends Logging {
    // Memory still available on this block manager according to the reported blocks.
    private var _remainingMem: Long = maxMem
    // Last-seen (heartbeat) timestamp; nothing shown in this excerpt updates it.
    private var _lastSeenMs: Long = timeMs
    // Mapping from block id to its status.
    private val _blocks = new JHashMap[String, BlockStatus]

    /**
     * Memory currently charged against `_remainingMem` for `blockId`: the recorded
     * memSize when the recorded level uses memory, otherwise 0 (also 0 when unknown).
     */
    private def chargedMemory(blockId: String): Long =
      Option(_blocks.get(blockId))
        .filter(_.storageLevel.useMemory)
        .map(_.memSize)
        .getOrElse(0L)

    /**
     * Records a new status for `blockId`, or drops the block when `storageLevel` is not valid.
     *
     * The memory held by the previous record is credited back exactly once, using the
     * size that was recorded for it, before the new status is charged. Crediting the
     * incoming `memSize` instead, or crediting again on the drop path, would let
     * `_remainingMem` drift above `maxMem`.
     */
    def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
      _remainingMem += chargedMemory(blockId)

      if (storageLevel.isValid) {
        // isValid means it is either stored in-memory or on-disk.
        _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
        if (storageLevel.useMemory) {
          _remainingMem -= memSize
        }
      } else {
        // If isValid is not true, drop the block (no-op when it was never recorded).
        _blocks.remove(blockId)
      }
    }

    /**
     * Forgets `blockId`, crediting its memory back only if its recorded level used
     * memory, which is the same rule `updateBlockInfo` applies. Unknown ids are ignored.
     */
    def removeBlock(blockId: String) {
      _remainingMem += chargedMemory(blockId)
      _blocks.remove(blockId)
    }
  }
}

 

BlockManagerMasterActor


各slaveのBlockManagerInfoと、各blockのlocations情報(どのBlockManagerに属するか)を保持する。コア機能はこれらのメタデータの管理と更新であり、RegisterBlockManager、updateBlockInfo、heartBeat、およびRDD・Executor(BlockManager)・Blockの削除を処理する。
/**
 * BlockManagerMasterActor is an actor on the master node to track statuses of
 * all slaves' block managers.
 */
private[spark]
class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
  // Mapping from block manager id to the block manager's information
  // (one BlockManagerInfo record per registered block manager).
  private val blockManagerInfo =
    new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  // Mapping from block id to the set of block managers that have the block.
  // Entries whose set becomes empty are removed by updateBlockInfo.
  private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]
  /**
   * Actor message handler. Only the registration case is shown; the trailing
   * comment in the original suggests further cases were left out of this excerpt.
   */
  def receive = {
    case RegisterBlockManager(id, maxMem, slave) =>
      register(id, maxMem, slave)
      // Acknowledge with true; BlockManagerMaster.tell treats any other reply as a failure.
      sender ! true
    // ... handlers for the other BlockManagerMaster messages are not shown here
  }

// RegisterBlockManager eventの処理:slaveが自分のBlockManagerをmasterに登録する(主にslaveのBlockManagerInfoをmasterに登録する)
  /**
   * Registers a block manager with the master.
   *
   * A registration from the driver's own block manager is ignored unless running in
   * local mode, and an id that is already registered is left untouched. If a different
   * block manager is already recorded for the same executor, an error is logged and
   * the JVM is asked to exit.
   *
   * @param id         id of the block manager being registered
   * @param maxMemSize maximum memory of that block manager
   * @param slaveActor actor reference of the slave that owns it
   */
  private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
    // Got a register message from the master node itself; don't register it.
    val fromRemoteDriver = id.executorId == "<driver>" && !isLocal
    if (!fromRemoteDriver && !blockManagerInfo.contains(id)) {
      if (blockManagerIdByExecutor.contains(id.executorId)) {
        // A block manager of the same executor already exists.
        // This should never happen. Let's just quit.
        logError("Got two different block manager registrations on " + id.executorId)
        System.exit(1)
      } else {
        blockManagerIdByExecutor(id.executorId) = id
      }
      // Create the bookkeeping record, stamped with the current time.
      val registeredAt = System.currentTimeMillis()
      blockManagerInfo(id) =
        new BlockManagerMasterActor.BlockManagerInfo(id, registeredAt, maxMemSize, slaveActor)
    }
  }

 
//updateBlockInfoの処理
  /**
   * Applies a block status report from a block manager and replies to the sender.
   *
   * Replies `true` once the report has been applied. For an unregistered block manager
   * the reply is `true` only when it is the driver's own manager outside local mode
   * (which is intentionally never registered), and `false` otherwise.
   *
   * @param blockManagerId reporting block manager
   * @param blockId        block the report is about
   * @param storageLevel   new storage level; a level that is not valid means the block was dropped
   * @param memSize        memory size reported for the block
   * @param diskSize       disk size reported for the block
   */
  private def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: String,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long) {

    if (blockManagerInfo.contains(blockManagerId)) {
      // Update the per-block-manager record first.
      blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)

      // Look up the location set for this block, creating and storing one if absent.
      val holders: mutable.HashSet[BlockManagerId] =
        if (blockLocations.containsKey(blockId)) {
          blockLocations.get(blockId)
        } else {
          val fresh = new mutable.HashSet[BlockManagerId]
          blockLocations.put(blockId, fresh)
          fresh
        }

      if (storageLevel.isValid) {
        holders.add(blockManagerId)
      } else {
        holders.remove(blockManagerId)
      }

      // Remove the block from master tracking if it has been removed on all slaves.
      if (holders.isEmpty) {
        blockLocations.remove(blockId)
      }
      sender ! true
    } else {
      // We intentionally do not register the master (except in local mode),
      // so we should not indicate failure for it.
      val isUnregisteredDriver = blockManagerId.executorId == "<driver>" && !isLocal
      sender ! isUnregisteredDriver
    }
  }

//removeRddの処理、RDDの削除
  /**
   * Removes an RDD: first drops the master-side metadata of its blocks, then
   * asynchronously asks every registered slave to remove the RDD.
   *
   * @param rddId id of the RDD to remove
   * @return a future holding the Int reply of each slave
   */
  private def removeRdd(rddId: Int): Future[Seq[Int]] = {
    // Blocks of this RDD are recognised by their id prefix.
    val rddPrefix = "rdd_" + rddId + "_"
    val rddBlockIds = blockLocations.keySet().filter(_.startsWith(rddPrefix))

    // Drop each block from the BlockManagerInfo records that track it, then from blockLocations.
    for (blockId <- rddBlockIds) {
      val holders: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
      for (holder <- holders; info <- blockManagerInfo.get(holder)) {
        info.removeBlock(blockId)
      }
      blockLocations.remove(blockId)
    }

    // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
    // The dispatcher is used as an implicit argument into the Future sequence construction.
    import context.dispatcher
    val removeMsg = RemoveRdd(rddId)
    val slaveReplies = blockManagerInfo.values.map { info =>
      info.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
    }.toSeq
    // Turns the Seq of futures into a single future of a Seq.
    Future.sequence(slaveReplies)
  }
  
  // removeExecutorの処理
// Executorを削除する(そのExecutorに対応するBlockManagerのメタデータを削除する)
  /**
   * Removes the block manager registered for the given executor, if there is one.
   * An executor id with no registered block manager is only logged.
   */
  private def removeExecutor(execId: String) {
    logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
    for (bmId <- blockManagerIdByExecutor.get(execId)) {
      removeBlockManager(bmId)
    }
  }
  /**
   * Drops all master-side metadata of one block manager: its executor mapping, its
   * BlockManagerInfo record, and its entry in the location set of every block it held.
   *
   * @param blockManagerId id of the block manager to forget; it must currently be
   *                       registered, because the lookup in blockManagerInfo throws otherwise
   */
  private def removeBlockManager(blockManagerId: BlockManagerId) {
    val info = blockManagerInfo(blockManagerId)

    // Remove the block manager from blockManagerIdByExecutor.
    blockManagerIdByExecutor -= blockManagerId.executorId

    // Remove it from blockManagerInfo and remove all the blocks.
    blockManagerInfo.remove(blockManagerId)
    val iterator = info.blocks.keySet.iterator
    while (iterator.hasNext) {
      val blockId = iterator.next
      val locations = blockLocations.get(blockId)
      // get returns null when the master has no location entry for this block.
      if (locations != null) {
        locations -= blockManagerId
        if (locations.size == 0) {
          // The map is keyed by block id; removing by the location set would match
          // nothing and leave the empty entry behind.
          blockLocations.remove(blockId)
        }
      }
    }
  }

// sendHeartBeatの処理:BlockManagerのハートビート(hb)はBlockManagerInfoのlastSeenMsで表される
  /**
   * Handles a heartbeat from a block manager.
   *
   * @return true when the block manager is registered (its last-seen time is refreshed),
   *         or when it is the driver's own unregistered manager outside local mode;
   *         false for any other unknown block manager
   */
  private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
    blockManagerInfo.get(blockManagerId) match {
      case Some(info) =>
        info.updateLastSeenMs()
        true
      case None =>
        blockManagerId.executorId == "<driver>" && !isLocal
    }
  }

//removeBlockの処理
  // Ask every slave that holds the given block to remove it. Only blocks the master
  // knows about (i.e. with an entry in blockLocations) can be removed this way.
  private def removeBlockFromWorkers(blockId: String) {
    Option(blockLocations.get(blockId)).foreach { holders =>
      for (holder <- holders; info <- blockManagerInfo.get(holder)) {
        // Remove the block from the slave's BlockManager.
        // Doesn't actually wait for a confirmation and the message might get lost.
        // If message loss becomes frequent, we should add retry logic here.
        info.slaveActor ! RemoveBlock(blockId)
      }
    }
  }

 

BlockManagerSlaveActor


Masterからslaveへ送られるmessageは2種類だけなので、とても簡単です。masterから送られてきたイベントだけを扱い、データの読み書きなどほとんどの処理はBlockManagerで直接実現されています。
/**
 * Slave-side actor that carries out removal commands sent by the master by
 * delegating them to the local BlockManager.
 *
 * @param blockManager the BlockManager the commands are applied to
 */
class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
  override def receive = {
    // Single-block removal: no reply is sent back.
    case RemoveBlock(id) =>
      blockManager.removeBlock(id)

    // RDD removal: reply with whatever removeRdd returns (the removed-block count).
    case RemoveRdd(id) =>
      sender ! blockManager.removeRdd(id)
  }
}