Spark 1.6メモリ管理モデル(Unified Memory Management)分析
4618 ワード
2016年1月4日にSpark 1.6が発表された.新しいメモリ管理モデル:Unified Memory Managementを提案した.この文章では、新しいメモリ管理モデルを詳しく分析し、チューニングを容易にします.
前言
新しいメモリモデルはこのJiraが提案したJIRA-1000で、対応する設計ドキュメントはここにあります:unified-memory-management.
このドキュメントを貼るのは、この新しいモデルの設計動機、コミュニティが考えているソリューション、および比較を経て、最終的にどのソリューションを選択したかをより深く理解するためです.もちろん私も文章の中で言及しますが、これは本文の重点ではありません.
Memory Manager
Spark 1.6では、memoryManagerの選択は
決まりました.1.6以前のモデルを使用する場合、これは
まず1.6を見る前に、Executorのメモリにはどのような部分が構成されていますか. ExecutionMemory.このメモリ領域はshuffles,joins,sorts and aggregationsの過程で頻繁なIOを避けるために必要なbufferを解決するためである.sparkを通ります.shuffle.memoryFraction(デフォルト0.2)構成. StorageMemory.このメモリ領域はblock cache(dd.cache、rdd.persistなどの呼び出し方法を表示する)を解決するため、broadcasts、task resultsのストレージです.パラメータsparkを通過することができる.storage.memoryFraction(デフォルト0.6).設定 OtherMemory.プログラム自体の実行にもメモリが必要なので、システムに予約しておきます.(既定値は0.2)
また、OOMを防止するために、一般的には、ExecutionMemoryの実際の使用可能なメモリは
この問題はUnified Memory Managementモデルを引き出し,ExecutionMemoryとStorageMemoryという明確な境界を破ることに重点を置いている.
OtherMemory
Other memoryは1.6でも調整し、少なくとも300 mが利用できることを保証した.手動で
UnifiedMemoryManager
このクラスは2つのコアの方法を提供します.
acquireExecutionMemory ExecutionMemoryメモリが十分である場合、Storageへの申請メモリ はトリガーされません. Task毎に使用可能なメモリはpoolSize/(2*numActiveTasks)~maxPoolSize/numActiveTasksの間に制限される. ExecutionMemoryのメモリが不足している場合、StorageMemoryにメモリをインデックスする操作がトリガーされます.
ExecutionMemoryのメモリが不足している場合は、StorageMemoryにメモリを要求しますが、具体的にはどうでしょうか.次のコードを見ればわかります.
StorageMemoryPoolの残りのメモリとstorageMemoryPoolがExecutionMemoryから借りたメモリの大きさを見て、最大のメモリを取り、再返済できる最大のメモリとします.数式で表現すると、次のようになります.
ExecutionMemoryで借りられる最大メモリ=StorageMemoryで借りたメモリ+StorageMemory空きメモリ
もちろん、実際に必要とされる最大値よりも小さい場合は、実際に必要とされる値に準じる.次のコードは、この論理を示しています.
acquireStorageMemory
プロセスはacquireExecutionMemoryと似ていますが、違いは、ExecutionMemoryに空きメモリがある場合にのみ、StorageMemoryがメモリを借りることができます.この論理はこの行のコードに現れています.
したがって、StorageMemoryがExecutionMemoryから借りたメモリは、その時ExecutionMemoryが空きメモリを持っていたかどうかにかかっています.
MemoryPool
先に述べたのはStorageMemoryとExecutionMemoryのインタラクションです.現在メモリの具体的な表示はMemoryPoolによって行われています.
UnifiedMemoryManagementでは、3つのオブジェクトが維持されています.
実際のメモリカウントは、これらのオブジェクトによって行われます.たとえばメモリの借入 task現在のメモリの使用追跡 値の注意点は、システムshuffleの場合、in-heap/off-heapメモリを使用できることを以前から知っていました.UnifiedMemoryManagementでは,異なるオブジェクトを用いて追跡する.offHeapExecutionMemoryPoolをオンにすると、StorageMemoryとのインタラクションは存在せず、ダイナミックメモリの概念もありません.
まとめ理論的にはShuffle spill数を減らすことができ、極端な状況ではspill過程がない可能性があり、IO回数 を大幅に減らすことができる.メモリが緊張している場合は、問題を解決できない可能性があります. 重度のExectionMemoryやStorageMemoryのいずれかのプログラムが偏向している場合、 の効果が得られる可能性があります.
前言
新しいメモリモデルはこのJiraが提案したJIRA-1000で、対応する設計ドキュメントはここにあります:unified-memory-management.
このドキュメントを貼るのは、この新しいモデルの設計動機、コミュニティが考えているソリューション、および比較を経て、最終的にどのソリューションを選択したかをより深く理解するためです.もちろん私も文章の中で言及しますが、これは本文の重点ではありません.
Memory Manager
Spark 1.6では、memoryManagerの選択は
spark.memory.useLegacyMode=false
決まりました.1.6以前のモデルを使用する場合、これは
StaticMemoryManager
を使用して管理されます.そうしないと、新しいUnifiedMemoryManager
が使用されます.まず1.6を見る前に、Executorのメモリにはどのような部分が構成されていますか.
また、OOMを防止するために、一般的には、ExecutionMemoryの実際の使用可能なメモリは
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
、すなわち0.8*0.2であり、16%のメモリしか使用できないsafetyFractionがあります.このようなメモリ割り当てメカニズムでは、最大の問題は、誰も自分の上限を超えてはいけないということで、どれだけ規定されているかということです.別のメモリは空いていますが.これはStorageMemoryとExecutionMemoryが深刻で、メモリを消費する大戸です.この問題はUnified Memory Managementモデルを引き出し,ExecutionMemoryとStorageMemoryという明確な境界を破ることに重点を置いている.
OtherMemory
Other memoryは1.6でも調整し、少なくとも300 mが利用できることを保証した.手動で
spark.testing.reservedMemory
を設定することもできます.その後、実際に使用可能なメモリをこのreservedMemoryから減算してusableMemoryを得る.ExecutionMemoryとStorageMemoryはusableMemory * 0.75
のメモリを共有します.0.75は、新しいパラメータspark.memory.fraction
によって設定することができる.現在のspark.memory.storageFraction
のデフォルト値は0.5なので、ExecutionMemory、StorageMemoryのデフォルトは上記の使用可能なメモリに均等に分けられます.UnifiedMemoryManager
このクラスは2つのコアの方法を提供します.
acquireExecutionMemory
acquireStorageMemory
acquireExecutionMemory
ExecutionMemory
を申請するたびに、maybeGrowExecutionPool
メソッドが呼び出されます.このメソッドに基づいて、いくつかの有意義な結論を得ることができます.maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)
poolSize = ExecutionMemoryPool.poolSize ( ExecutionMemoryPool )
ExecutionMemoryのメモリが不足している場合は、StorageMemoryにメモリを要求しますが、具体的にはどうでしょうか.次のコードを見ればわかります.
val memoryReclaimableFromStorage = math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
StorageMemoryPoolの残りのメモリとstorageMemoryPoolがExecutionMemoryから借りたメモリの大きさを見て、最大のメモリを取り、再返済できる最大のメモリとします.数式で表現すると、次のようになります.
ExecutionMemoryで借りられる最大メモリ=StorageMemoryで借りたメモリ+StorageMemory空きメモリ
もちろん、実際に必要とされる最大値よりも小さい場合は、実際に必要とされる値に準じる.次のコードは、この論理を示しています.
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
math.min(extraMemoryNeeded,memoryReclaimableFromStorage))
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
acquireStorageMemory
プロセスはacquireExecutionMemoryと似ていますが、違いは、ExecutionMemoryに空きメモリがある場合にのみ、StorageMemoryがメモリを借りることができます.この論理はこの行のコードに現れています.
val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
したがって、StorageMemoryがExecutionMemoryから借りたメモリは、その時ExecutionMemoryが空きメモリを持っていたかどうかにかかっています.
MemoryPool
先に述べたのはStorageMemoryとExecutionMemoryのインタラクションです.現在メモリの具体的な表示はMemoryPoolによって行われています.
UnifiedMemoryManagementでは、3つのオブジェクトが維持されています.
@GuardedBy("this")
protected val storageMemoryPool = new StorageMemoryPool(this)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
実際のメモリカウントは、これらのオブジェクトによって行われます.たとえば
まとめ