Spark OFF_HEAP
3119 ワード
文章の冒頭で、自分のgithubのプロジェクト:AlluxioBlockManager、同時に私のgithubのブログもあります:blogこのプロジェクトの役割はSpark 2に代わることです.0以前のデフォルトの
OFF_HEAP
TachyonBlockManager
は、AlluxioBlockManagerとSpark 2を再開発する理由を後で説明します.0のoff_heap. OFF_HEAP
SparkのRDDにはいくつかのストレージ・レベルが用意されています.異なるストレージ・レベルは、MEMORY_ONLY
、MEMORY_ONLY_SER_2
...中でも、特にOFF_HEAP
off_heap
の利点は、メモリが限られた条件下で不要なメモリ消費を低減し、頻繁なGC問題を低減し、プログラム性能を向上させることである.Spark2.0以前、デフォルトoff_HeapはTachyonです.もちろん、ExternalBlockManager
を継承することで、自分が望んでいるoffを実現することができます.heap. ここでTachyonというのは、SparkのデフォルトであるTachyonBlockManagerの開発が完了すると、これ以上更新されないため、TachyonがAlluxioにアップグレードされた後に使用しないAPIが削除され、Sparkのデフォルトoff_Heapは使用できません.この問題SparkコミュニティとAlluxioコミュニティにはフィードバックがあります.ALLUXIO-1881
Spark2.0のoff_heap
spark 2から.0から、コミュニティは、デフォルトのTachyonBlockManagerおよびExternalBlockManagerに関連するAPI:SPARK-12667を削除しました.では、問題が来ました.Spark 2です.0で、OFF_HEAPはどのように処理されていますか?データはどこにありますか?上コード:まず、StorageLevelでは、異なるストレージレベルが異なる構造関数に解析され、OFF_HEAPのコンストラクション関数がわかりますOFF_HEAPは依然として存在する.Object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
...... ........
}
org.apache.spark.memory
にはMemoryMode
があり、MemoryMode
にはON_HEAP
を使用するかOFF_HEAP
を使用するかがマークされており、org.apache.spark.storage.memory.MemoryStore
ではMemoryMode
タイプに応じて異なるストレージが呼び出されるdef putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
.............
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
.............
}
再看MemoryStore
中存データの方法:putIteratorAsBytes
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
やっと見つけたSpark 2.0のoff_heapの最下位ストレージ:Platform
java unsafe APIを利用して実現されたアクセスoff_heapのクラス.
まとめ
spark2.0 off_Heapはjava unsafe APIを用いて実現されるメモリ管理である.利点:依然としてメモリの使用を減らすことができて、頻繁なGCを減らして、プログラムの性能を高めます.欠点:コードから見たOFF_の使用HEAPにはバックアップデータがなく、alluxioのようにデータの高可用性を保証することはできません.データが失われた場合は再計算が必要です.
spark 2から.0から、コミュニティは、デフォルトのTachyonBlockManagerおよびExternalBlockManagerに関連するAPI:SPARK-12667を削除しました.では、問題が来ました.Spark 2です.0で、OFF_HEAPはどのように処理されていますか?データはどこにありますか?上コード:まず、StorageLevelでは、異なるストレージレベルが異なる構造関数に解析され、OFF_HEAPのコンストラクション関数がわかりますOFF_HEAPは依然として存在する.
Object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
...... ........
}
org.apache.spark.memory
にはMemoryMode
があり、MemoryMode
にはON_HEAP
を使用するかOFF_HEAP
を使用するかがマークされており、org.apache.spark.storage.memory.MemoryStore
ではMemoryMode
タイプに応じて異なるストレージが呼び出されるdef putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
.............
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
.............
}
再看
MemoryStore
中存データの方法:putIteratorAsBytes
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
やっと見つけたSpark 2.0のoff_heapの最下位ストレージ:
Platform
java unsafe APIを利用して実現されたアクセスoff_heapのクラス.