Spark RDD Operations Practice 1


  • Basic RDD exercises
  • scala> sc
    res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@40283584
    
    Typing the method name without arguments and pressing Tab makes the shell print its signature:

    scala> val rdd1 = sc.parallelize

    def parallelize[T](seq: Seq[T], numSlices: Int)(implicit evidence$1: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
    
    scala> val rdd1 = sc.parallelize(List(5,4,6,6,7,3,1,9))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
    rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:26
    
    scala> val rdd3 = rdd2.filter(_ >= 10)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:28
    
    scala> rdd3.collect
    res2: Array[Int] = Array(10, 12, 12, 14, 18)         
    
    scala> rdd1.collect
    res4: Array[Int] = Array(5, 4, 6, 6, 7, 3, 1, 9)
    
    scala> rdd2.collect
    res5: Array[Int] = Array(2, 6, 8, 10, 12, 12, 14, 18)
    
    scala> 
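
    The three transformations above can be chained into a single expression. A minimal sketch with the same data and result, noting that map, sortBy, and filter are lazy transformations and only collect triggers an actual job:

    val result = sc.parallelize(List(5, 4, 6, 6, 7, 3, 1, 9))
      .map(_ * 2)                       // double every element
      .sortBy(x => x, ascending = true) // sort ascending
      .filter(_ >= 10)                  // keep values >= 10
      .collect()                        // Array(10, 12, 12, 14, 18)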
    
    
    
    scala> val rdd1= sc.parallelize(Array("a b c", "d f e", "h i"))
    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:24
    
    scala> rdd1.collect
    res6: Array[String] = Array(a b c, d f e, h i)
    
    scala> val rdd2 = rdd1.flatMap(_.split(' '))
    rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:26
    
    scala> rdd2.collect
    res7: Array[String] = Array(a, b, c, d, f, e, h, i)
    
    scala> val rdd3 = rdd1.flatMap(_.split(" "))
    rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at <console>:26
    
    scala> rdd3.collect
    res9: Array[String] = Array(a, b, c, d, f, e, h, i)
    
    scala> 
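
    As the session shows, split(' ') with a Char and split(" ") with a String behave the same here. The difference between map and flatMap is worth stating: map keeps one output element per input line, while flatMap flattens the split results into a single RDD of words. A minimal sketch with the same data:

    val lines  = sc.parallelize(Array("a b c", "d f e", "h i"))
    val arrays = lines.map(_.split(" "))     // RDD[Array[String]], 3 elements
    val words  = lines.flatMap(_.split(" ")) // RDD[String], 8 elements
    words.collect()                          // Array(a, b, c, d, f, e, h, i)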
    
    
    scala> val rdd1 = sc.parallelize(List(5, 6, 4, 3))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(List(1, 2, 4, 5))
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24
    
    scala> val rdd3 = rdd1.union(rdd2)
    rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[14] at union at <console>:28
    
    scala> rdd3.collect
    res12: Array[Int] = Array(5, 6, 4, 3, 1, 2, 4, 5)
    
    scala> val rdd4 = rdd1.intersection(rdd2)
    rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at intersection at <console>:28
    
    scala> rdd4.collect
    res13: Array[Int] = Array(4, 5)                                                 
    
    scala> rdd3.distinct
    res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at distinct at <console>:31
    
    scala> res14.collect
    res15: Array[Int] = Array(1, 2, 3, 4, 5, 6)
    
    scala> 
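
    Like every transformation, distinct returns a new RDD and leaves the original untouched; its output order after the shuffle is not guaranteed, so sort explicitly when a stable order matters. A small sketch:

    val deduped = rdd1.union(rdd2).distinct() // new RDD; rdd1 and rdd2 unchanged
    deduped.sortBy(identity).collect()        // Array(1, 2, 3, 4, 5, 6)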
    
    
    scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 2)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24
    
    scala> val rdd3 = rdd1.join(rdd2)
    rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[28] at join at <console>:28
    
    scala> rdd3.collect
    res16: Array[(String, (Int, Int))] = Array((tom,(1,2)), (jerry,(3,2)))
    
    scala> val rdd4 = rdd1 union rdd2
    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[29] at union at <console>:28
    
    scala> rdd4.collect
    res17: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,2))
    
    scala> rdd4.groupByKey
    res18: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at <console>:31
    
    scala> rdd4.collect
    res19: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,2))
    
    scala> 
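
    The last two commands illustrate a common pitfall: groupByKey produced a new RDD (res18), but RDDs are immutable, so collecting rdd4 again still shows the original pairs. Collect the grouped RDD instead; a sketch (grouping order may vary):

    val grouped = rdd4.groupByKey()
    grouped.collect()
    // e.g. Array((tom,CompactBuffer(1, 2)), (jerry,CompactBuffer(3, 2)), (kitty,CompactBuffer(2)))
    grouped.mapValues(_.sum).collect()
    // e.g. Array((tom,3), (jerry,5), (kitty,2))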
    
    
    scala> val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
    
    scala> val rdd3 = rdd1.cogroup(rdd2)
    rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[35] at cogroup at <console>:28
    
    scala> rdd3.collect
    res21: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (kitty,(CompactBuffer(2),CompactBuffer())), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))))
    
    scala> 
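
    Unlike join, which keeps only keys present in both RDDs, cogroup keeps every key from either side, pairing each with two CompactBuffers (one per RDD, possibly empty); that is why kitty and shuke appear here but not in the join above. One typical follow-up is summing each key's values across both sides; a sketch:

    rdd3.mapValues { case (a, b) => a.sum + b.sum }.collect()
    // e.g. Array((tom,4), (kitty,2), (jerry,5), (shuke,2))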
    
    scala> val rdd1 = sc.parallelize(List(1,2,3,5,4))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
    
    scala> val rdd2 = rdd1.reduce(_ + _)
    rdd2: Int = 15
    
    scala> 
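
    Note the naming here: reduce is an action, not a transformation, so rdd2 is a plain Int (15) rather than an RDD. The same pattern works for any associative, commutative function; a sketch:

    val nums = sc.parallelize(List(1, 2, 3, 5, 4))
    nums.reduce(_ + _)                    // 15: sum of all elements
    nums.reduce((a, b) => math.max(a, b)) // 5: maximum element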
    
    
    scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24
    
    scala> val rdd3 = rdd1.union(rdd2)
    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[39] at union at <console>:28
    
    scala> rdd3.collect
    res23: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (shuke,1), (jerry,2), (tom,3), (shuke,2), (kitty,5))
    
    scala> val rdd4 = rdd3.reduceByKey(_ + _)
    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[40] at reduceByKey at <console>:30
    
    scala> rdd4.collect
    res24: Array[(String, Int)] = Array((shuke,3), (tom,4), (kitty,7), (jerry,5))
    
    scala> val rdd5 = rdd4.map(t => (t._2, t._1))
    rdd5: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[41] at map at <console>:32
    
    scala> rdd5.collect
    res25: Array[(Int, String)] = Array((3,shuke), (4,tom), (7,kitty), (5,jerry))
    
    scala> val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false)
    rdd5: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at sortByKey at <console>:32
    
    scala> rdd5.collect
    res26: Array[(Int, String)] = Array((7,kitty), (5,jerry), (4,tom), (3,shuke))
    
    scala> rdd5.map(t => (t._2, t._1)).collect
    res28: Array[(String, Int)] = Array((kitty,7), (jerry,5), (tom,4), (shuke,3))   
    
    scala>
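
    The swap-sortByKey-swap sequence above is the classic way to order pairs by value; sortBy expresses it directly without the two map steps. A sketch producing the same ordering:

    rdd4.sortBy(_._2, ascending = false).collect()
    // Array((kitty,7), (jerry,5), (tom,4), (shuke,3))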