HadoopオーソリティーガイドのSpark-4
6744 ワード
本明細書の元のアドレス
Persistence
この章の冒頭の例に戻ると、「年間-気温」の中間データセットをメモリにキャッシュできます.
呼び出しcache()はすぐにRDDをメモリにキャッシュすることはなく、このRDDにタグを付けるだけで、Spark jobが実行されると実際のキャッシュ動作が発生します.まずjobを強制的に実行します
BlockManagerInfoに関するログ表示では、job実行の一部としてRDDのパーティションがメモリに保持されます.ログは、このRDDの番号が4(cache()メソッドが呼び出された後のコンソール出力においても、この情報が表示される)であることを示し、ラベルはそれぞれ0と1の2つのパーティションを含む.このキャッシュされたデータセット上で別のjobを実行すると、このRDDがメモリからロードされることがわかります.今回は最低気温を計算します.
これは微小データセット上の簡単な例であるが,より大きなjobでは節約時間が大きい.MapReduceでは、別の計算を実行するために、入力データセットを再びディスクからロードする必要があります.中間データが入力として使用できても(たとえば、洗浄後のデータセット、無効な行、不要なフィールドが削除されている)、「データをディスクからロードする必要がある」という事実は変更できません.これは遅いです.Sparkは、クラスタ間メモリキャッシュにデータセットをキャッシュします.これは、このデータセットベースの計算が非常に速く実行されることを意味します.
この効率は,データをインタラクティブに探索する際に極めて有用である.これは、反復アルゴリズムなどのいくつかのタイプのアルゴリズムにも自然に適しており、1回の反復計算の結果をメモリにキャッシュし、次の反復計算の入力にすることができます.このアルゴリズムは、反復のたびに個別のMapReduce jobであるMapReduceで実装することもできます.したがって、反復のたびに結果はディスクに書き込まれ、次回の反復で読み返さなければなりません.
キャッシュされたRDDは、同じアプリケーションのjobによってのみ取得できます.異なるアプリケーション間でデータセットを共有するには、最初のアプリケーションでsaveAs*()メソッド(saveAsTextFile()、saveAsHadoopFile()など)を使用して外部ストレージに書き込み、2番目のアプリケーションでSparkContextの対応メソッド(textFile()、hadoopFile()など)を使用して再ロードする必要があります.同様に、アプリケーションが終了すると、キャッシュされたすべてのRDDは、明示的に保存されない限り、再アクセスできません.
Persistence levels
Cache()を呼び出すと、RDDの各パーティションがエフェクタのメモリに永続化されます.このRDDパーティションを格納するためにアクチュエータに十分なメモリがない場合、計算は失敗しません.逆に、パーティションは必要に応じて再計算されます.多くのtrsansformationを持つ複雑なプログラムでは、これは高価です.したがってSparkは、persist()を呼び出すときにStorageLevelパラメータを指定すると、ユーザーが選択できる異なるタイプの永続化動作を提供します.
デフォルトの永続化レベルはMEMORY_です.ONLYは、オブジェクトの通常のメモリ表示を使用します.よりコンパクトな表現形式を使用するには、パーティション内の要素をバイト配列(byte array)にシーケンス化します.このレベルはMEMORY_ONLY_SER,MEMORY_よりONLY、このレベルはCPUの圧力を引き起こし、シーケンス化後のRDDパーティションがメモリに適応できるが、従来のメモリが不適切であることを示す場合、この圧力は価値がある.MEMORY_ONLY_SERはまた、各RDDが多くのオブジェクトではなくバイト配列で格納されるため、ゴミ回収の圧力を軽減する.
driverプログラムのログファイルで、BlockManagerに関する情報を確認すると、RDDパーティションがメモリに適していないかどうかを確認できます.さらに、各driverのSparkContextは4040ポートでHTTPサービスを開始し、キャッシュされたRDDパーティションを含む実行環境および実行中のjobに関する有用な情報を提供する.
デフォルトでは、従来のJavaシーケンス化フレームワークを使用してRDDパーティションをシーケンス化していますが、Kryoシーケンス化フレームワーク(次のセクションで説明する)は通常、サイズと速度の両方でより優れた選択です.シーケンス化後のパーティションを圧縮すると、より多くのスペースを節約(CPUの代価をもう一度払う)してsparkを設定することができる.rdd.compress属性はtrueで圧縮を有効にし、属性spark.io.compression.codecはオプションの設定です.
データセットの再計算が非常に高価である場合、MEMORY_AND_DISK(データセットがメモリに入らない場合はディスクに書きます)またはMEMORY_AND_DISK_SER(シーケンス化されたデータセットがメモリに入れられない場合は、ディスクに書き込む)が適切です.
クラスタ内の複数のノードでパーティションをコピーしたり、off-heapメモリを使用したりして、Sparkドキュメントを表示するためのより高度なおよび実験的な永続化レベルもあります.
Serialization
Sparkでは,シーケンス化データとシーケンス化関数(または閉パケット)の2つの側面を考慮する必要がある.
Data
まずデータのシーケンス化を見てみましょう.デフォルトではSparkは、Javaシーケンス化フレームワークを使用してエフェクタ間のネットワーク上でデータを転送するか、シーケンス化してデータをキャッシュします.プログラマーにとって、Javaのシーケンス化はよく理解されています.あなたが使用しているクラスがjavaを実現していることを確認するだけです.io.Serializableインタフェースまたはjava.io.Externalizableインタフェースですが、性能と大きさの観点から、この方式の効率は高くありません.
ほとんどのSparkプログラムでは、Kryoシーケンス化フレームワークがより良い選択です.Kryoは効率的な汎用Javaシーケンス化ライブラリです.Kryoを使用するにはdriverプログラムのSparkConfにsparkを設定.serializerプロパティは次のとおりです.
Kryoはクラスに特定のインタフェースを実装する必要はありません.そのため、簡単なJavaオブジェクトは変更する必要はありません.RDDで使用できます.とはいえ、クラスを使用する前にKryoに登録するとより効率的になります.これは、Kryoがシーケンス化されたオブジェクトのクラス(オブジェクトが参照に対応する)を指す参照を作成し、クラスが登録されている場合、その参照は整数IDであり、クラスが登録されていない場合、その参照はクラスのフルネームであるためです.このブートは自分のクラスにのみ適用され、ScalaクラスやAvro GenericやThriftクラスなどの多くの他のフレームワーククラスはSparkによって登録されています.
Kryoへのクラス登録も簡単です.KryoRegistratorのサブクラスを作成し、registerClasses()メソッドを上書きします.
最後にdriverプログラムでは、属性spark.kryo.registratorはあなたのKryoRegistrator実装クラスの完全なクラス名に設定されています.
Functions
通常、関数のシーケンス化は「ちょうど動作する」:Scalaでは、関数はすべてシーケンス化可能であり、標準Javaシーケンス化メカニズムを使用します.これもSparkがリモートエフェクタノードに関数を送信する際に使用する方法です.Sparkは、ローカルモードで実行しても関数をシーケンス化します.シーケンス不可能な関数(例えば、非シーケンス化クラスの方法から変換された関数)を何気なく導入すると、開発プロセスの初期段階で発見されます.
Shared Variables
Sparkプログラムは、RDDの一部ではないデータにアクセスする必要があることが多い.たとえば、次のプログラムはmap()操作でルックアップテーブル(lookup table)を使用します.
このプログラムは正しく動作します(変数lookupは閉パケットの一部としてシーケンス化されmap()に渡されます)が、ブロードキャスト変数を使用するより効率的な方法があります.
Broadcast Variables
ブロードキャスト変数は、シーケンス化後に各エフェクタに送信され、そこでキャッシュされるため、後続のタスクは必要に応じてアクセスできます.これは通常の変数とは異なります.通常の変数は、閉パケットの一部にシーケンス化され、ネットワーク上で転送され、1つのタスクが1回転送されます.ブロードキャスト変数の役割は,MapReduceの分散キャッシュと似ているが,Spark内部の実装ではメモリにデータを格納し,メモリが消費された場合にのみディスクに書き込まれる.
ブロードキャスト変数の作成方法は,ブロードキャストが必要な変数をSparkContextのbroadcast()メソッドに渡すことである.Tタイプの変数はBroadcast[T]にパッケージされ、次のように返されます.
RDDのmap()操作では、このブロードキャスト変数のvalueを呼び出してアクセスする.
その名の通り、ブロードキャスト変数は一方向に伝送され、driverからtaskまで--ブロードキャスト変数を更新してdriverに返信することはできません.そのためには、アキュムレータが必要です.
Accumulators
アキュムレータは共有変数であり、MapReduceのカウンタと同様に、タスクは増加するしかありません.jobが完了すると、アキュムレータの最終値はdriverプログラムで取得できます.次の例では、アキュムレータを使用して整数RDDの要素数を計算し、reduce()操作を使用してRDDの値を合計します.
最初の行のコードはSparkContextのaccumulator()メソッドを使用して、アキュムレータ変数countを作成します.map()操作は一定の関数であり,副作用はcountを増加させる.Spark jobの結果が計算されると,アキュムレータの値はvalueを呼び出すことによってアクセスする.
この例では,アキュムレータとしてIntを用いたが,任意の数値タイプが可能であった.Sparkでは、アキュムレータを使用した結果のタイプが「増分される」タイプとは異なる(SparkContextのaccumulable()メソッドを参照)、可変セットの値を加算できる(accumulableCollection()を参照).
Persistence
この章の冒頭の例に戻ると、「年間-気温」の中間データセットをメモリにキャッシュできます.
scala> tuples.cache()
res1: tuples.type = MappedRDD[4] at map at :18
呼び出しcache()はすぐにRDDをメモリにキャッシュすることはなく、このRDDにタグを付けるだけで、Spark jobが実行されると実際のキャッシュ動作が発生します.まずjobを強制的に実行します
scala> tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_))
INFO BlockManagerInfo: Added rdd_4_0 in memory on 192.168.1.90:64640
INFO BlockManagerInfo: Added rdd_4_1 in memory on 192.168.1.90:64640
(1950,22)
(1949,111)
BlockManagerInfoに関するログ表示では、job実行の一部としてRDDのパーティションがメモリに保持されます.ログは、このRDDの番号が4(cache()メソッドが呼び出された後のコンソール出力においても、この情報が表示される)であることを示し、ラベルはそれぞれ0と1の2つのパーティションを含む.このキャッシュされたデータセット上で別のjobを実行すると、このRDDがメモリからロードされることがわかります.今回は最低気温を計算します.
scala> tuples.reduceByKey((a, b) => Math.min(a, b)).foreach(println(_))
INFO BlockManager: Found block rdd_4_0 locally
INFO BlockManager: Found block rdd_4_1 locally
(1949,78)
(1950,-11)
これは微小データセット上の簡単な例であるが,より大きなjobでは節約時間が大きい.MapReduceでは、別の計算を実行するために、入力データセットを再びディスクからロードする必要があります.中間データが入力として使用できても(たとえば、洗浄後のデータセット、無効な行、不要なフィールドが削除されている)、「データをディスクからロードする必要がある」という事実は変更できません.これは遅いです.Sparkは、クラスタ間メモリキャッシュにデータセットをキャッシュします.これは、このデータセットベースの計算が非常に速く実行されることを意味します.
この効率は,データをインタラクティブに探索する際に極めて有用である.これは、反復アルゴリズムなどのいくつかのタイプのアルゴリズムにも自然に適しており、1回の反復計算の結果をメモリにキャッシュし、次の反復計算の入力にすることができます.このアルゴリズムは、反復のたびに個別のMapReduce jobであるMapReduceで実装することもできます.したがって、反復のたびに結果はディスクに書き込まれ、次回の反復で読み返さなければなりません.
キャッシュされたRDDは、同じアプリケーションのjobによってのみ取得できます.異なるアプリケーション間でデータセットを共有するには、最初のアプリケーションでsaveAs*()メソッド(saveAsTextFile()、saveAsHadoopFile()など)を使用して外部ストレージに書き込み、2番目のアプリケーションでSparkContextの対応メソッド(textFile()、hadoopFile()など)を使用して再ロードする必要があります.同様に、アプリケーションが終了すると、キャッシュされたすべてのRDDは、明示的に保存されない限り、再アクセスできません.
Persistence levels
Cache()を呼び出すと、RDDの各パーティションがエフェクタのメモリに永続化されます.このRDDパーティションを格納するためにアクチュエータに十分なメモリがない場合、計算は失敗しません.逆に、パーティションは必要に応じて再計算されます.多くのtrsansformationを持つ複雑なプログラムでは、これは高価です.したがってSparkは、persist()を呼び出すときにStorageLevelパラメータを指定すると、ユーザーが選択できる異なるタイプの永続化動作を提供します.
デフォルトの永続化レベルはMEMORY_です.ONLYは、オブジェクトの通常のメモリ表示を使用します.よりコンパクトな表現形式を使用するには、パーティション内の要素をバイト配列(byte array)にシーケンス化します.このレベルはMEMORY_ONLY_SER,MEMORY_よりONLY、このレベルはCPUの圧力を引き起こし、シーケンス化後のRDDパーティションがメモリに適応できるが、従来のメモリが不適切であることを示す場合、この圧力は価値がある.MEMORY_ONLY_SERはまた、各RDDが多くのオブジェクトではなくバイト配列で格納されるため、ゴミ回収の圧力を軽減する.
driverプログラムのログファイルで、BlockManagerに関する情報を確認すると、RDDパーティションがメモリに適していないかどうかを確認できます.さらに、各driverのSparkContextは4040ポートでHTTPサービスを開始し、キャッシュされたRDDパーティションを含む実行環境および実行中のjobに関する有用な情報を提供する.
デフォルトでは、従来のJavaシーケンス化フレームワークを使用してRDDパーティションをシーケンス化していますが、Kryoシーケンス化フレームワーク(次のセクションで説明する)は通常、サイズと速度の両方でより優れた選択です.シーケンス化後のパーティションを圧縮すると、より多くのスペースを節約(CPUの代価をもう一度払う)してsparkを設定することができる.rdd.compress属性はtrueで圧縮を有効にし、属性spark.io.compression.codecはオプションの設定です.
データセットの再計算が非常に高価である場合、MEMORY_AND_DISK(データセットがメモリに入らない場合はディスクに書きます)またはMEMORY_AND_DISK_SER(シーケンス化されたデータセットがメモリに入れられない場合は、ディスクに書き込む)が適切です.
クラスタ内の複数のノードでパーティションをコピーしたり、off-heapメモリを使用したりして、Sparkドキュメントを表示するためのより高度なおよび実験的な永続化レベルもあります.
Serialization
Sparkでは,シーケンス化データとシーケンス化関数(または閉パケット)の2つの側面を考慮する必要がある.
Data
まずデータのシーケンス化を見てみましょう.デフォルトではSparkは、Javaシーケンス化フレームワークを使用してエフェクタ間のネットワーク上でデータを転送するか、シーケンス化してデータをキャッシュします.プログラマーにとって、Javaのシーケンス化はよく理解されています.あなたが使用しているクラスがjavaを実現していることを確認するだけです.io.Serializableインタフェースまたはjava.io.Externalizableインタフェースですが、性能と大きさの観点から、この方式の効率は高くありません.
ほとんどのSparkプログラムでは、Kryoシーケンス化フレームワークがより良い選択です.Kryoは効率的な汎用Javaシーケンス化ライブラリです.Kryoを使用するにはdriverプログラムのSparkConfにsparkを設定.serializerプロパティは次のとおりです.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Kryoはクラスに特定のインタフェースを実装する必要はありません.そのため、簡単なJavaオブジェクトは変更する必要はありません.RDDで使用できます.とはいえ、クラスを使用する前にKryoに登録するとより効率的になります.これは、Kryoがシーケンス化されたオブジェクトのクラス(オブジェクトが参照に対応する)を指す参照を作成し、クラスが登録されている場合、その参照は整数IDであり、クラスが登録されていない場合、その参照はクラスのフルネームであるためです.このブートは自分のクラスにのみ適用され、ScalaクラスやAvro GenericやThriftクラスなどの多くの他のフレームワーククラスはSparkによって登録されています.
Kryoへのクラス登録も簡単です.KryoRegistratorのサブクラスを作成し、registerClasses()メソッドを上書きします.
class CustomKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[WeatherRecord])
}
}
最後にdriverプログラムでは、属性spark.kryo.registratorはあなたのKryoRegistrator実装クラスの完全なクラス名に設定されています.
conf.set("spark.kryo.registrator", "CustomKryoRegistrator")
Functions
通常、関数のシーケンス化は「ちょうど動作する」:Scalaでは、関数はすべてシーケンス化可能であり、標準Javaシーケンス化メカニズムを使用します.これもSparkがリモートエフェクタノードに関数を送信する際に使用する方法です.Sparkは、ローカルモードで実行しても関数をシーケンス化します.シーケンス不可能な関数(例えば、非シーケンス化クラスの方法から変換された関数)を何気なく導入すると、開発プロセスの初期段階で発見されます.
Shared Variables
Sparkプログラムは、RDDの一部ではないデータにアクセスする必要があることが多い.たとえば、次のプログラムはmap()操作でルックアップテーブル(lookup table)を使用します.
val lookup = Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u")
val result = sc.parallelize(Array(2, 1, 3)).map(lookup(_))
assert(result.collect().toSet === Set("a", "e", "i"))
このプログラムは正しく動作します(変数lookupは閉パケットの一部としてシーケンス化されmap()に渡されます)が、ブロードキャスト変数を使用するより効率的な方法があります.
Broadcast Variables
ブロードキャスト変数は、シーケンス化後に各エフェクタに送信され、そこでキャッシュされるため、後続のタスクは必要に応じてアクセスできます.これは通常の変数とは異なります.通常の変数は、閉パケットの一部にシーケンス化され、ネットワーク上で転送され、1つのタスクが1回転送されます.ブロードキャスト変数の役割は,MapReduceの分散キャッシュと似ているが,Spark内部の実装ではメモリにデータを格納し,メモリが消費された場合にのみディスクに書き込まれる.
ブロードキャスト変数の作成方法は,ブロードキャストが必要な変数をSparkContextのbroadcast()メソッドに渡すことである.Tタイプの変数はBroadcast[T]にパッケージされ、次のように返されます.
val lookup: Broadcast[Map[Int, String]] =
sc.broadcast(Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u"))
val result = sc.parallelize(Array(2, 1, 3)).map(lookup.value(_))
assert(result.collect().toSet === Set("a", "e", "i"))
RDDのmap()操作では、このブロードキャスト変数のvalueを呼び出してアクセスする.
その名の通り、ブロードキャスト変数は一方向に伝送され、driverからtaskまで--ブロードキャスト変数を更新してdriverに返信することはできません.そのためには、アキュムレータが必要です.
Accumulators
アキュムレータは共有変数であり、MapReduceのカウンタと同様に、タスクは増加するしかありません.jobが完了すると、アキュムレータの最終値はdriverプログラムで取得できます.次の例では、アキュムレータを使用して整数RDDの要素数を計算し、reduce()操作を使用してRDDの値を合計します.
val count: Accumulator[Int] = sc.accumulator(0)
val result = sc.parallelize(Array(1, 2, 3))
.map(i => { count += 1; i })
.reduce((x, y) => x + y)
assert(count.value === 3)
assert(result === 6)
最初の行のコードはSparkContextのaccumulator()メソッドを使用して、アキュムレータ変数countを作成します.map()操作は一定の関数であり,副作用はcountを増加させる.Spark jobの結果が計算されると,アキュムレータの値はvalueを呼び出すことによってアクセスする.
この例では,アキュムレータとしてIntを用いたが,任意の数値タイプが可能であった.Sparkでは、アキュムレータを使用した結果のタイプが「増分される」タイプとは異なる(SparkContextのaccumulable()メソッドを参照)、可変セットの値を加算できる(accumulableCollection()を参照).