Spark Operators: RDD Action Operations – first/count/reduce/collect/collectAsMap
first
def first(): T
first returns the first element of the RDD, without sorting.
count
def count(): Long
count returns the number of elements in the RDD.
reduce
def reduce(f: (T, T) ⇒ T): T
reduce combines the elements of the RDD pairwise with the binary function f and returns the result.
collect
def collect(): Array[T]
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
collect converts the RDD into an array; the overload that takes a PartialFunction returns a new RDD built by applying the partial function to each element.
first
def first(): T
first returns the first element of the RDD, without sorting.
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
scala> rdd1.first
res14: (String, String) = (A,1)
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
scala> rdd1.first
res8: Int = 10
count
def count(): Long
count returns the number of elements in the RDD.
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd1.count
res15: Long = 3
reduce
def reduce(f: (T, T) ⇒ T): T
reduce combines the elements of the RDD pairwise with the binary function f and returns the result.
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
scala> rdd1.reduce(_ + _)
res18: Int = 55
scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
scala> rdd2.reduce((x,y) => {
| (x._1 + y._1,x._2 + y._2)
| })
res21: (String, Int) = (CBBAA,6)
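The combined key "CBBAA" above reflects the order in which Spark happened to merge partitions. As a sketch of the same pairwise semantics using plain Scala collections (an assumption of this example: a local List stands in for the RDD, and a local reduce evaluates left to right, whereas Spark combines within and across partitions in unspecified order):

```scala
// Plain-collections sketch of RDD.reduce semantics (no Spark needed).
object ReduceDemo {
  def main(args: Array[String]): Unit = {
    // Pairwise sum of 1..10, as in rdd1.reduce(_ + _).
    val nums = (1 to 10).toList
    println(nums.reduce(_ + _)) // 55

    // String concatenation is associative but not commutative, so on a real
    // RDD the combined key depends on partition order (hence "CBBAA" above);
    // the summed Int (6) is the same either way.
    val pairs = List(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1))
    val combined = pairs.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
    println(combined) // (AABBC,6) with left-to-right evaluation
  }
}
```

This is why the function passed to reduce should be commutative and associative whenever a deterministic result matters.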
collect
def collect(): Array[T]
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
collect converts the RDD into an array; the overload that takes a PartialFunction returns a new RDD built by applying the partial function to each element.
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val one: PartialFunction[Int, String] = { case 1 => "one"; case _ => "other"}
one: PartialFunction[Int,String] = <function1>
scala> val data = sc.parallelize(List(2,3,1))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at :12
scala> data.collect(one).collect
res4: Array[String] = Array(other, other, one)
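The RDD overload mirrors `collect` on ordinary Scala collections, which can be tried without Spark (a sketch; the local List is an assumption standing in for the RDD):

```scala
// Sketch: collection collect has the same PartialFunction shape as the RDD
// overload above (a local List stands in for the RDD).
object CollectDemo {
  def main(args: Array[String]): Unit = {
    val one: PartialFunction[Int, String] = { case 1 => "one"; case _ => "other" }
    // collect applies the partial function to every element it is defined for;
    // with a catch-all case it maps all three elements.
    println(List(2, 3, 1).collect(one)) // List(other, other, one)
  }
}
```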
collectAsMap
def collectAsMap(): Map[K, V]
collectAsMap returns a key-value RDD to the driver as a Map; when a key appears more than once, later values overwrite earlier ones.
scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at :12
scala> data.collectAsMap
res28: scala.collection.Map[Int,String] = Map(2 -> com, 1 -> com, 3 -> good)
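The single entry per key in the result comes from later values overwriting earlier ones. The same behavior can be sketched locally with `toMap` (an assumption of this example: a plain List of pairs stands in for the key-value RDD):

```scala
// Sketch: toMap mirrors collectAsMap's duplicate-key behavior
// (a local List of pairs stands in for the key-value RDD).
object CollectAsMapDemo {
  def main(args: Array[String]): Unit = {
    val data = List((1, "www"), (1, "iteblog"), (1, "com"),
                    (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good"))
    // Later pairs overwrite earlier ones for the same key, so each key maps
    // to its last value: 1 -> com, 2 -> com, 3 -> good.
    val m = data.toMap
    println(m(1)) // com
    println(m(3)) // good
  }
}
```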