Sparkプログラミングガイド(三)

5898 ワード

  • 共有変数
  • ブロードキャスト変数
  • アキュムレータ
  • は、クラスタ
  • に配備される
  • JavaScalaからSparkジョブを実行する
  • ユニットテスト
  • 共有変数
    一般に、関数は、リモートクラスタノード上で実行されるSpark動作(例えば、mapまたはreduce)に渡されるとき、すべての変数の独立したコピーを使用する.これらの変数は各マシンにコピーされ、リモートマシンの変数はドライバに更新されません.タスク間で共有変数を読み書きするのは無効です.Sparkは2つの一般的な使用パターンに2つのタイプの共有変数を提供する:ブロードキャスト変数とアキュムレータ.
    ブロードキャスト変数
    ブロードキャスト変数を使用すると、開発者は、変数のコピーを渡すのではなく、読み取り専用変数を各マシンにキャッシュすることができます.ブロードキャスト変数は、各ノードに大きな入力データセットのコピーを効率的に与えるために使用することができる.Sparkも,通信コストを削減するために効率的なブロードキャストアルゴリズムを用いてブロードキャスト変数を配布することを試みている.
    Spark actionsは、一連のフェーズで実行され、「shuffle」操作で区切られます.Sparkは、各フェーズでタスクに必要な共通データを自動的にブロードキャストします.このようにブロードキャストされたデータはシーケンス化されてキャッシュされ、各タスクが実行される前に逆シーケンス化されます.すなわち、ブロードキャスト変数は、フェーズ間タスクが同じデータを必要とする場合、または逆シーケンス化された形式でデータをキャッシュすることが重要である場合にのみ、明示的に作成される.
    ブロードキャスト変数は、呼び出しSparkContext.broadcast(v)によって変数vから作成することができる.ブロードキャスト変数はvのwrapperであり、valueメソッドでアクセス可能である.コードは次のとおりです.
    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
    
    scala> broadcastVar.value
    res0: Array[Int] = Array(1, 2, 3)

    ブロードキャスト変数を作成した後、vがノードに複数回送信されないように、クラスタ上で関数を実行しないでください.また、オブジェクトvは、ブロードキャスト後に変更されず、すべてのノードが同じ値を取得することを保証することができる.
    アキュムレータ
    アキュムレータは、関連付けおよび交換操作によってアキュムレータされる変数のみであり、並列を効果的にサポートします.アキュムレータはcounters(例えばMapReduce)およびsumsを実装するために使用することができる.Sparkは数値タイプのアキュムレータをサポートし,開発者は新しいタイプのサポートを追加することができる.
    ユーザーとして、名前付きまたは名前なしのアキュムレータを作成できます.次の図に示すように、名前付きアキュムレータ(v)は、アキュムレータを変更する段階でweb UIに表示されます.
    UIでアキュムレータを追跡することは、実行フェーズのプロセスを理解するのに役立ちます.(注意:現在Pythonではサポートされていません).
    数値アキュムレータは、それぞれLongまたはDoubleタイプの値をアキュムレータするために、counterまたはSparkContext.longAccumulator()を呼び出すことによって作成することができる.クラスタ上で実行されたタスクの後、SparkContext.doubleAccumulator()メソッドを使用して累積動作を行うことができる.ただし、これらのタスクはアキュムレータの値を読み取ることはできず、ドライバのみがaddの方法でアキュムレータの値を読み取ることができる.
    次に、アキュムレータが配列を累積することを示します.
    scala> val accum = sc.longAccumulator("My Accumulator")
    accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
    
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    ...
    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    
    scala> accum.value
    res2: Long = 10

    上記のコードには、サポートされているLongタイプアキュムレータが内蔵されており、開発者はAccumulatorV 2で独自のタイプを作成することもできます.AccumulatorV 2抽象クラスは、複数のメソッドによって書き換えられる.ここで書き換えなければならないのは、0valueのチャージアキュムレータに値を加算するためのreset、現在のアキュムレータに別の同じタイプのアキュムレータをマージするためのaddであり、その他の書き換えなければならないメソッドはAPI documentationを参照する.たとえば、数学ベクトルを表すmergeのクラスがあるとします.
    class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
    
      private val myVector: MyVector = MyVector.createZeroVector
    
      def reset(): Unit = {
        myVector.reset()
      }
    
      def add(v: MyVector): Unit = {
        myVector.add(v)
      }
      ...
    }
    
    // Then, create an Accumulator of this type:
    val myVectorAcc = new VectorAccumulatorV2
    // Then, register it into spark context:
    sc.register(myVectorAcc, "MyVectorAcc1")

    開発者が独自のタイプのAccumulatorV 2を定義した場合、結果タイプは要素を追加するタイプとは異なる場合があることに注意してください.
    Actionsでのみ更新操作を実行するアキュムレータの場合、Sparkは、タスクがアキュムレータの更新操作に一度だけ適用されることを保証します.たとえば、タスクを再起動してもアキュムレータの値は更新されません.transformationsでは、ユーザーは、タスクまたはジョブフェーズが再実行されると、各タスクの更新操作が複数回適用されることを認識する必要があります.
    アキュムレータはSparkの怠け者計算モデルを変更せず,アキュムレータがRDDの動作で更新されると,アキュムレータの値はRDDがactionの一部として計算されたときにのみ更新される.従って、map()のような怠け者transformationでは、アキュムレータの更新が実行されることは保証されない.次のコードは、この特性を説明します.
    val accum = sc.longAccumulator
    data.map { x => accum.add(x); x }
    // Here, accum is still 0 because no actions have caused the map operation to be computed.

    クラスタへの配備
    アプリケーションsubmission guideでは、クラスタへのアプリケーションのコミット方法について説明しています.簡単に言えば、アプリケーションをパッケージ化すると(JavaおよびScalaがJARにパッケージ化され、Pythonが.pyまたは.zipのセットにパッケージ化されます)、bin/spark-submitスクリプトは、サポートされているクラスタマネージャにパッケージをコミットできます.
    Java/ScalaからSparkジョブを実行する
    org.apache.spark.Launcherパッケージは、簡単なJava APIを使用して、Sparkジョブをサブプロセスとして実行するクラスを提供します.
    ユニットテスト
    Sparkは任意のポピュラーなユニットテストフレームワークを使用することができる.マスターURLをMyVectorに設定し、テスト中にlocalを簡単に作成し、操作を実行し、SparkContextを呼び出して終了します.SparkContext.stop()ブロックまたはテストフレームワークのfinallyメソッドのstop contextで、Sparkが同じプログラムで2つのコンテキストを同時に実行することをサポートしていないことを確認します.