Spark RDD操作練習1
6878 ワード
scala> sc
res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@40283584
scala> val rdd1 = sc.parallelize
def parallelize[T](seq: Seq[T],numSlices: Int)(implicit evidence$1: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
scala> val rdd1 = sc.parallelize(List(5,4,6,6,7,3,1,9))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24
scala> val rdd2 = rdd1.map(_ *2).sortBy(x =>x, true)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at :26
scala> val rdd3 = rdd2.filter(_ >= 10)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at :28
scala> rdd3.collect
res2: Array[Int] = Array(10, 12, 12, 14, 18)
scala> rdd1.collect
res4: Array[Int] = Array(5, 4, 6, 6, 7, 3, 1, 9)
scala> rdd2.collect
res5: Array[Int] = Array(2, 6, 8, 10, 12, 12, 14, 18)
scala>
scala> val rdd1= sc.parallelize(Array("a b c", "d f e", "h i"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at :24
scala> rdd1.collect
res6: Array[String] = Array(a b c, d f e, h i)
scala> val rdd2 = rdd1.flatMap(_.split(' '))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at :26
scala> rdd2.collect
res7: Array[String] = Array(a, b, c, d, f, e, h, i)
scala> val rdd3 = rdd1.flatMap(_.split(" "))
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at :26
scala> rdd3.collect
res9: Array[String] = Array(a, b, c, d, f, e, h, i)
scala>
scala> val rdd1 = sc.parallelize(List(5, 6, 4, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at :24
scala> val rdd2 = sc.parallelize(List(1, 2, 4, 5))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at :24
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[14] at union at :28
scala> rdd3.collect
res12: Array[Int] = Array(5, 6, 4, 3, 1, 2, 4, 5)
scala> val rdd4 = rdd1.intersection(rdd2)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at intersection at :28
scala> rdd4.collect
res13: Array[Int] = Array(4, 5)
scala> rdd3.distinct
res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at distinct at :31
scala> res14.collect
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala>
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at :24
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at :24
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[28] at join at :28
scala> rdd3.collect
res16: Array[(String, (Int, Int))] = Array((tom,(1,2)), (jerry,(3,2)))
scala> val rdd4 = rdd1 union rdd2
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[29] at union at :28
scala> rdd4.collect
res17: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,2))
scala> rdd4.groupByKey
res18: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at :31
scala> rdd4.collect
res19: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,2))
scala>
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at :24
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1),("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at :24
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[35] at cogroup at :28
scala> rdd3.collect
res21: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (kitty,(CompactBuffer(2),CompactBuffer())), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))))
scala>
scala> val rdd1 = sc.parallelize(List(1,2,3,5,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at :24
scala> val rdd2 = rdd1.reduce(_ + _)
rdd2: Int = 15
scala>
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at :24
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3),("shuke", 2), ("kitty", 5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at :24
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[39] at union at :28
scala> rdd3.collect
res23: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (shuke,1), (jerry,2), (tom,3), (shuke,2), (kitty,5))
scala> val rdd4 = rdd3.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[40] at reduceByKey at :30
scala> rdd4.collect
res24: Array[(String, Int)] = Array((shuke,3), (tom,4), (kitty,7), (jerry,5))
scala> val rdd5 = rdd4.map(t => (t._2, t._1))
rdd5: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[41] at map at :32
scala> rdd5.collect
res25: Array[(Int, String)] = Array((3,shuke), (4,tom), (7,kitty), (5,jerry))
scala> val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false)
rdd5: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at sortByKey at :32
scala> rdd5.collect
res26: Array[(Int, String)] = Array((7,kitty), (5,jerry), (4,tom), (3,shuke))
scala> rdd5.map(t => (t._2, t._1)).collect
res28: Array[(String, Int)] = Array((kitty,7), (jerry,5), (tom,4), (shuke,3))
scala>