Sparkラーニング-キャッシュ、クローズ、共有変数

4748 ワード

一、キャッシュ
Sparkにもキャッシュメカニズム,あるいは永続化メカニズムがある.RDDの変換はすべて不活性であるため、これはaction操作を呼び出す前にSparkが計算しないことを意味する.
はい、Sparkは内部に要求された実行ステップのすべての流れを記録し、有向無ループ図(DAG)を構築します.同様にRDDにデータを読み込む動作も不活性である
のです.この特性のため、同じRDDを複数回使用する必要がある場合があり、単純にRDDに対してaction操作を呼び出すと、SparkはRDDとそのすべてを毎回再計算する
依存して、このように消耗します.キャッシュを使用するとaction計算速度を速めることができます(通常は10倍加速します)
したがってSparkには永続化の動作があり、SparkにRDDを永続化して格納すると、計算されたRDDノードは、求められたRDDパーティション数をそれぞれ保存します.
根拠.永続化されたデータがあるノードで障害が発生した場合、Sparkは、データのキャッシュが必要なときに失われたデータパーティションを再計算します.このような状況を回避するために、複数のノードにデータをバックアップできます.
RDDはpersist()法またはcache()法を用いて永続化することができる.データは、最初のaction操作時に計算され、ノードのメモリにキャッシュされます.Cache()メソッド本質呼び出しはMEMORY_ONLYレベルのpersist()メソッドですが、persist()メソッドでは、ストレージレベルを選択できます.
MEMORY_ONLY:RDDは、JVMに逆シーケンス化されたJavaオブジェクトとして格納されます.メモリ容量が足りない場合、一部のデータパーティションはキャッシュされず、これらのデータが必要になるたびに再計算されます.これはデフォルトのレベルです.MEMORY_AND_DISK:RDDをJavaオブジェクトを逆シーケンス化してJVMに格納します.メモリ容量が足りない場合は、キャッシュされていないデータパーティションをディスクに格納し、これらのパーティションを使用する必要がある場合にディスクから読み出します.MEMORY_ONLY_SER:RDDをシーケンス化されたJavaオブジェクトとして格納します(各パーティションはbyte配列).この方法は、オブジェクトを逆シーケンス化する方法よりも多くのスペースを節約し、特にfast serializerを使用する場合はより多くのスペースを節約しますが、CPUの計算負担は読み取り時に増加します.MEMORY_AND_DISK_SER:MEMORY_に類似ONLY_SERですが、オーバーフローしたパーティションはディスクに格納され、使用時に再計算されるわけではありません.DISK_ONLY:ディスクにのみRDDをキャッシュします.MEMORY_ONLY_2,MEMORY_AND_DISK_2など:上記のレベルの機能と同じですが、各パーティションはクラスタ内の2つのノードでコピーを確立します.OFF_HEAP(実験中):MEMORY_に類似ONLY_SERですが、off-heap memoryにデータを格納し、off-heapメモリを起動する必要があります.
では、ストレージ・レベルをどのように選択すればいいのでしょうか.コアな問題は、メモリ使用率とCPU効率のバランスです.ストレージ・レベルの選択は、次の手順で行うことを推奨します. 
  • デフォルトの記憶レベル(MEMORY_ONLY)を使用してメモリに記憶されているRDDがオーバーフローしていない場合は、デフォルトの記憶レベルを選択します.デフォルトのストレージレベルは、CPUの効率を最大化し、RDD上での動作を最速の速度で実行できます.
  • メモリがRDDをすべて記憶できない場合は、MEMORY_を使用します.ONLY_SERは、高速シーケンス化ライブラリを選択してオブジェクトをシーケンス化し、メモリ領域を節約します.このストレージ・レベルを使用すると、計算速度は依然として速くなります.
  • は、このデータセットを計算するコストが特に高い場合、または大量のデータをフィルタリングする必要がある場合を除き、オーバーフローしたデータをできるだけディスクに格納しないでください.なぜなら、このデータパーティションの再計算にかかる時間と、ディスクからこれらのデータを読み出す時間との差は少ないからです.
  • 障害を迅速に復元するには、マルチコピー・ストレージ・レベル(例えば、Webアプリケーションのバックグラウンド・サービスとしてSparkを使用し、障害が発生した場合に迅速なリカバリが必要なシーン)を使用することをお勧めします.すべてのストレージ・レベルは、失われたデータを再計算することによって、完全なフォールト・トレランス・メカニズムを提供します.ただし、マルチコピー・レベルでは、データ損失が発生した場合、対応するデータベースを再計算する必要がなく、タスクを続行できます.

  • Sparkは、各ノードのキャッシュ使用率を自動的に監視し、最も最近使用されていない方法(LRU)で古いデータブロックをメモリから削除します.手動で削除する場合は 
    RDD,そして
    このRDDがSparkによって自動的に除去されるのを待つのではなく、RDD.unpersist()メソッドを使用することができる.
    二、閉パッケージと共有変数
    次のシナリオを見てください.
            List nums = Arrays.asList(1,3,2,4,3,5,4,6,5,4);
            final int[] count = {0};
            JavaRDD rawraw = sc.parallelize(nums);
            rawraw.foreach(a -> count[0] = count[0] +a);

    上記のコードの動作は不確定であり、予想通りに正常に動作しない可能性があります.Sparkがジョブを実行すると,RDD操作が各実行者に分解される.実行する前に、
    Spark計算タスクのclosure(閉パッケージ).一方、閉パケットは、RDD上の実行者がアクセス可能でなければならない変数および方法(この場合のforeach()である).クローズド布団
    シーケンス化され、各エフェクタに送信されます.
    閉パケットの変数のコピーは各executorに渡され、counterがforeach関数に参照されるとdriver nodeのcounterではなくなります.driver nodeにはメモリにcounterが残っていますが、executorsには表示されません.executorが見たのは、シーケンス化された閉パッケージのコピーだけです.したがってcounterの最終的な値は0です.counterのすべての操作に対してすべての操作がシーケンス化されたclosure内の値を参照するためです.
    そのため、この場合、共有変数を使用するべきです!
    1、放送変数
    Broadcast variables(ブロードキャスト変数)を使用すると、プログラマはタスクにコピーを渡すのではなく、read-only(読み取り専用)変数を各マシンにキャッシュできます.これらはどのように使用されるのか、例えば、ブロードキャスト変数は、各ノードに比較的大きなinput dataset(入力データセット)コピーを効率的に渡すことができる.ブロードキャスト変数を使用する場合、Sparkはまた、broadcast variables(ブロードキャスト変数)を効率的なブロードキャストアルゴリズムを使用して配布し、通信コストを削減しようと試みる.Sparkのaction(アクション)操作は一連のステージ(フェーズ)によって実行され,これらのステージ(フェーズ)は分散型の「shuffle」操作によって分割される.Sparkは各ステージ(フェーズ)内のタスクに必要な共通データを自動的にブロードキャストします.この場合、ブロードキャストされたデータはシーケンス化された形式でキャッシュされ、各タスクが実行される前に逆シーケンス化される.これは、複数のステージ(フェーズ)にまたがる複数のタスクで同じデータが使用される場合、または逆シーケンス化形式のデータが特に重要な場合にのみ、ブロードキャスト変数が使用されると良い効果が得られることを意味する.
    Broadcast broadcastVar = sc.broadcast(new int[] {1, 2, 3});
    
    broadcastVar.value();
    // returns [1, 2, 3]
    

    2、アキュムレータ
    Accumulators(アキュムレータ)は、「added」(追加)のみを実行できる変数であり、関連付けおよび交換操作を行うため、サポートパラレルを効率的に実行できます.アキュムレータはcounter(MapReduceのようなカウント)またはsums(加算)を実現するために使用することができる.オリジナルSparkは数値型のアキュムレータをサポートし,プログラマは新しいサポートタイプを追加することができる.
    LongAccumulator accum = jsc.sc().longAccumulator();
    
    sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
    // ...
    // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    
    accum.value();
    // returns 10

    アキュムレータの更新はaction操作でのみ行われ、Sparkは各タスクがアキュムレータを1回だけ更新することを保証します.たとえば、タスクを再起動しても値は更新されません.transformations(変換)では、task(タスク)またはjob stages(フェーズ)が再実行されると、各タスクの更新操作が複数回実行される可能性があることに注意してください.アキュムレータはSpark lazy evaluationのモードを変更しません.アキュムレータがRDDの1つの動作で更新される場合、それらの値は1回のみ更新され、RDDはactionの一部として計算される.従って、map()のようなtransformation(変換)では、アキュムレータの更新は実行されない.次のコードクリップは、この特性を証明します.