ゼロからSpark(三)RDDプログラミングを学ぶ

4871 ワード

RDDプログラミング
RDD(Resilient Distributed Datasetフレックス分散データセット)はSparkの中で最も核心的な概念である.RDDをマスターしてSparkを理解して、この章はいくつかの最も簡単な例を通じてRDDのScalaプログラミングを実現します.RDDのコア操作は3つあり,作成,変換操作,アクション操作である.
1.概要
以下の最も簡単な例はRDDの一連の典型的な動作を表す.以下はPythonコードで、Scalaも基本的に同じです.
lines = sc.textFile("README.md") //  
pythonLines = lines.filter(lambda line: "Python" in line) //    
pythonLines.first() //    
  • 第1行はRDD
  • を作成した.
  • の2行目は、RDDに対して変換動作を実行し、まだRDDを返している.SparkはこれらのRDDを不活性に計算するだけであることに注意してください.プログラムはあなたの操作を記録しただけですが、実際に実行するわけではありません.ファイルは読み込まれていません.filter操作は実行されていません.RDDの変換動作は、map()およびfilter()のような新しいRDDを返す動作である.
  • の3行目は行動操作に対応しており、このときに実際にファイルを読み取り、操作を実行する.アクション操作は、ドライバプログラムに結果を返すか、外部システムに結果を書き込む操作であり、count()やfirst()などの実際の計算がトリガーされます.また、Int、Array、Stringなどのデータ型にも似ています.

  • 2.RDDの作成
    第一種外部データセットの読み込み
    val lines = sc.textFile("README.md")
    

    第二種既存の集合を利用する
    RDDを作成する最も簡単な方法は、プログラムの既存のセットをSparkContextのparallelize()メソッドに渡すことです.
    プロトタイプの開発とテストを除いて、この方法はあまり使われていません.結局、この方法はデータセット全体を1台のマシンのメモリに置く必要があります.
    val lines = sc.parallelize(List("pandas", "i like pandas"))
    

    3.変換操作
    各要素に対する変換操作
  • map()は、RDDの各要素に使用される関数を受信し、関数の戻り結果を結果として使用します.
  •  val input = sc.parallelize(List(1, 2, 3, 4))
     val result = input.map(x => x * x)
    

    文字列RDDがあり、map()関数が文字列を解析してDouble値を返すために使用されている場合、入力RDDタイプはRDD[String]であり、出力タイプはRDD[Double]である.
  • filter()は、関数を受信し、RDDの関数を満たす要素を新しいRDDに入れて返します.伝達された関数の戻り値はブール型でなければなりません.
  • newlines = lines.filter(line => line.contains("error")) 
    
  • flatMap()の関数は、入力RDDの各要素にそれぞれ適用される.ただし、返されるのは要素ではなく、値シーケンスを返す反復器です.出力されたRDDは反復器で構成されていない.各反復器がアクセス可能なすべての要素を含むRDDを得た.
  • val lines = sc.parallelize(List("hello world", "hi")) 
    val words = lines.flatMap(line => line.split(" ")) 
    words.first() //   "hello"
    
  • distinct()操作は重複元素を除去することができるが、これはデータの混練に関連し、効率が非常に低下する.

  • 擬似集合操作、2つのRDD操作
    次の4つの方法は,並列,交差,差分,デカルト積をそれぞれ実現した.
    val a = sc.parallelize(List(1, 2, 3))
    val b = sc.parallelize(List(3, 4, 5))
    
  • union(other)は、2つのRDDのすべての要素を含むRDDを返すコレクション操作です.unionは重くならない.
  • intersection(other)メソッドは、2つのRDDに存在する要素のみを返します.注意intersection()は、データの混洗にも関与し、効率が非常に低下します.
  • subtract(other)関数は、パラメータとして別のRDDを受信し、第1のRDDにのみ存在し、第2のRDDには存在しないすべての要素からなるRDDを返す.
  • cartesian(other)変換操作は、可能なすべての(a,b)対
  • を返す.
    a.cartesian(b) //{(1, 3), (1, 4), ... (3, 5)},       RDD 
    

    4.行動操作
  • collect()関数は、RDD全体のデータを取得するために使用することができる.collect()は、データセット全体を単一のマシンのメモリに格納できる場合にのみ使用できます.そのため、collect()は大規模なデータセットでは使用できません.
  • reduce()関数は、2つのRDDの要素タイプのデータを操作し、同じタイプの新しい要素を返す関数をパラメータとして受信します.
  • val sum = rdd.reduce((x, y) => x + y) //         
    
  • fold()はreduce()と同様に、reduce()が受信した関数署名と同じ関数を受信し、各パーティションが最初に呼び出されたときの結果として「初期値」を追加します.
  • rdd.fold(0)((x, y) => x + y)
    
  • aggregate()関数は、戻り値タイプが動作するRDDタイプと同じでなければならないという制限から解放されます.たとえば、平均値を計算する場合は、ループ中のカウントと要素の数を記録する必要があります.これには、二元グループを返す必要があります.具体的な使い方についてはこちらをご覧ください.
  • val result = input.aggregate((0, 0))(
                         (acc, value) => (acc._1 + value, acc._2 + 1),
                         (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
    val avg = result._1 / result._2.toDouble
    
  • take(n)は、RDD内のn要素を返し、できるだけ少ないパーティションにのみアクセスしようとするので、この操作は不均衡な集合を得る.
  • top()は、RDDから最初の要素を取得する.
  • foreach()アクションは、RDDをローカルに戻す必要がなく、RDDの各要素を動作させる.foreachは値を返していません.
  • line.foreach(println)
    
    8.              RDD,  mean() variance()      RDD ,  join()       RDD 。
    
    ## 5. persist(  )
    
    Spark RDD      ,               RDD。      RDD      ,Spark       RDD        。            ,                  。  persist         ,            。
    
    ```scala
      val result = input.map(x => x * x)
      result.persist(StorageLevel.DISK_ONLY)
      println(result.count())
      println(result.collect().mkString(","))
    

    6.注意すべき点
    オブジェクトがオブジェクトのメンバーであるか、オブジェクトのフィールドへの参照が含まれている場合(self.fieldなど)、Sparkはオブジェクト全体をワークノードに送信します.これは、あなたが伝えたいものよりもずっと大きい可能性があります.
    python正しい書き方
    class WordFunctions(object):
            ...
      def getMatchesNoReference(self, rdd):
      #   :                query = self.query
      return rdd.filter(lambda x: query in x)
    

    Scala正しい書き方
    def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
      //   :                    val query_ = this.query
      rdd.map(x => x.split(query_))
    }
    

    7.SparkとHadoopの違い
    Hadoop MapReduceのようなシステムでは、開発者は、MapReduceのサイクル数を減らすために、操作を組み合わせる方法を考えるのに多くの時間を費やすことが多い.Sparkでは,非常に複雑なマッピングを書くことは,多くの簡単な連続動作を用いるよりも多くの性能を得ることができるとは限らない.根本的な原因はSparkの変換操作が不活性操作であることである.