Sparkストレージシステム:ディスクストレージDiskStore

4467 ワード

DiskStoreは、Blockをディスクに格納し、DiskBlockManagerのサービスに依存します.Spark 1.x.xバージョンでは、BlockStoreはディスクストレージDiskStore、メモリストレージMemeoryStore、TachyonストレージTachyonStoreの統一仕様を提供しています.DiskStore、MemoryStore、TachyonStoreは具体的な実装です.しかしSpark 2.0.0からTachyonStoreは廃止され、BlockStoreを使用して統合されたインタフェース仕様も提供されなくなり、DiskStoreとMemoryStoreはそれぞれ実現された.
DiskStoreのプロパティには、次の項目があります.
  • conf:すなわちSparkConf
  • diskManager:ディスクBlockマネージャDiskBlockManager
  • minMemoryMapBytes:ディスク内のBlockを直接読み出すかFileChannelのメモリミラーマッピング方法で読み出すかのしきい値
  • 小贴士:FileChannelのメモリミラーマッピング方法とは?Java NIOでは、FileChannelのmapメソッドが提供する高速読み取り技術は、実質的にチャネルに接続されたデータノードのすべてまたは一部のデータをメモリの1つのBufferに直接マッピングするものであり、このメモリBufferブロックはノードデータのミラーであり、このBufferを直接修正することで、ノードデータに影響を与える.このBufferはMappedBufferと呼ばれ、ミラーBufferです.メモリミラーであるため、処理速度が速い.
    次の方法を学習します.
    (1)getSize
    この方法は、所与のBlockIdに対応するBlockのサイズを取得するために使用される.
    //org.apache.spark.storage.DiskStore
    //diskManager: DiskBlockManager
    def getSize(blockId: BlockId): Long = {
      diskManager.getFile(blockId.name).length
    }

    DiskStoreが入手可能なgetSizeメソッドは,実質的にDiskBlockManagerのgetFileメソッドの呼び出しであり,Fileを取得した後にFileのサイズをとる.
    (2)contains
    指定されたBlockIdに対応するBlockファイルがローカルディスクストレージパスに含まれているかどうかを判断します.
    //org.apache.spark.storage.DiskStore
    def contains(blockId: BlockId): Boolean = {
      val file = diskManager.getFile(blockId.name)
      file.exists()
    }

    (3)remove
    指定されたBlockIdに対応するBlockファイルを削除
    //org.apache.spark.storage.DiskStore
    def remove(blockId: BlockId): Boolean = {
      val file = diskManager.getFile(blockId.name)
      if (file.exists()) {
        val ret = file.delete()
        if (!ret) {
          logWarning(s"Error deleting ${file.getPath()}")
        }
        ret
      } else {
        false
      }
    }

    (4)put
    BlockIdに対応するBlockをディスクに書き込みます.
    //org.apache.spark.storage.DiskStore
    def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
      if (contains(blockId)) {
        throw new IllegalStateException(s"Block $blockId is already present in the disk store")
      }
      logDebug(s"Attempting to put block $blockId")
      val startTime = System.currentTimeMillis
      val file = diskManager.getFile(blockId)
      val fileOutputStream = new FileOutputStream(file)
      var threwException: Boolean = true
      try {
        writeFunc(fileOutputStream)
        threwException = false
      } finally {
        try {
          Closeables.close(fileOutputStream, threwException)
        } finally {
           if (threwException) {
            remove(blockId)
          }
        }
      }
      val finishTime = System.currentTimeMillis
      logDebug("Block %s stored as %s file on disk in %d ms".format(
        file.getName,
        Utils.bytesToString(file.length()),
        finishTime - startTime))
    }
  • 1)containsメソッドを呼び出す所与のBlockIdに対応するBlockファイルが存在するか否かを判定し、存在する場合は次の
  • へ進む.
  • 2)DiskBlockManagerを呼び出すgetFileメソッドBlockIdに対応するファイルを取得し、ファイル出力ストリーム
  • を開く.
  • )コールバック関数writeFuncを呼び出し、Blockファイルに書き込みます.書き込みに失敗すると、removeメソッドを呼び出してBlockIdに対応するBlockファイル
  • を削除する必要がある.
    (5)putBytes
    BlockIdに対応するBlockをディスクに書き込むために使用され、Blockの内容はChunkedByteBufferにカプセル化されています.
    def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
      put(blockId) { fileOutputStream =>
        val channel = fileOutputStream.getChannel
        Utils.tryWithSafeFinally {
          bytes.writeFully(channel)
        } {
          channel.close()
        }
      }
    }

    putBytesメソッドにコールバック関数を設定し、putメソッドを呼び出します.putメソッドの実装によれば,ツールクラスUitlsのtryWithSafeFinallyメソッドを用いた外部のコールバック関数が最後にコールバックされる.
    (6)getBytes
    この方法は、所与のBlockIdに対応するBlockを読み出し、ChunkedByteBuffer戻りとしてカプセル化するために使用される
    def getBytes(blockId: BlockId): ChunkedByteBuffer = {
      val file = diskManager.getFile(blockId.name)
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
        // For small files, directly read rather than memory map
        if (file.length < minMemoryMapBytes) {
          val buf = ByteBuffer.allocate(file.length.toInt)
          channel.position(0)
          while (buf.remaining() != 0) {
            if (channel.read(buf) == -1) {
              throw new IOException("Reached EOF before filling buffer
    " + s"offset=0
    file=${file.getAbsolutePath}
    buf.remaining=${buf.remaining}") } } buf.flip() new ChunkedByteBuffer(buf) } else { new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) } } { channel.close() } }