Sparkストレージシステム:ディスクストレージDiskStore
4467 ワード
DiskStoreは、Blockをディスクに格納し、DiskBlockManagerのサービスに依存します.Spark 1.x.xバージョンでは、BlockStoreはディスクストレージDiskStore、メモリストレージMemeoryStore、TachyonストレージTachyonStoreの統一仕様を提供しています.DiskStore、MemoryStore、TachyonStoreは具体的な実装です.しかしSpark 2.0.0からTachyonStoreは廃止され、BlockStoreを使用して統合されたインタフェース仕様も提供されなくなり、DiskStoreとMemoryStoreはそれぞれ実現された.
DiskStoreのプロパティには、次の項目があります. conf:すなわちSparkConf diskManager:ディスクBlockマネージャDiskBlockManager minMemoryMapBytes:ディスク内のBlockを直接読み出すかFileChannelのメモリミラーマッピング方法で読み出すかのしきい値 小贴士:FileChannelのメモリミラーマッピング方法とは?Java NIOでは、FileChannelのmapメソッドが提供する高速読み取り技術は、実質的にチャネルに接続されたデータノードのすべてまたは一部のデータをメモリの1つのBufferに直接マッピングするものであり、このメモリBufferブロックはノードデータのミラーであり、このBufferを直接修正することで、ノードデータに影響を与える.このBufferはMappedBufferと呼ばれ、ミラーBufferです.メモリミラーであるため、処理速度が速い.
次の方法を学習します.
(1)getSize
この方法は、所与のBlockIdに対応するBlockのサイズを取得するために使用される.
DiskStoreが入手可能なgetSizeメソッドは,実質的にDiskBlockManagerのgetFileメソッドの呼び出しであり,Fileを取得した後にFileのサイズをとる.
(2)contains
指定されたBlockIdに対応するBlockファイルがローカルディスクストレージパスに含まれているかどうかを判断します.
(3)remove
指定されたBlockIdに対応するBlockファイルを削除
(4)put
BlockIdに対応するBlockをディスクに書き込みます.1)containsメソッドを呼び出す所与のBlockIdに対応するBlockファイルが存在するか否かを判定し、存在する場合は次の へ進む.2)DiskBlockManagerを呼び出すgetFileメソッドBlockIdに対応するファイルを取得し、ファイル出力ストリーム を開く.)コールバック関数writeFuncを呼び出し、Blockファイルに書き込みます.書き込みに失敗すると、removeメソッドを呼び出してBlockIdに対応するBlockファイル を削除する必要がある.
(5)putBytes
BlockIdに対応するBlockをディスクに書き込むために使用され、Blockの内容はChunkedByteBufferにカプセル化されています.
putBytesメソッドにコールバック関数を設定し、putメソッドを呼び出します.putメソッドの実装によれば,ツールクラスUitlsのtryWithSafeFinallyメソッドを用いた外部のコールバック関数が最後にコールバックされる.
(6)getBytes
この方法は、所与のBlockIdに対応するBlockを読み出し、ChunkedByteBuffer戻りとしてカプセル化するために使用される
DiskStoreのプロパティには、次の項目があります.
次の方法を学習します.
(1)getSize
この方法は、所与のBlockIdに対応するBlockのサイズを取得するために使用される.
//org.apache.spark.storage.DiskStore
//diskManager: DiskBlockManager
def getSize(blockId: BlockId): Long = {
diskManager.getFile(blockId.name).length
}
DiskStoreが入手可能なgetSizeメソッドは,実質的にDiskBlockManagerのgetFileメソッドの呼び出しであり,Fileを取得した後にFileのサイズをとる.
(2)contains
指定されたBlockIdに対応するBlockファイルがローカルディスクストレージパスに含まれているかどうかを判断します.
//org.apache.spark.storage.DiskStore
def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
file.exists()
}
(3)remove
指定されたBlockIdに対応するBlockファイルを削除
//org.apache.spark.storage.DiskStore
def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
if (file.exists()) {
val ret = file.delete()
if (!ret) {
logWarning(s"Error deleting ${file.getPath()}")
}
ret
} else {
false
}
}
(4)put
BlockIdに対応するBlockをディスクに書き込みます.
//org.apache.spark.storage.DiskStore
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
if (contains(blockId)) {
throw new IllegalStateException(s"Block $blockId is already present in the disk store")
}
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val fileOutputStream = new FileOutputStream(file)
var threwException: Boolean = true
try {
writeFunc(fileOutputStream)
threwException = false
} finally {
try {
Closeables.close(fileOutputStream, threwException)
} finally {
if (threwException) {
remove(blockId)
}
}
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName,
Utils.bytesToString(file.length()),
finishTime - startTime))
}
(5)putBytes
BlockIdに対応するBlockをディスクに書き込むために使用され、Blockの内容はChunkedByteBufferにカプセル化されています.
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
put(blockId) { fileOutputStream =>
val channel = fileOutputStream.getChannel
Utils.tryWithSafeFinally {
bytes.writeFully(channel)
} {
channel.close()
}
}
}
putBytesメソッドにコールバック関数を設定し、putメソッドを呼び出します.putメソッドの実装によれば,ツールクラスUitlsのtryWithSafeFinallyメソッドを用いた外部のコールバック関数が最後にコールバックされる.
(6)getBytes
この方法は、所与のBlockIdに対応するBlockを読み出し、ChunkedByteBuffer戻りとしてカプセル化するために使用される
def getBytes(blockId: BlockId): ChunkedByteBuffer = {
val file = diskManager.getFile(blockId.name)
val channel = new RandomAccessFile(file, "r").getChannel
Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
if (file.length < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(file.length.toInt)
channel.position(0)
while (buf.remaining() != 0) {
if (channel.read(buf) == -1) {
throw new IOException("Reached EOF before filling buffer
" +
s"offset=0
file=${file.getAbsolutePath}
buf.remaining=${buf.remaining}")
}
}
buf.flip()
new ChunkedByteBuffer(buf)
} else {
new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
}
} {
channel.close()
}
}