Spark修練の道(進級編)——Spark入門から精通へ:第十一節Spark Streaming——DStream Transformation操作

6838 ワード

本節の主な内容
このセクションの内容は、公式ドキュメントから来ています.http://spark.apache.org/docs/latest/streaming-programming-guide.html
  • DStream Transformationオペレーション
  • 1.Transformation操作
    Transformation
    Meaning
    map(func)
    DStreamの各要素に対してfunc関数操作を行い、新しいDStreamを返す.
    flatMap(func)
    mapメソッドと同様ですが、各入力項目はゼロまたは複数の出力項目として出力できます.
    filter(func)
    すべての関数func戻り値trueのDStream要素をフィルタし、新しいDStreamを返します.
    repartition(numPartitions)
    DStreamのパーティション数を増やしたり減らしたりして、DStreamの並列度を変更します.
    union(otherStream)
    ソースDStreamと入力パラメータがotherDStreamの要素を結合し、新しいDStreamを返します.
    count()
    DStreaimの各RDDの要素をカウントし、1つの要素のみのRDDからなるDStreamを返す
    reduce(func)
    ソースDStream中の各RDD中の要素をfuncで集約操作し、1つの要素のみのRDDからなる新しいDStreamを返す.
    countByValue()
    要素タイプKのDStreamについて、(K,Long)キー値ペア形式の新しいDStreamを返し、Longに対応する値はソースDStreamの各RDDのkeyが現れる回数である
    reduceByKey(func, [numTasks])
    func関数を使用してソースDStreamのkeyを集約し、新しい(K,V)対で構成されたDStreamを返します.
    join(otherStream, [numTasks])
    (K,V)、(K,W)タイプのDStreamと入力し、新しい(K,(V,W)タイプのDStreamを返します.
    cogroup(otherStream, [numTasks])
    (K,V)、(K,W)タイプのDStreamと入力し、新しい(K,Seq[V],Seq[W])メタグループタイプのDStreamを返します.
    transform(func)
    RDD-to-RDD関数がソースコードDStream内の各RDDに作用することによって、任意のRDD動作であることができ、それによって新しいRDDが返される
    updateStateByKey(func)
    キーの前置状態とキーの新しい値に基づいてキーを更新し、新しい状態のDStreamを返します.
    具体例:
        //      ~/streaming   
        val lines = ssc.textFileStream(args(0))
        val words = lines.flatMap(_.split(" "))
        val wordMap = words.map(x => (x, 1))
        val wordCounts=wordMap.reduceByKey(_ + _)
        val filteredWordCounts=wordCounts.filter(_._2>1)
        val numOfCount=filteredWordCounts.count()
        val countByValue=words.countByValue()
        val union=words.union(word1)
        val transform=words.transform(x=>x.map(x=>(x,1)))
        //     
        lines.print()
        //  flatMap  
        words.print()
        //  map  
        wordMap.print()
        //  reduceByKey  
        wordCounts.print()
        //  filter  
        filteredWordCounts.print()
        //  count  
        numOfCount.print()
        //  countByValue  
        countByValue.print()
        //  union  
        union.print()
        //  transform  
        transform.print()
  •  

  • 次のコードは、実行時に追加されたファイルの内容です.
    root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt
    
  •  

  • 次に、前の各関数の結果を示します.
    -------------------------------------------
    lines.print()
    -------------------------------------------
    A B C D
    A B
    
    -------------------------------------------
    flatMap  
    -------------------------------------------
    A
    B
    C
    D
    A
    B
    
    -------------------------------------------
    map  
    -------------------------------------------
    (A,1)
    (B,1)
    (C,1)
    (D,1)
    (A,1)
    (B,1)
    
    -------------------------------------------
    reduceByKey  
    -------------------------------------------
    (B,2)
    (D,1)
    (A,2)
    (C,1)
    
    
    -------------------------------------------
    filter  
    -------------------------------------------
    (B,2)
    (A,2)
    
    -------------------------------------------
    count  
    -------------------------------------------
    2
    
    -------------------------------------------
    countByValue  
    -------------------------------------------
    (B,2)
    (D,1)
    (A,2)
    (C,1)
    
    -------------------------------------------
    union  
    -------------------------------------------
    A
    B
    C
    D
    A
    B
    A
    B
    C
    D
    ...
    
    -------------------------------------------
    transform  
    -------------------------------------------
    (A,1)
    (B,1)
    (C,1)
    (D,1)
    (A,1)
    (B,1)
  •  

  • 例2:前回のレッスンで示したWordCountコードは、入力した単語を別々にカウントするだけでなく、前回のカウントの状態は記録されていません.連続的にカウントしたい場合は、updateStateByKeyメソッドを使用して行います.次のコードは主にupdateStateByKeyの方法を示しています.
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming._
    
    object StatefulNetworkWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: StatefulNetworkWordCount  ")
          System.exit(1)
        }
    
       //     ,                   
        val updateFunc = (values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.sum
    
          val previousCount = state.getOrElse(0)
    
          Some(currentCount + previousCount)
        }
    
         //     K,V,S,      K,S
         //V        ,S       
        val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
          iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
        }
    
        val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]")
    
        //       
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        //     checkpoint    ,    checkpoint Spark Streaming    
        ssc.checkpoint(".")
    
        //RDD      
        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    
    
        //  Socket     ,  ip localhost,   9999
        val lines = ssc.socketTextStream(args(0), args(1).toInt)
        //flatMap  
        val words = lines.flatMap(_.split(" "))
        //map  
        val wordDstream = words.map(x => (x, 1))
    
        //updateStateByKey    
        val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
          new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
        stateDstream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
  •  

  • 次の図は初期値です:Spark修炼之道(进阶篇)——Spark入门到精通:第十一节 Spark Streaming—— DStream Transformation操作_第1张图片次のコマンドを使用してnetcat serverを起動します.
    root@sparkmaster:~/streaming# nc -lk 9999
    
  •  

  • そして入力
    root@sparkmaster:~/streaming# nc -lk 9999
    hello
    
  •  

  • 下図の結果が得られます
    Spark修炼之道(进阶篇)——Spark入门到精通:第十一节 Spark Streaming—— DStream Transformation操作_第2张图片
    そしてworldを入力し、
    root@sparkmaster:~/streaming# nc -lk 9999
    hello
    world
    
  •  

  • 次の結果が得られますSpark修炼之道(进阶篇)——Spark入门到精通:第十一节 Spark Streaming—— DStream Transformation操作_第3张图片