Sparkソース分析のMemoryManager
39347 ワード
ストレージ(storage)と実行(execution)の間のメモリ使用を強制的に管理します.
#storage memoryとexecution memoryの使用量を記録
#storage、execution、unroll memoryの申請
#storageとexecution memoryの解放
execution memory:shuffles,joins,sorts,aggregationの計算操作を指す
storage memory:persistまたはcacheがメモリなどにデータをキャッシュする
unroll memory:blockを展開すること自体にメモリがかかります.ファイルを開くように、ファイルを開くにはメモリがかかります.
MemoryManagerはspark.memory.useLegacyModeこの構成項目は、残されたMemoryManagerポリシーであるStaticMemoryManagerを使用するかどうかを決定します.デフォルトでは、StaticMemoryManagerではなく、UnifiedMemoryManagerを使用します.
MemoryManager
1.1コア属性
Int numCores:コア数
Long onHeapStorageMemory:ヒープ内storageメモリサイズ
Long onHeapExecutionMemory:ヒープ内executionメモリサイズ
StorageMemoryPool onHeapStorageMemoryPool:ヒープ内storageメモリプールの作成
StorageMemoryPool offHeapStorageMemoryPool:ヒープ外storageメモリプールの作成
ExecutionMemoryPool onHeapExecutionMemoryPool:ヒープ内のexecutionメモリプールの作成
ExecutionMemoryPool offHeapExecutionMemoryPool:スタック外executionメモリプールの作成
Long maxOffHeapMemory:最大の対外メモリサイズ、spark.memory.offHeap.size構成は、sparkを有効にするには、構成を有効にする必要があります.memory.offHeap.enabled
Long maxOnHeapStorageMemory:最大ヒープ内storageメモリサイズ
Long maxOffHeapStorageMemory最大のスタック外storageメモリサイズ
1.2重要な方法
#numBytesバイトの実行メモリを解放する
defreleaseExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit = synchronized { memoryMode match{ case MemoryMode.ON_HEAP=> onHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId) case MemoryMode.OFF_HEAP=> offHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId) } }
#指定したtaskのすべてのexecutionメモリを解放する
#Nバイトメモリの解放
#すべてのメモリを解放
二StaticMemoryManager
Executorのメモリの限界ははっきりしていて、それぞれ3つの部分から構成されています:execution、storageとsystem.各部のメモリを静的に分割すると変化しません
#executor:executionメモリサイズはsparkを設定します.shuffle.memoryFractionパラメータを使用してサイズを制御します.デフォルトは0.2です.
shuffle、join、ソート、集約などの操作を回避するために、ディスクに直接データを書き込むために設定されたbufferサイズは、ディスクの読み書き回数を減らします.
#storage:storageメモリサイズはsparkを設定する.storage.memoryFractionパラメータを使用してサイズを制御します.デフォルトは0.6です.
ユーザ表示呼び出しのpersist,cache,broadcastなどのコマンドが格納するデータ空間を格納するために使用される.
#system:プログラムの実行に必要な空間、spark内部のメタデータ情報、ユーザーのデータ構造を格納し、異常な大きな記録によるOOMを避ける.
この分割方式は、cacheやpersistに要求がないなど、リソースの浪費をもたらす場合があります.storageのメモリは残ります.
多くの属性が親MemoryManagerを継承しているため、ここでは説明しません.
# maxUnrollMemory
最大block展開メモリ容量、デフォルトは最大メモリの20%
#storageメモリの割り当てを申請します.StaticMemoryManagerはスタック外メモリをサポートしていません.
#acquireUnrollMemory blockの展開を申請するためのメモリ
#acquireExecutionMemory申請実行メモリ
#getMaxStorageMemory有効な最大storageメモリ領域を返す
#getMaxExecutionMemory最大のexecutionメモリ領域を返す
三UnifiedMemoryManager
従来のStaticMemoryManagerではリソースの無駄が発生していたため、このMemoryManagerを導入しました.UnifiedMemoryManagerの管理メカニズムは、execution空間とstorage空間の境界を薄め、メモリを相互に借りることができます.
合計使用可能なメモリはsparkです.memory.fraction決定、デフォルト0.6.使用可能なヒープメモリスケール*使用可能なメモリ.この空間内部ではexecutionとstorageをさらに区分した.spark.memory.storageFraction決定
#計算最大メモリ
#最大ヒープ外メモリの計算
#acquireExecutionMemory申請実行メモリ
#acquireStorageMemoryストレージの割り当てを申請
#getMaxMemoryは最大のメモリを返します
#storage memoryとexecution memoryの使用量を記録
#storage、execution、unroll memoryの申請
#storageとexecution memoryの解放
execution memory:shuffles,joins,sorts,aggregationの計算操作を指す
storage memory:persistまたはcacheがメモリなどにデータをキャッシュする
unroll memory:blockを展開すること自体にメモリがかかります.ファイルを開くように、ファイルを開くにはメモリがかかります.
MemoryManagerはspark.memory.useLegacyModeこの構成項目は、残されたMemoryManagerポリシーであるStaticMemoryManagerを使用するかどうかを決定します.デフォルトでは、StaticMemoryManagerではなく、UnifiedMemoryManagerを使用します.
MemoryManager
1.1コア属性
Int numCores:コア数
Long onHeapStorageMemory:ヒープ内storageメモリサイズ
Long onHeapExecutionMemory:ヒープ内executionメモリサイズ
StorageMemoryPool onHeapStorageMemoryPool:ヒープ内storageメモリプールの作成
StorageMemoryPool offHeapStorageMemoryPool:ヒープ外storageメモリプールの作成
ExecutionMemoryPool onHeapExecutionMemoryPool:ヒープ内のexecutionメモリプールの作成
ExecutionMemoryPool offHeapExecutionMemoryPool:スタック外executionメモリプールの作成
Long maxOffHeapMemory:最大の対外メモリサイズ、spark.memory.offHeap.size構成は、sparkを有効にするには、構成を有効にする必要があります.memory.offHeap.enabled
Long maxOnHeapStorageMemory:最大ヒープ内storageメモリサイズ
Long maxOffHeapStorageMemory最大のスタック外storageメモリサイズ
1.2重要な方法
#numBytesバイトの実行メモリを解放する
defreleaseExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit = synchronized { memoryMode match{ case MemoryMode.ON_HEAP=> onHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId) case MemoryMode.OFF_HEAP=> offHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId) } }
#指定したtaskのすべてのexecutionメモリを解放する
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
#Nバイトメモリの解放
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
}
}
#すべてのメモリを解放
final def releaseAllStorageMemory(): Unit = synchronized {
onHeapStorageMemoryPool.releaseAllMemory()
offHeapStorageMemoryPool.releaseAllMemory()
}
二StaticMemoryManager
Executorのメモリの限界ははっきりしていて、それぞれ3つの部分から構成されています:execution、storageとsystem.各部のメモリを静的に分割すると変化しません
#executor:executionメモリサイズはsparkを設定します.shuffle.memoryFractionパラメータを使用してサイズを制御します.デフォルトは0.2です.
shuffle、join、ソート、集約などの操作を回避するために、ディスクに直接データを書き込むために設定されたbufferサイズは、ディスクの読み書き回数を減らします.
#storage:storageメモリサイズはsparkを設定する.storage.memoryFractionパラメータを使用してサイズを制御します.デフォルトは0.6です.
ユーザ表示呼び出しのpersist,cache,broadcastなどのコマンドが格納するデータ空間を格納するために使用される.
#system:プログラムの実行に必要な空間、spark内部のメタデータ情報、ユーザーのデータ構造を格納し、異常な大きな記録によるOOMを避ける.
この分割方式は、cacheやpersistに要求がないなど、リソースの浪費をもたらす場合があります.storageのメモリは残ります.
多くの属性が親MemoryManagerを継承しているため、ここでは説明しません.
# maxUnrollMemory
最大block展開メモリ容量、デフォルトは最大メモリの20%
private val maxUnrollMemory: Long = {
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
#storageメモリの割り当てを申請します.StaticMemoryManagerはスタック外メモリをサポートしていません.
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap storage memory")
// storage ,
if (numBytes > maxOnHeapStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxOnHeapStorageMemory bytes)")
false
} else {
// StorageMemoryPool numBytes
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
}
}
#acquireUnrollMemory blockの展開を申請するためのメモリ
override def acquireUnrollMemory(
blockId: BlockId, numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap unroll memory")
// storage block
val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
// storage
val freeMemory = onHeapStorageMemoryPool.memoryFree
// block - , 0 ,
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
#acquireExecutionMemory申請実行メモリ
override def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
#getMaxStorageMemory有効な最大storageメモリ領域を返す
private def getMaxStorageMemory(conf: SparkConf): Long = {
//
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
//
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
//
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
#getMaxExecutionMemory最大のexecutionメモリ領域を返す
private def getMaxExecutionMemory(conf: SparkConf): Long = {
//
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
// < ,
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
//
if (conf.contains("spark.executor.memory")) {
//
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
// < ,
if (executorMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
// shuffle
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
// shuffle
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
三UnifiedMemoryManager
従来のStaticMemoryManagerではリソースの無駄が発生していたため、このMemoryManagerを導入しました.UnifiedMemoryManagerの管理メカニズムは、execution空間とstorage空間の境界を薄め、メモリを相互に借りることができます.
合計使用可能なメモリはsparkです.memory.fraction決定、デフォルト0.6.使用可能なヒープメモリスケール*使用可能なメモリ.この空間内部ではexecutionとstorageをさらに区分した.spark.memory.storageFraction決定
#計算最大メモリ
= -
override def maxOnHeapStorageMemory: Long = synchronized {
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
#最大ヒープ外メモリの計算
= -
override def maxOffHeapStorageMemory: Long = synchronized {
maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}
#acquireExecutionMemory申請実行メモリ
override private[memory] def acquireExecutionMemory(
numBytes: Long, taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assertInvariants()
assert(numBytes >= 0)
// ,
val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
onHeapStorageRegionSize,
maxHeapMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
offHeapStorageMemory,
maxOffHeapMemory)
}
// block, ,
// task , ,
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
// 0
if (extraMemoryNeeded > 0) {
// execution storage , storage
val memoryReclaimableFromStorage = math.max(
storagePool.memoryFree,// storage
storagePool.poolSize - storageRegionSize) //
//
if (memoryReclaimableFromStorage > 0) {
// pool ,
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}
//
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}
executionPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
//
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
// , v
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapMemory)
}
// , false
if (numBytes > maxMemory) {
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
// storage , execution
if (numBytes > storagePool.memoryFree) {
// , , storage , execution
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}
#acquireStorageMemoryストレージの割り当てを申請
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
// , v
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapMemory)
}
// , false
if (numBytes > maxMemory) {
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
// storage , execution
if (numBytes > storagePool.memoryFree) {
// , , storage , execution
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}
#getMaxMemoryは最大のメモリを返します
private def getMaxMemory(conf: SparkConf): Long = {
//
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
//
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
//
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
// < ,
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
// executor
if (conf.contains("spark.executor.memory")) {
// executor
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
// executor <
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
// - =
val usableMemory = systemMemory - reservedMemory
// JVM , 60%
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
// *
(usableMemory * memoryFraction).toLong
}