Transformation Operations and Transformation Operators in Spark


Recommended site for learning the operators: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
Table of contents
  • Overview of transformation operations
  • Examples of transformation operators
  • ==map, flatMap, distinct==
  • ==coalesce and repartition==: change the number of RDD partitions (repartitioning)
  • ==randomSplit==: randomly split an RDD
  • ==glom==: return the data items of each partition
  • ==union==: union
  • ==subtract==: difference
  • ==intersection==: intersection
  • ==mapPartitions==: operate on each partition
  • ==mapPartitionsWithIndex==
  • ==zip==
  • ==zipPartitions==
  • ==zipWithIndex==
  • ==zipWithUniqueId==
  • ==join==
  • ==rightOuterJoin==
  • ==leftOuterJoin==
  • ==cogroup==


  • Transformation operators for key-value pairs
  • ==reduceByKey[Pair]==
  • ==groupByKey()[Pair]==
  • ==keyBy==: set a key for each element
  • ==keys[Pair]==
  • ==values[Pair]==
  • ==sortByKey[Pair]==
  • ==partitionBy[Pair]==
  • ==mapValues[Pair]==
  • ==flatMapValues[Pair]==
  • ==subtractByKey[Pair]==
  • ==combineByKey[Pair]==
  • ==foldByKey[Pair]==




  • Overview of transformation operations
    A transformation converts the current RDD into a new RDD. Transformations are lazily evaluated: the RDD is actually computed only when an action is triggered.
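    As a minimal sketch of this lazy behaviour (an illustrative spark-shell snippet; the variable names nums and doubled are ours):
    
    val nums = sc.parallelize(1 to 5)   // builds an RDD, nothing is computed yet
    val doubled = nums.map(_ * 2)       // map is a transformation: only the lineage is recorded
    doubled.collect                     // collect is an action: the job actually runs here
    // => Array(2, 4, 6, 8, 10)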
    Examples of transformation operators
    map, flatMap, distinct
    map: applies the given function to every element of the RDD, producing exactly one output element per input element; the number of partitions is unchanged (a narrow dependency).
    
    flatMap: like map, but each input element may produce zero or more output elements, and all results are flattened into a single RDD (a narrow dependency).
        For example, applied to an RDD of Array[String] it flattens out the individual String elements.
    
    distinct: removes the duplicate elements from the RDD.
    
    // map example
    scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> a.partitions.length
    res0: Int = 3
    
    scala> a.glom.collect
    res4: Array[Array[String]] = Array(Array(dog), Array(salmon, salmon), Array(rat, elephant))
    
    scala> val b = a.map(_.length)
    b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:26
    
    scala> b.partitions.length
    res5: Int = 3
    
    scala> b.glom.collect
    res6: Array[Array[Int]] = Array(Array(3), Array(6, 6), Array(3, 8))   
    
    // flatMap example (rdd1 is assumed to have been created beforehand, e.g. as follows)
    scala> val rdd1 = sc.parallelize(Array(Array("hello", "world"), Array("how", "are", "you?"), Array("ni", "hao"), Array("hello", "tom")))
    
    scala> rdd1.collect
    res0: Array[Array[String]] = Array(Array(hello, world), Array(how, are, you?), Array(ni, hao), Array(hello, tom))
    
    scala> val rdd2 = rdd1.flatMap(x=>x)
    rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:28
    
    scala> rdd2.collect
    res1: Array[String] = Array(hello, world, how, are, you?, ni, hao, hello, tom)  
    	
    scala> rdd2.flatMap(x=>x).collect
    res3: Array[Char] = Array(h, e, l, l, o, w, o, r, l, d, h, o, w, a, r, e, y, o, u, ?, n, i, h, a, o, h, e, l, l, o, t, o, m)
             
    
    // distinct example
    scala> a.collect
    res7: Array[String] = Array(dog, salmon, salmon, rat, elephant)
    
    scala> val c = a.distinct
    c: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at distinct at <console>:26
    
    scala> c.collect
    res8: Array[String] = Array(rat, salmon, elephant, dog)
    
    

    coalesce and repartition: both change the number of partitions of an RDD (repartitioning)
    def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]
    def repartition ( numPartitions : Int ): RDD [T]
    
    Both return a new RDD with the requested number of partitions. The difference is that coalesce takes a second parameter, the Boolean shuffle flag, which defaults to false:
    	when decreasing the number of partitions, shuffle can remain false;
    	when increasing the number of partitions, shuffle must be true (repartition(n) is simply coalesce(n, shuffle = true)).
    Typical use: after a filter, the partitions of an RDD may hold very little data, so coalesce is used to reduce the number of partitions. A short contrast of the two operators follows the example below.
    
    // Example:
    scala> val rdd = sc.parallelize(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
    
    scala> rdd.partitions.length
    res10: Int = 1
    
    scala> val rdd1= rdd.coalesce(5,true)
    rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at coalesce at <console>:26
    
    scala> rdd1.partitions.length
    res14: Int = 5
    
    scala> rdd1.glom.collect
    res15: Array[Array[Int]] = Array(Array(5, 10), Array(1, 6), Array(2, 7), Array(3, 8), Array(4, 9))
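    To contrast the two operators, a minimal sketch (illustrative; the variable name big is ours):
    
    val big = sc.parallelize(1 to 10, 5)    // 5 partitions to start with
    big.coalesce(2).partitions.length       // shrinking works without a shuffle (shuffle = false by default) => 2
    big.coalesce(10).partitions.length      // growing with the default shuffle = false has no effect        => 5
    big.repartition(10).partitions.length   // repartition(n) is coalesce(n, shuffle = true)                 => 10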
    

    randomSplit: randomly split an RDD
    def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
    
    Splits one RDD into several RDDs according to the array of weights; the weights determine what fraction of the elements each resulting RDD receives (they are normalized if they do not sum to 1).
    Note: the assignment is random, so the actual element counts only approximate the weights.
    
    Typical use: randomly splitting a dataset (for example, data read from Hadoop) into subsets such as training and test sets.
    
    // Example:
    scala> rdd.collect
    res19: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> val rdd0 =rdd.randomSplit(Array(0.1,0.2,0.8))
    	rdd0: Array[org.apache.spark.rdd.RDD[Int]] = Array(
    	MapPartitionsRDD[17] at randomSplit at <console>:26, 
    	MapPartitionsRDD[18] at randomSplit at <console>:26, 
    	MapPartitionsRDD[19] at randomSplit at <console>:26)
    
    scala> rdd0(0).collect
    res16: Array[Int] = Array(9)
    
    scala> rdd0(1).collect
    res17: Array[Int] = Array(8)
    
    scala> rdd0(2).collect
    res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 10)
    

    glom: return the data items of each partition
    Description: returns the data items of each partition as an array, i.e. RDD[Array[T]]. It is commonly used to inspect how the data is distributed across partitions (to test the parallelism).
    scala> val z=sc.parallelize(1 to 15,3)
    z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
    
    scala> z.glom.collect
    res20: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15))
    

    union: union
    Description: merges two RDDs without removing duplicates. Note: the number of partitions after union is the sum of the two RDDs' partition counts.
    scala> val x= sc.parallelize(1 to 6,2)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
    
    scala> val y =sc.parallelize(5 to 13,3)
    y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
    
    scala> val z =x.union(y)
    z: org.apache.spark.rdd.RDD[Int] = UnionRDD[24] at union at <console>:28
    
    scala> z.glom.collect
    res21: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(5, 6, 7), Array(8, 9, 10), Array(11, 12, 13))
    

    subtract: difference
    Note: the number of partitions after subtract equals the partition count of the first (left-hand) RDD.
    scala> val z1=x.subtract(y)
    z1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at subtract at <console>:28
    
    scala> z1.glom.collect
    res23: Array[Array[Int]] = Array(Array(2, 4), Array(1, 3))
    

    intersection: intersection
    Description: returns the elements common to both RDDs. Note: the number of partitions of the resulting RDD is the larger of the two input RDDs' partition counts.
    scala> val z2 = x.intersection(y)
    z2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[36] at intersection at <console>:28
    
    scala> z2.glom.collect
    res24: Array[Array[Int]] = Array(Array(6), Array(), Array(5)) 
    

    mapPartitions: operate on each partition
    def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
    
    Operates on each partition as a whole rather than on individual elements.
    The supplied function receives an iterator over one partition (Iterator[T]) and returns an iterator over the transformed elements.
    The outputs (of type U) of all partitions together form the new RDD.
    

    Practical use: when an RDD has to talk to a database, mapPartitions lets you instantiate one database connection (conn) object per partition rather than one per element; see the sketch after the example below.
    // Example:
    val a = sc.parallelize(1 to 9, 3)
    
    def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
      // pair every element with its successor within the same partition
      var res = List[(T, T)]()
      var pre = iter.next
      while (iter.hasNext) {
        val cur = iter.next
        res = (pre, cur) :: res
        pre = cur
      }
      res.iterator
    }
    	
    a.mapPartitions(myfunc).collect
    res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
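    To illustrate the database use case mentioned above, a minimal sketch (illustrative; recordsRdd, createConnection, insertRecord and closeConnection are hypothetical names, not a real API):
    
    recordsRdd.mapPartitions { iter =>
      val conn = createConnection()                               // hypothetical: open one connection per partition
      val results = iter.map(r => insertRecord(conn, r)).toList   // reuse the connection for every element
      closeConnection(conn)                                       // hypothetical: close it once the partition is done
      results.iterator
    }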
    

    mapPartitionsWithIndex
    def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
    
    Similar to mapPartitions, but the function takes two arguments:
    the first argument is the index of the partition, and the second is an iterator over all elements in that partition.
    The output is an iterator containing the transformed elements.
    
    
    val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
    
    def myfunc[T](index: Int, iter: Iterator[T]) : Iterator[String] = {
         
      iter.map(x => index + "," + x)
    }
    
    Note: iter: Iterator[Int] corresponds to Iterator[T], where T is the element type of the RDD.
    
    x.mapPartitionsWithIndex(myfunc).collect()
    res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)
    

    zip
    Description: joins two RDDs by pairing the i-th element of each corresponding partition; the resulting RDD consists of two-component tuples. Notes: 1. the element types of the two RDDs may differ; 2. both RDDs must have the same number of partitions; 3. corresponding partitions must contain the same number of elements.
    val x1 = sc.parallelize(1 to 15,3)
    val y1 = sc.parallelize(11 to 25,3)
    
    x1.zip(y1).collect
    res27: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20), (11,21), (12,22), (13,23), (14,24), (15,25))
    
    scala>  val z1 = sc.parallelize(21 to 35,3) 
    z1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at <console>:24
    
    scala> x1.zip(y1).zip(z1).map((x) => (x._1._1, x._1._2, x._2 )).collect
    res28: Array[(Int, Int, Int)] = Array((1,11,21), (2,12,22), (3,13,23), (4,14,24), (5,15,25), (6,16,26), (7,17,27), (8,18,28), (9,19,29), (10,20,30), (11,21,31), (12,22,32), (13,23,33), (14,24,34), (15,25,35))
    

    zipPartitions
    Like zip, it requires every RDD involved to have the same number of partitions; a user-supplied function combines the iterators of the corresponding partitions.
    // Example:
    val a = sc.parallelize(0 to 9, 3)
    val b = sc.parallelize(10 to 19, 3)
    val c = sc.parallelize(100 to 109, 3)
    
    def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] = {
      // combine the corresponding partitions of the three RDDs element by element
      var res = List[String]()
      while (aiter.hasNext && biter.hasNext && citer.hasNext) {
        val x = aiter.next + " " + biter.next + " " + citer.next
        res ::= x
      }
      res.iterator
    }
    
    a.zipPartitions(b, c)(myfunc).collect
    res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)
    

    zipWithIndex
    def zipWithIndex(): RDD[(T, Long)]
    Description: pairs each element of the RDD with its index, producing a new RDD[(T, Long)].
    // Example:
    val y1 = sc.parallelize(11 to 25,3)
    
    scala> y1.zipWithIndex.collect
    res29: Array[(Int, Long)] = Array((11,0), (12,1), (13,2), (14,3), (15,4), (16,5), (17,6), (18,7), (19,8), (20,9), (21,10), (22,11), (23,12), (24,13), (25,14))
    
    val z = sc.parallelize(Array("A", "B", "C", "D"))
    val r = z.zipWithIndex
    r.collect
    res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3))
    

    zipWithUniqueId
    def zipWithUniqueId(): RDD[(T, Long)]
    Description: pairs each element of the RDD with a unique Long ID; unlike zipWithIndex, the IDs are not necessarily consecutive.
    // Example:
    val z = sc.parallelize(100 to 120, 5)
    val r = z.zipWithUniqueId
    r.collect
    
    res12: Array[(Int, Long)] = Array(
    (100,0), (101,5), (102,10), (103,15),
    (104,1),(105,6), (106,11), (107,16), 
    (108,2), (109,7), (110,12), (111,17), 
    (112,3), (113,8), (114,13), (115,18), 
    (116,4), (117,9), (118,14), (119,19), (120,24))
    
    // How the IDs are computed (5 partitions in this example):
    // step 1: the first element of each partition receives an ID equal to its partition index:
    //         partition 0 -> 0, partition 1 -> 1, partition 2 -> 2, partition 3 -> 3, partition 4 -> 4
    // step 2: every following element of a partition receives the previous ID plus the number of partitions (5):
    //         partition 0: 0, 5, 10, 15
    //         partition 1: 1, 6, 11, 16
    //         partition 2: 2, 7, 12, 17
    //         partition 3: 3, 8, 13, 18
    //         partition 4: 4, 9, 14, 19, 24
    // In general, the k-th element (0-based) of partition p out of N partitions receives the ID k * N + p.
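    As a small cross-check of this formula, a sketch (illustrative; it reuses z = sc.parallelize(100 to 120, 5) from above) that prints each value with its partition index and ID:
    
    z.zipWithUniqueId.mapPartitionsWithIndex((p, it) => it.map { case (v, id) => s"partition $p: value $v -> id $id" }).collect.foreach(println)
    // e.g. "partition 1: value 104 -> id 1", "partition 1: value 105 -> id 6", ...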
    

    join
    Description: performs an inner join on two pair RDDs; only keys that occur in both RDDs appear in the result, and every matching pair of values is combined.
    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
    
    // Example:
    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val b = a.keyBy(_.length)
    
    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val d = c.keyBy(_.length)
    
    b.join(d).collect
    res0: Array[(Int, (String, String))] = Array(
    	(6,(salmon,salmon)), (6,(salmon,rabbit)),(6,(salmon,turkey)), 
    	(6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), 
    	(3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), 
    	(3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
    

    rightOuterJoin
    Description: joins two pair RDDs while guaranteeing that every key of the second (right-hand) RDD appears in the result; values from the first RDD are wrapped in Option (a right outer join).
    
    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val d = c.keyBy(_.length)
    
    b.rightOuterJoin(d).collect
    res2: Array[(Int, (Option[String], String))] = Array(
    	(6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), 
    	(6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), 
    	(3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), 
    	(3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)),
    	(4,(None,wolf)), 
    	(4,(None,bear))
    	)
    

    leftOuterJoin
    Description: joins two pair RDDs while guaranteeing that every key of the first (left-hand) RDD appears in the result; values from the second RDD are wrapped in Option (a left outer join).
    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val d = c.keyBy(_.length)
    
    b.leftOuterJoin(d).collect
    res1: Array[(Int, (String, Option[String]))] = Array(
    	(6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), 
    	(6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), 
    	(3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), 
    	(3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), 
    	(8,(elephant,None))
    	)
    

    cogroup
    Description: groups the data sharing the same key from both RDDs together (a full outer grouping); using the key, up to three pair RDDs can be combined this way.
    val a = sc.parallelize(List(1, 2, 1, 3), 1)
    val b = a.map((_, "b"))
    val c = a.map((_, "c"))
    b.cogroup(c).collect
    res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
    (2,(ArrayBuffer(b),ArrayBuffer(c))),
    (3,(ArrayBuffer(b),ArrayBuffer(c))),
    (1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
    )
    
    val d = a.map((_, "d"))
    b.cogroup(c, d).collect
    res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
    (2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
    (3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
    (1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
    )
    
    val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
    val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
    x.cogroup(y).collect
    res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
    (4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))), 
    (2,(ArrayBuffer(banana),ArrayBuffer())), 
    (3,(ArrayBuffer(orange),ArrayBuffer())),
    (1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
    (5,(ArrayBuffer(),ArrayBuffer(computer))))
    

    Transformation operators for key-value pairs
    reduceByKey[Pair]
    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    Description: merges the values that share the same key using the given function.
    // Example 1:
    scala> val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
    
    scala> val b = a.map(x=>(x.length,x))
    b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[51] at map at <console>:28
    
    scala> b.collect
    res31: Array[(Int, String)] = Array((3,dog), (3,cat), (3,owl), (3,gnu), (3,ant))
    
    scala> b.reduceByKey((x,y)=>x+y).collect
    res32: Array[(Int, String)] = Array((3,dogcatowlgnuant))  
    
    // Example 2:
    scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[53] at parallelize at <console>:24
    
    scala> val b = a.map(x=>(x.length,x))
    b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[54] at map at <console>:28
    
    scala> b.collect
    res33: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))
    
    scala> b.reduceByKey(_+_).collect
    res34: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
    

    groupByKey()[Pair]
    Description: groups the values by key; the return value is RDD[(K, Iterable[V])].
    scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[53] at parallelize at <console>:24
    
    scala> val b = a.map(x=>(x.length,x))
    b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[54] at map at <console>:28
    
    scala> b.groupByKey
    res35: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[56] at groupByKey at <console>:31
    
    scala> b.groupByKey.collect
    res37: Array[(Int, Iterable[String])] = Array(
    		(4,CompactBuffer(lion)), (3,CompactBuffer(dog, cat)), 
    		(7,CompactBuffer(panther)), (5,CompactBuffer(tiger, eagle)))
    

    keyBy: set a key for each element
    def keyBy[K](f: T => K): RDD[(K, T)]
    Description: uses the return value of f as the key and pairs it with each element of the RDD, forming a pair RDD (RDD[(K, T)]).
    scala> a.collect
    res39: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
    
    scala> a.keyBy(x=>x.head).collect
    scala> a.keyBy(_.head).collect
    // the two calls above are equivalent
    res38: Array[(Char, String)] = Array((d,dog), (t,tiger), (l,lion), (c,cat), (p,panther), (e,eagle))
    

    keys[Pair]
    def keys: RDD[K]
    Description: returns an RDD containing only the keys.
    scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    scala> val b = a.map(x => (x.length, x))
    
    scala> b.keys.collect
    res2: Array[Int] = Array(3, 5, 4, 3, 7, 5)
    
    scala> val b = a.keyBy(_.head)
    b: org.apache.spark.rdd.RDD[(Char, String)] = MapPartitionsRDD[63] at keyBy at <console>:26
    
    scala> b.keys.collect
    res46: Array[Char] = Array(d, t, l, c, p, e)
    

    values[Pair]
    def values: RDD[V]
    Description: returns an RDD containing only the values.
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val b = a.map(x => (x.length, x))
    
    b.values.collect
    res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
    

    sortByKey[Pair]
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K, V)]
    Description: sorts the RDD by key; ascending defaults to true (ascending order).
    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
    val b = sc.parallelize(1 to a.count.toInt, 2)
    val c = a.zip(b)
    
    c.sortByKey(true).collect
    res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
    
    c.sortByKey(false).collect
    res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
    

    partitionBy[Pair]
    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    Description: repartitions the RDD according to the given Partitioner; a custom-partitioner sketch follows the example below.
    	scala> val rdd = sc.parallelize(List((1,"a"),(2,"b"),(3,"c"),(4,"d")),2)
    	rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at <console>:24
    
    	scala> rdd.glom.collect
    	res28: Array[Array[(Int, String)]] = Array(Array((1,a), (2,b)), Array((3,c), (4,d)))
    	
    	scala> val rdd1=rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
    	rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[28] at partitionBy at <console>:26
    
    	scala> rdd1.glom.collect
    	res29: Array[Array[(Int, String)]] = Array(Array((4,d), (2,b)), Array((1,a), (3,c)))
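    	Beyond the built-in HashPartitioner, any custom Partitioner can be supplied. A minimal sketch (illustrative; the OddEvenPartitioner class is ours, not part of Spark):
    	
    	import org.apache.spark.Partitioner
    	
    	// route even keys to partition 0 and odd keys to partition 1
    	class OddEvenPartitioner extends Partitioner {
    	  override def numPartitions: Int = 2
    	  override def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2
    	}
    	
    	rdd.partitionBy(new OddEvenPartitioner).glom.collect
    	// expected: Array(Array((2,b), (4,d)), Array((1,a), (3,c)))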
    

    mapValues[Pair]
    def mapValues[U](f: V => U): RDD[(K, U)]
    Description: takes each value of a pair RDD and applies the provided function to transform it, then forms a new key-value tuple from the unchanged key and the transformed value; i.e. RDD[(K, V)] --> RDD[(K, U)], operating on the values only.
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val b = a.map(x => (x.length, x))
    
    b.mapValues("x" + _ + "x").collect
    res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
    

    flatMapValues[Pair]
    def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
    Description: applies f to each value and flattens the result, pairing every produced element with the original key.
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val b = a.map(x => (x.length, x))
    
    b.flatMapValues("x" + _ + "x").collect
    res6: Array[(Int, Char)] = Array(
    	(3,x), (3,d), (3,o), (3,g), (3,x),
     	(5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x), 
     	(4,x), (4,l), (4,i), (4,o), (4,n), (4,x), 
     	(3,x), (3,c), (3,a), (3,t), (3,x),
      	(7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x),
       	(5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x)) 
    

    subtractByKey[Pair]
    def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
    Description: removes from this RDD the elements whose key also appears in the other RDD.
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("ant", "falcon", "squid"), 2)
    val d = c.keyBy(_.length)
    
    b.subtractByKey(d).collect
    res15: Array[(Int, String)] = Array((4,lion))
    

    combineByKey[Pair]
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
    Description:
    createCombiner: called the first time a key is encountered within a partition, to create the initial combined value;
    mergeValue: called when the same key is encountered again within the same partition, to merge a new value into the combined value;
    mergeCombiners: called to merge the combined values of the same key coming from different partitions.
    A per-key average sketch follows the examples below.
    Case 1: the original RDD (both values of each key end up in the same partition, so mergeValue is used):
    	scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))  
    	rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    	scala> rdd1.combineByKey(x=>x+"_",(x:String,y:Int)=>x+"@"+y,(x:String,y:String)=>x+"$"+y)
    	res0: org.apache.spark.rdd.RDD[(String, String)] = ShuffledRDD[1] at combineByKey at <console>:27
    
    	scala> res0.collect
    	res1: Array[(String, String)] = Array((B,1_@2), (A,1_@2), (C,1_))
    	
    Case 2: the RDD repartitioned into 2 partitions (values of the same key fall into different partitions, so mergeCombiners is used):
        scala> val rdd2 = rdd1.repartition(2)
    	rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at repartition at <console>:26
    
    	scala> rdd2.partitions.size
    	res2: Int = 2
    
    	scala> rdd2.glom.collect
    	res3: Array[Array[(String, Int)]] = Array(Array((A,1), (B,1), (C,1)), Array((A,2), (B,2)))
    
    	scala> rdd2.combineByKey(x=>x+"_",(x:String,y:Int)=>x+"@"+y,(x:String,y:String)=>x+"$"+y)
    	res4: org.apache.spark.rdd.RDD[(String, String)] = ShuffledRDD[7] at combineByKey at <console>:29
    
        scala> res4.collect
    	res6: Array[(String, String)] = Array((B,1_$2_), (A,1_$2_), (C,1_))
           
    Case 3: the RDD repartitioned with a HashPartitioner into 3 partitions (all values of a key stay in one partition, so only mergeValue is used):
    	scala> val rdd3 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(3))
    	rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at partitionBy at <console>:26
    
    	scala> rdd3.partitions.size
    	res7: Int = 3
    
    	scala> rdd3.glom.collect
    	res8: Array[Array[(String, Int)]] = Array(Array((B,1), (B,2)), Array((C,1)), Array((A,1), (A,2)))
    
    	scala> rdd3.combineByKey(x=>x+"_",(x:String,y:Int)=>x+"@"+y,(x:String,y:String)=>x+"$"+y)
    	res9: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at combineByKey at <console>:29
    
    	scala> res9.collect
    	res10: Array[(String, String)] = Array((B,1_@2), (C,1_), (A,1_@2))
    
    // Example: collect the values of each key into a List
    val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
    val c = b.zip(a)
    val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
    d.collect
    res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
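    Another common use is a per-key average, which shows why the combined type C can differ from the value type V. A minimal sketch (illustrative; the scores data is ours):
    
    val scores = sc.parallelize(List(("a", 1.0), ("a", 3.0), ("b", 4.0)))
    val sumCount = scores.combineByKey(
      (v: Double) => (v, 1),                                                // createCombiner: first value of a key in a partition
      (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),          // mergeValue: same key seen again in the partition
      (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))   // mergeCombiners: combine results across partitions
    sumCount.mapValues { case (sum, count) => sum / count }.collect
    // expected (order may vary): Array((a,2.0), (b,4.0))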
    

    foldByKey[Pair]
    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    Description: works like reduceByKey (merging the values that share the same key), but as a curried function whose first argument list supplies an initial value zeroValue.
    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
    val b = a.map(x => (x.length, x))
    
    b.foldByKey("")(_ + _).collect
    res84: Array[(Int, String)] = Array((3,dogcatowlgnuant))
    
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val b = a.map(x => (x.length, x))
    
    b.foldByKey("")(_ + _).collect
    res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))