MemoryManager in Spark Source Code Analysis


Enforces the management of memory shared between storage and execution:
# tracks how much storage memory and execution memory is in use
# grants requests for storage, execution, and unroll memory
# releases storage and execution memory
execution memory: memory used by computations such as shuffles, joins, sorts, and aggregations
storage memory: memory used when persist or cache stores data in memory
unroll memory: unrolling a block into memory itself costs memory, much as opening a file costs memory.
 
The spark.memory.useLegacyMode configuration decides whether to use StaticMemoryManager, the legacy MemoryManager policy. By default Spark uses UnifiedMemoryManager, not StaticMemoryManager.
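As a sketch of that selection (a simplification, not Spark's actual SparkEnv code; the Map stands in for SparkConf):

```scala
// Hypothetical sketch of how the manager is chosen from spark.memory.useLegacyMode.
object ManagerSelection {
  def select(conf: Map[String, String]): String =
    if (conf.getOrElse("spark.memory.useLegacyMode", "false").toBoolean)
      "StaticMemoryManager"   // legacy policy
    else
      "UnifiedMemoryManager"  // the default

  def main(args: Array[String]): Unit = {
    assert(select(Map.empty) == "UnifiedMemoryManager")
    assert(select(Map("spark.memory.useLegacyMode" -> "true")) == "StaticMemoryManager")
    println("ok")
  }
}
```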
1. MemoryManager
1.1 Core attributes
Int numCores: number of cores
Long onHeapStorageMemory: on-heap storage memory size
Long onHeapExecutionMemory: on-heap execution memory size
StorageMemoryPool onHeapStorageMemoryPool: the on-heap storage memory pool
StorageMemoryPool offHeapStorageMemoryPool: the off-heap storage memory pool
ExecutionMemoryPool onHeapExecutionMemoryPool: the on-heap execution memory pool
ExecutionMemoryPool offHeapExecutionMemoryPool: the off-heap execution memory pool
Long maxOffHeapMemory: maximum off-heap memory size, configured by spark.memory.offHeap.size; takes effect only when spark.memory.offHeap.enabled is set
Long maxOnHeapStorageMemory: maximum on-heap storage memory size
Long maxOffHeapStorageMemory: maximum off-heap storage memory size
1.2 Key methods
# Releases numBytes bytes of execution memory
def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}
 
# Releases all execution memory held by the given task
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

 
# Releases numBytes bytes of storage memory
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}

 
# Releases all storage memory
final def releaseAllStorageMemory(): Unit = synchronized {
  onHeapStorageMemoryPool.releaseAllMemory()
  offHeapStorageMemoryPool.releaseAllMemory()
}

 
2. StaticMemoryManager
The executor's memory has a fixed boundary and consists of three parts: execution, storage, and system. Each part's share is divided statically and never changes.
# execution: sized by the spark.shuffle.memoryFraction parameter, 0.2 by default.
A buffer for shuffle, join, sort, and aggregation operations so that data is not written straight to disk, reducing the number of disk reads and writes.
# storage: sized by the spark.storage.memoryFraction parameter, 0.6 by default.
Holds the data stored by explicit user calls such as persist, cache, and broadcast.
# system: the space the program itself needs; it holds Spark-internal metadata and user data structures, and guards against OOM caused by abnormally large records.
This static split can waste resources: for example, when nothing is persisted or cached, the storage memory simply sits idle.
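To make the static split concrete, here is a worked calculation with the default fractions above; the 4 GiB heap is a made-up figure:

```scala
// Worked example of StaticMemoryManager's split with default fractions.
object StaticSplitExample {
  // Returns (storage bytes, execution bytes) for a given heap size.
  def split(systemMaxMemory: Long): (Long, Long) = {
    // storage: spark.storage.memoryFraction (0.6) * spark.storage.safetyFraction (0.9)
    val storage = (systemMaxMemory * 0.6 * 0.9).toLong
    // execution: spark.shuffle.memoryFraction (0.2) * spark.shuffle.safetyFraction (0.8)
    val execution = (systemMaxMemory * 0.2 * 0.8).toLong
    (storage, execution)
  }

  def main(args: Array[String]): Unit = {
    val heap = 4L * 1024 * 1024 * 1024 // assume a 4 GiB executor heap
    val (storage, execution) = split(heap)
    // roughly 2.16 GiB goes to storage and 0.64 GiB to execution;
    // the remaining ~30% is the "system" share
    assert(storage > 2L * 1024 * 1024 * 1024)
    assert(execution < 1L * 1024 * 1024 * 1024)
    assert(storage > execution)
    println(s"storage=$storage execution=$execution")
  }
}
```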
 
Most of its attributes are inherited from the parent MemoryManager, so they are not repeated here.
 
 
# maxUnrollMemory
The maximum memory available for unrolling blocks, by default 20% of the maximum storage memory
private val maxUnrollMemory: Long = {
  (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

 
# Requests a storage memory allocation. StaticMemoryManager does not support off-heap storage memory.
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  require(memoryMode != MemoryMode.OFF_HEAP,
    "StaticMemoryManager does not support off-heap storage memory")
  // a request larger than the whole storage region can never succeed
  if (numBytes > maxOnHeapStorageMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxOnHeapStorageMemory bytes)")
    false
  } else {
    // ask the StorageMemoryPool for numBytes
    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
  }
}

 
# acquireUnrollMemory: requests memory for unrolling a block
override def acquireUnrollMemory(
    blockId: BlockId, numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  require(memoryMode != MemoryMode.OFF_HEAP,
    "StaticMemoryManager does not support off-heap unroll memory")
  // unroll memory currently used by blocks in the storage pool's MemoryStore
  val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
  // free memory remaining in the storage pool
  val freeMemory = onHeapStorageMemoryPool.memoryFree
  // unroll space still allowed beyond what is already used or free; floored at 0
  val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
  // evict only as much as this request is short by, capped by the limit above
  val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
  onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
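The two sizing lines above can be traced with made-up numbers; `bytesToFree` below is a hypothetical helper that mirrors them (values in MB for readability):

```scala
// Traces the eviction-sizing math of StaticMemoryManager.acquireUnrollMemory.
object UnrollSizing {
  // Hypothetical helper mirroring the maxNumBytesToFree / numBytesToFree lines.
  def bytesToFree(maxUnroll: Long, currentUnroll: Long, free: Long, requested: Long): Long = {
    val maxNumBytesToFree = math.max(0L, maxUnroll - currentUnroll - free)
    math.max(0L, math.min(maxNumBytesToFree, requested - free))
  }

  def main(args: Array[String]): Unit = {
    // 200 MB unroll cap, 50 MB already unrolling, 30 MB free, 100 MB requested:
    // up to 120 MB may be evicted, but only the missing 70 MB actually is.
    assert(bytesToFree(200, 50, 30, 100) == 70)
    // With enough free memory (150 MB), nothing needs to be evicted.
    assert(bytesToFree(200, 50, 150, 100) == 0)
    println("ok")
  }
}
```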

 
# acquireExecutionMemory: requests execution memory
override def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
  }
}

 
# getMaxStorageMemory: returns the maximum usable storage memory
private def getMaxStorageMemory(conf: SparkConf): Long = {
  // total system memory
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // storage memory fraction
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  // safety fraction
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}

 
# getMaxExecutionMemory: returns the maximum execution memory
private def getMaxExecutionMemory(conf: SparkConf): Long = {
  // total system memory
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // fail if system memory is below the minimum
  if (systemMaxMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
      s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // if executor memory is configured
  if (conf.contains("spark.executor.memory")) {
    // executor memory size
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    // fail if executor memory is below the minimum
    if (executorMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  // shuffle memory fraction
  val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
  // shuffle safety fraction
  val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}

 
3. UnifiedMemoryManager
Because the legacy StaticMemoryManager could waste resources, this MemoryManager was introduced. UnifiedMemoryManager softens the boundary between the execution and storage regions, letting each borrow memory from the other.
The total usable memory is determined by spark.memory.fraction, 0.6 by default: usable memory = usable heap memory * that fraction. Within this space, the split between execution and storage is further determined by spark.memory.storageFraction.
 
# Computes the maximum on-heap storage memory
max on-heap storage memory = max heap memory - used on-heap execution memory
override def maxOnHeapStorageMemory: Long = synchronized {
  maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

 
# Computes the maximum off-heap storage memory
max off-heap storage memory = max off-heap memory - used off-heap execution memory
override def maxOffHeapStorageMemory: Long = synchronized {
  maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}

 
# acquireExecutionMemory: requests execution memory
override private[memory] def acquireExecutionMemory(
    numBytes: Long, taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // select the pools, storage region size, and max memory for this memory mode
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

  // Grows the execution pool by shrinking the storage pool, evicting cached blocks if necessary.
  // Called when a task needs more execution memory than the execution pool currently has free.
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    // only if extra memory is actually needed
    if (extraMemoryNeeded > 0) {
      // memory reclaimable from storage: its free memory, or whatever it borrowed beyond its region
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree, // free storage memory
        storagePool.poolSize - storageRegionSize) // memory storage borrowed beyond its own region
      // only act if there is something to reclaim
      if (memoryReclaimableFromStorage > 0) {
        // shrink the storage pool and grow the execution pool by the reclaimed amount
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        storagePool.decrementPoolSize(spaceToReclaim)
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  // computes the maximum size the execution pool can grow to
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }

  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
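The reclaimable-memory computation inside maybeGrowExecutionPool can be checked with made-up numbers; `reclaimable` below is a hypothetical helper mirroring that math.max expression (values in MB):

```scala
// Traces how much memory execution can take back from storage in UnifiedMemoryManager.
object ReclaimFromStorage {
  // Hypothetical helper mirroring memoryReclaimableFromStorage.
  def reclaimable(storageFree: Long, storagePoolSize: Long, storageRegionSize: Long): Long =
    math.max(storageFree, storagePoolSize - storageRegionSize)

  def main(args: Array[String]): Unit = {
    // Storage grew 100 MB beyond its 500 MB region and has only 20 MB free:
    // execution may take back up to 100 MB, evicting cached blocks if needed.
    assert(reclaimable(20, 600, 500) == 100)
    // Storage stayed inside its region: only its free 80 MB can be taken.
    assert(reclaimable(80, 450, 500) == 80)
    println("ok")
  }
}
```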

 
# acquireStorageMemory: requests a storage memory allocation
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // select the pools and max memory for this memory mode
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  // fail fast if the block cannot possibly fit
  if (numBytes > maxMemory) {
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // if free storage memory is insufficient, borrow from the execution pool
  if (numBytes > storagePool.memoryFree) {
    // only free execution memory can be borrowed; running tasks are never forced to give memory back
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storagePool.acquireMemory(blockId, numBytes)
}
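The borrow step can likewise be traced with made-up numbers; `borrowed` below is a hypothetical helper mirroring the if branch above (values in MB):

```scala
// Traces how much storage borrows from execution in UnifiedMemoryManager.acquireStorageMemory.
object StorageBorrow {
  // Hypothetical helper mirroring the borrow step: min(executionFree, requested),
  // taken only when the storage pool does not have enough free memory.
  def borrowed(executionFree: Long, storageFree: Long, requested: Long): Long =
    if (requested > storageFree) math.min(executionFree, requested) else 0L

  def main(args: Array[String]): Unit = {
    // 80 MB requested, only 30 MB free in storage, 200 MB free in execution:
    // the storage pool grows by min(200, 80) = 80 MB before acquiring.
    assert(borrowed(200, 30, 80) == 80)
    // Storage already has enough free memory, so nothing is borrowed.
    assert(borrowed(200, 100, 80) == 0)
    println("ok")
  }
}
```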

 
# getMaxMemory: returns the maximum usable memory
private def getMaxMemory(conf: SparkConf): Long = {
  // total system memory
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // reserved memory
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  // minimum required system memory
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  // fail if system memory is below the minimum
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // if executor memory is configured
  if (conf.contains("spark.executor.memory")) {
    // executor memory size
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    // fail if executor memory is below the minimum
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  // usable memory = system memory - reserved memory
  val usableMemory = systemMemory - reservedMemory
  // memory fraction, 60% of usable memory by default
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  // max memory = usable memory * memory fraction
  (usableMemory * memoryFraction).toLong
}
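Plugging the defaults into this formula with a made-up 4 GiB heap shows how much memory execution and storage actually share:

```scala
// Worked example of UnifiedMemoryManager.getMaxMemory with default settings.
object UnifiedMaxExample {
  val ReservedBytes = 300L * 1024 * 1024 // RESERVED_SYSTEM_MEMORY_BYTES

  // Mirrors getMaxMemory with spark.memory.fraction at its 0.6 default.
  def maxMemory(systemMemory: Long): Long =
    ((systemMemory - ReservedBytes) * 0.6).toLong

  def main(args: Array[String]): Unit = {
    val system = 4L * 1024 * 1024 * 1024 // assume a 4 GiB heap
    val max = maxMemory(system)
    // roughly (4096 MiB - 300 MiB) * 0.6 ≈ 2278 MiB shared by execution and storage
    assert(max > 2L * 1024 * 1024 * 1024 && max < system)
    // spark.memory.storageFraction (0.5 by default) sets the initial storage region within it
    val storageRegion = (max * 0.5).toLong
    assert(storageRegion <= max)
    println(s"maxMemory=$max storageRegion=$storageRegion")
  }
}
```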