Spark RDDの2つの演算子の1つ:一般的なAction演算子の小結

5532 ワード

**RDD:**弾性分布式データセットは、特殊な集合であり、マルチソースをサポートし、フォールトトレランスメカニズムがあり、キャッシュされ、並列操作をサポートすることができ、1つのRDDは複数のパーティション内のデータセットを代表する.
RDDには2つの演算子がある:1.Transformation(変換):遅延Lazy計算に属し、1つのRDDが別のRDDに変換されたとき、すぐに変換されず、データセットの論理操作を覚えているだけである.2.Action(実行):Sparkジョブの実行をトリガし、本当に変換演算子の計算をトリガする.RDDにおける演算子の実行手順:入力:Sparkプログラムの実行中、データは外部データ空間(例えば分散ストレージ:textFileからHDFSを読み取るなど、parallelizeメソッドはScala集合またはデータを入力する)からSparkを入力し、データはSpark実行時データ空間に入り、Spark中のデータブロックに変換し、BlockManagerで管理する.実行:Sparkデータ入力によりRDDが形成された後、filterなどの変換演算子によりデータを操作し、RDDを新しいRDDに変換し、Action演算子によりSparkコミットジョブをトリガーすることができる.データを多重化する必要がある場合は、Cache演算子を使用してメモリにデータをキャッシュできます.出力:プログラムの実行が終了すると、Spark実行時のスペースが出力され、saveAsTextFileがHDFSに出力されるなどの分散型ストレージに格納されるか、Scalaデータまたはコレクションに格納される(collectがScalaコレクションに出力され、countがScala Int型データに戻る)
一般的なアクション演算子(コードは一番下に統一されています):
  • reduce(func):関数funcによってセット内のすべての要素が集約されます.func関数は2つの同性の要素を受信し,1つの値を返す.この関数は関連性があり、正しく同時実行できることを確認する必要があります.この演算子はreduceByKeyのようにkeyによってグループ化されないので,全量の動作である.
  • collect():Driverのプログラムでは、データセットのすべての要素を配列として返します.ただし、これは小さなデータ・サブセットを返す場合にのみ使用できます.そうしないと、OOM異常が発生しやすくなります.
  • count():データセットの要素数(Longタイプの数)を返します.
  • take(n):データセットの前のn(Intタイプ)要素からなる配列を返します.なお、この操作は複数のノードではなく、Driverが存在するノードで実行される.取得するデータ量が多い場合は、この演算子を使用しないようにすると、Driverが存在するノードの圧力が大きくなります.
  • first():データセットの最初の要素(take(1)に類似)を返します.
  • saveAsTextFile(path):データセットのすべての要素をtextfile形式でローカル、hdfsなどのファイルシステムの指定ディレクトリに保存します.SparkはtoString()メソッドを呼び出し、各要素を1行のテキスト保存に変換します.
  • saveAsSequenceFile(path):データセットのすべての要素をローカル、hdfsなどのファイルシステムで指定されたディレクトリにsequencefile形式で保存します.しかしながら、この方法は、RDDの要素がkey−value対である必要があり、Writableインタフェースまたは暗黙的にWritableに変換できることを実装する(Sparkの基本タイプは、この変換を含む).
  • foreach(func):データセットの各要素で関数funcを実行します.
  • countByKey:reduceByKeyと同じ効果ですが、reduceByKeyはTransformation演算子です.

  • コード実装:
    package com.aura.spark
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object ActionTest {
    
      def main(args: Array[String]): Unit = {
    
        //        ,     
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
    
        /**
          *   SparkContext
          *
          * setMaster()    :
          * Master URL  
          *     :Master URL      Spark       
          *       (local):--Spark       (Spark Driver Executor     )
          *       local     :    Spark          
          *       local[M]  :    Spark    M     
          *       local[*]  :    Spark                   
          *       local[M,N]:    Spark    M     ,  Spark           N    
          */
        val conf = new SparkConf()
          .setAppName(s"${ActionTest.getClass.getSimpleName}")
          .setMaster("local[*]")
        val sc = new SparkContext(conf)
        //       
        action_t(sc)
        sc.stop()
      }
    
      def action_t(sc:SparkContext): Unit ={
        val list = List(
          "1,dashi",
          "2,ersha",
          "3,sansha",
          "4,sisha"
        )
        val list_rdd:RDD[String] = sc.parallelize(list)
        val list_map:RDD[(Int,String)] = list_rdd.map(line =>{
          val tup = line.split(",")
          (tup(0).toInt,tup(1))
        })
    
        //reduce
        val list_red:(Int,String) = list_map.reduce((l1,l2) => {
          val k = l1._1 + l2._1
          val v = l1._2 + "_" + l2._2
          (k,v)
        })
        println("----reduce     :")
        println(list_red)
    
        //collect
        //    collect           OOM  ,        ,        filter(func)      
        val list_fil:RDD[(Int,String)] = list_map.filter(_._1 > 2)
        //  ,                 :
        //list_fil.collect().foreach(t => println(t + "\t"))
        val list_col:Array[(Int,String)] = list_fil.collect()
        println("----collect     :")
        list_col.foreach(t => println(t + "\t"))
    
        //count
        val coun:Long = list_map.count()
        println("----count     :")
        println("list_map       :" + coun)
    
        //take
        val list_take:Array[(Int,String)] = list_map.take(2)
        println("----take     :")
        print(" list_map       :")
        list_take.foreach(t => println(t + "\t"))
    
        //first(   take(1))
        val list_fir:(Int, String) = list_map.first()
        println("----first     :")
        print(" list_map       :" + list_fir)
    
        //saveAsTextFile saveAsSequenceFile    , saveAsTextFile  
        /**
          *           :file:///D:/dasha.txt
          *      hdfs-site.xml core-site.xml   ,   hdfs     file:///
          *    ,     D:/dasha.txt
          */
        println("----saveAsTextFile       ")
        //list_map.saveAsTextFile("hdfs://dfs01/data/dasha.txt")
    
        //foreach
        //              ,  : list_take.foreach(t => println(t + "\t"))
        //                     func  
        var count = 1
        val list_for:Unit = list.foreach(line => {
          val words = line.split(",")
          val tup = (words(1),words(0))
          println(" list   " + count + "      :" + tup)
          count += 1
        })
      }
    }
    
    
    実行結果:
    ----reduce     :
    (10,ersha_dashi_sansha_sisha)
    ----collect     :
    (3,sansha)	
    (4,sisha)	
    ----count     :
    list_map       :4
    ----take     :
     list_map       :(1,dashi)	
    (2,ersha)	
    ----first     :
     list_map       :(1,dashi)----saveAsTextFile       
     list   1      :(dashi,1)
     list   2      :(ersha,2)
     list   3      :(sansha,3)
     list   4      :(sisha,4)
    
    クリックして表示:一般的なTransformation演算子のまとめ