Sparkは重複するオブジェクトをフィルタする方法
6449 ワード
データは次のとおりです.
hello world hello spark hello hive hello world hello spark hello hive
最終的に必要なのは
hello world hello spark hello hive
この3つ、繰り返し捨てます.2つの実現方法がある.
第一に、プログラムがラインRddを形成するためにテキストをロードするときにdistinctを呼び出して直接フィルタリングします.
しかし、上記の状況はテストに限られています.実際のデータは2列のデータしかないわけではありません.最も一般的なのは、その中の数列の値に基づいて同じかどうかを判断し、同じであれば重くします.
第二に、データをオブジェクトにカプセル化するときに直接フィルタリングします.これはreduceByKey()を使用します.distinctは数字と文字列の値しかフィルタリングできません.distinctのソースコードもこのように実現されます.まずソースコードを見てみましょう.
具体的なソース解析は、この記事を参照してください.http://blog.csdn.net/u014393917/article/details/50602431
次に、ソースコードの実装方法を参照して、オブジェクトをフィルタします.
コード実装:
エンティティークラス:
実験結果:
(helloworld,2) (hellospark,2) (hellohive,2)
hello|spark hello|world hello|hive
上のプロセスはrddで繰り返されるオブジェクトをどのようにフィルタリングするかというプロセスですが、Stringと私がカスタマイズしたオブジェクトはすべてオブジェクトタイプで、なぜ文字列は直接distinctを呼び出して重くすることができて、私がカスタマイズするのは確かにだめですか?最初はhashCodeメソッドだと思って書き直していませんでしたが、書き直してからはまだ重くなりませんでした.しかし、カスタムオブジェクトでもいいと思います.どこか漏れているかもしれません.
hello world hello spark hello hive hello world hello spark hello hive
最終的に必要なのは
hello world hello spark hello hive
この3つ、繰り返し捨てます.2つの実現方法がある.
第一に、プログラムがラインRddを形成するためにテキストをロードするときにdistinctを呼び出して直接フィルタリングします.
lineRdd.distinct()
しかし、上記の状況はテストに限られています.実際のデータは2列のデータしかないわけではありません.最も一般的なのは、その中の数列の値に基づいて同じかどうかを判断し、同じであれば重くします.
第二に、データをオブジェクトにカプセル化するときに直接フィルタリングします.これはreduceByKey()を使用します.distinctは数字と文字列の値しかフィルタリングできません.distinctのソースコードもこのように実現されます.まずソースコードを見てみましょう.
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
具体的なソース解析は、この記事を参照してください.http://blog.csdn.net/u014393917/article/details/50602431
次に、ソースコードの実装方法を参照して、オブジェクトをフィルタします.
コード実装:
def main (args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 。
conf.registerKryoClasses(Array(classOf[StringPojo]))
val context = new SparkContext(conf)
val lineRdd = context.textFile("E:\\sourceData\\maptest.txt")
// lineRdd.distinct()
val wordsRdd=lineRdd.map{line =>
val words=line.split("\t")
new StringPojo(words(0),words(1))
}
val pairRdd1= wordsRdd.map(pojo=>(pojo.name+pojo.secondName,1))
pairRdd1.reduceByKey(_+_).foreach(println)
val pairRdd= wordsRdd.map(pojo=>(pojo.name+pojo.secondName,pojo))
pairRdd.reduceByKey((x,y)=>x).map(_._2).foreach(println)
// pairRdd.distinct().foreach(println)
// val distRdd=wordsRdd.distinct()
// distRdd.foreach(println)
}
エンティティークラス:
class StringPojo(val name:String,val secondName:String) {
override def toString: String = {
super.toString
this.name + "|" + this.secondName
}
override def hashCode(): Int = {
// println("name:"+ this.secondName+",hashCode:"+ this.secondName.hashCode)
this.secondName.hashCode
}
}
実験結果:
(helloworld,2) (hellospark,2) (hellohive,2)
hello|spark hello|world hello|hive
上のプロセスはrddで繰り返されるオブジェクトをどのようにフィルタリングするかというプロセスですが、Stringと私がカスタマイズしたオブジェクトはすべてオブジェクトタイプで、なぜ文字列は直接distinctを呼び出して重くすることができて、私がカスタマイズするのは確かにだめですか?最初はhashCodeメソッドだと思って書き直していませんでしたが、書き直してからはまだ重くなりませんでした.しかし、カスタムオブジェクトでもいいと思います.どこか漏れているかもしれません.