Spark Streamingにおけるtransform演算子の使用の詳細
6422 ワード
実際の生産では、DStreamとRDDの間で相互操作が必要なシーン(最も典型的なのはブラックリストのシーン)があります.この場合、transform演算子を使用して公式サイトの紹介を実現する必要があります.http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation
具体的なコードは次のように表示されます.
データソースを入力:
ブラックリストのデータのフィルタリングに成功しました:
具体的なコードは次のように表示されます.
/**
* transform
*
* , ==> RDD
* Kafka、 ==> DStream
*/
object StreamingTransformApp {
def main(args: Array[String]): Unit = {
val ssc = ContextUtils.getStreamingContext(this.getClass.getSimpleName, 5)
val lines = ssc.socketTextStream("localhost", 9999)
//
val blacks = new ListBuffer[(String, Boolean)]()
blacks.append(("huhu", true))
val blacksRDD = ssc.sparkContext.parallelize(blacks)
// ,
lines.map(x => {(x.split(",")(0), x)})
.transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(_._2._2.getOrElse(false) != true)
.map(x => x._2._1)
}).print()
ssc.start()
ssc.awaitTermination()
}
}
データソースを入力:
crazybird:conf $ nc -lk 9999
huhu,1
abc,2
cde,2
ブラックリストのデータのフィルタリングに成功しました: