spark 2.2.0ソース読み---spark coreバッグ---launcher/memoryバッグ

6848 ワード

1、本文の目標及びその他の説明:
    本文は主にlauncher/memoryの二つのカバンのデータ構造を紹介します.
2、launcherパッケージの下のデータ構造説明
    
prvate[spark]abstract class Launcher Backend{
サーバーを起動して会話するために使用されます.BackendConnectionオブジェクトを作成します.すぐにSocketを封入しました.ソケットは入出力ストリームを取得することができます.
private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, 
command: Command)    extends AbstractCommandBuilder {
      java jvm  
3、memoryパッケージの下のデータ構造説明    
private[memory] class ExecutionMemoryPool(
    lock: Object,
    memoryMode: MemoryMode
)extens MemoryPool with Logging{
すぐに一つのメンテナンスをしました.
@GuardedBy("lock")
private val memoryForTask = new mutable.HashMap[Long, Long]()   。key   ID,value 
           。                、                 value  。
private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {       、             
 
  
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
private[memory] abstract class MemoryPool(lock: Object) {    
 
  
@GuardedBy("lock")
private[this] var _poolSize: Long = 0
はすぐにこのようなフィールドを維持して、現在のメモリプールの大きさを表します.
private[spark] class StaticMemoryManager(
    conf: SparkConf,
    maxOnHeapExecutionMemory: Long,
    override val maxOnHeapStorageMemory: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    maxOnHeapStorageMemory,
    maxOnHeapExecutionMemory) {
はヒープのメモリ管理だけをサポートしていますが、実はすぐにpoolオブジェクトを維持しています.poolのサイドには合計サイズと使用済みフィールドがあります.割り当てを申請する時です.
 
private[memory] class StorageMemoryPool(
    lock: Object,
    memoryMode: MemoryMode
)extens MemoryPool with Logging{
主に修正によるものです.
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L  
@GuardedBy("lock")
private[this] var _poolSize: Long = 0
private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) {
    ,          。  :    2G(         ),1g    ,1g    。 
メモリは1 Gを満たした後、運転がまだ残っている0.5 Gがあることを発見しました.それではメモリーは運行の方に借りに行きます.逆も同じです.