Spark Streamingにおけるtransform演算子の使用の詳細

6422 ワード

実際の生産では、DStreamとRDDの間で相互操作が必要なシーン(最も典型的なのはブラックリストのシーン)があります.この場合、transform演算子を使用して公式サイトの紹介を実現する必要があります.http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation
具体的なコードは次のように表示されます.
/**
  * transform  
  *
  *       ,                ==> RDD
  *                      Kafka、        ==> DStream
  */
object StreamingTransformApp {
  def main(args: Array[String]): Unit = {
    val ssc = ContextUtils.getStreamingContext(this.getClass.getSimpleName, 5)
    val lines = ssc.socketTextStream("localhost", 9999)

    //      
    val blacks = new ListBuffer[(String, Boolean)]()
    blacks.append(("huhu", true))
    val blacksRDD = ssc.sparkContext.parallelize(blacks)

    //          ,             
    lines.map(x => {(x.split(",")(0), x)})
      .transform(rdd => {
        rdd.leftOuterJoin(blacksRDD)
          .filter(_._2._2.getOrElse(false) != true)
          .map(x => x._2._1)
      }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

データソースを入力:
crazybird:conf $ nc -lk 9999
huhu,1
abc,2
cde,2

ブラックリストのデータのフィルタリングに成功しました:Spark Streaming中transform算子使用详解_第1张图片