Sparkでの変換操作、変換演算子
学習演算子のおすすめサイト:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
文書ディレクトリ 変換操作概要 変換演算子の例 ==map、flatMap、distinct== ==coalesceおよびrepartition=:RDDパーティション数、再パーティション を変更します.=randomSplit=:RDDランダム割当て ==glom==各パーティションのデータ項目 を返します.==union=: =subtrat=:差セット =intersection=:交差 =mapPartitions==各パーティションについて ==mapPartitionWithIndex== ==zip== ==zipParititions== ==zipWithIndex== ==zipWithUniqueId== ==join== ==rightOuterJoin== ==leftOuterJoin== ==cogroup==
キー値ペアに対する変換演算子 ==reduceByKey[Pair]== ==groupByKey()[Pair]== ==keyBy==キーとして要素を設定 ==keys[Pair]== ==values[Pair]== ==sortByKey[Pair]== ==partitionBy[Pair]== ==mapValues[Pair]== ==flatMapValues[Pair]== ==subtractByKey[Pair]== ==combineByKey[Pair]== ==foldByKey[Pair]==
変換操作の概要
現在のRDDを新しいRDDデータセットに変換し、不活性評価を特徴とし、行動操作がトリガーされるとRDDは計算を開始する.
変換演算子の例
map、flatMap、distinct
coalesceとrepartition:いずれもRDDパーティション数の変更、再パーティション化
randomSplit:RDDランダム割当て
glom:各パーティションのデータ・アイテムを返します.
説明:各パーティションのデータ・アイテムを返します.一般的には、並列度でglomでテストされます.
union:パラレルセット
説明:2つのRDDをマージします.注意しないでください.union後のパーティション数は2つのRDDパーティションの和です.
subtrat:差分セット
注意:subtrat操作後のパーティション数は、前のRDDのパーティション数です.
intersection:交差
説明:2つのRDDを交差させ、注意を払う:intersection操作後のRDDパーティション数は前のパーティション数の大きい値である
mapPartitions:各パーティションを操作する
実際の応用:RDDに対してデータベース操作を行う場合、mapPartitionsを用いて各パーティションに対してデータベース接続connオブジェクトをインスタンス化する必要がある.
mapPartitionWithIndex
zip
説明:任意のパーティションのi番目の要素を組み合わせることで、2つのRDDsを接続します.得られたRDDは、2つのコンポーネント要素グループによって注意される:1.2つのRDD間のデータ型は異なることができる.2.RDD毎に同一のパーティション数を有することを要求する.RDDを必要とする各パーティションは同じデータ個数を有する
zipParititions
zipと同様に、各RDDに同じパーティション数が必要であることが要求される.
zipWithIndex
zipWithUniqueId
join
rightOuterJoin
説明:2つのRDDに対して接続操作を行い、最初のRDDのキーが必ず存在することを確保する(右外部接続)
leftOuterJoin
説明:2つのRDDに対して接続操作を行い、2番目のRDDのキーが必ず存在することを確保する(左外接続)
cogroup
説明:2つのRDDの中で同じキーを持つデータを一緒にグループ化し、完全に接続し、キーを使用して最大3つのキー値RDDを組み合わせます.
キー値ペアに対する変換演算子
reduceByKey[Pair]
groupByKey()[Pair]
説明:同じキーキーキーでグループ化し、戻り値はRDD[(K,Iterable[V])]
keyByキーとしてエレメントを設定
説明:f関数の戻り値をKeyとし、RDDの各要素とpiarRDD{RDD[(K,T)]}を構成する
keys[Pair]
values[Pair]
sortByKey[Pair]
def sortByKey(ascending:Boolean=true,numPartitions:Int=self.partitions.size):RDD[P]説明:keyによるソート、デフォルトascending:Boolean=true("昇順")
partitionBy[Pair]
def partitionBy(partitioner:Partitioner):RDD[(K,V)]説明:Partitionerを設定してRDDを再分割する
mapValues[Pair]
2つのコンポーネント要素グループからなるRDDの値を取得し、提供された関数を適用して各値を変換する.その後、キーと変換された値を使用して新しいデュアルコンポーネント要素グループを形成し、それらを新しいRDDに格納するdef mapValues[U](f:V=>U):RDD[(K,U)]説明:RDD[(K,V)]-->RDD[(K,U)]をValueに対して操作する(f:V=>U)
flatMapValues[Pair]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
subtractByKey[Pair]
def subtractByKey[W:ClassTag](other:RDD[(K,W)]):RDD[(K,V)]説明:RDDのキーとother RDDのキーが同じ要素を削除する
combineByKey[Pair]
foldByKey[Pair]
def foldByKey(zeroValue:V)(func:(V,V)=>V):RDD[(K,V)]説明:reduceByKeyと同様に機能する(同じキーの値をマージする)が、コリー化関数により、まずzeroValueを初期化する
文書ディレクトリ
変換操作の概要
現在のRDDを新しいRDDデータセットに変換し、不活性評価を特徴とし、行動操作がトリガーされるとRDDは計算を開始する.
変換演算子の例
map、flatMap、distinct
map : RDD , map 。
, : , 【 】
flatMap : Map , ;【 】
: Array[String] , String
distinct : RDD
//map
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> a.partitions.length
res0: Int = 3
scala> a.glom.collect
res4: Array[Array[String]] = Array(Array(dog), Array(salmon, salmon), Array(rat, elephant))
scala> val b = a.map(_.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:26
scala> b.partitions.length
res5: Int = 3
scala> b.glom.collect
res6: Array[Array[Int]] = Array(Array(3), Array(6, 6), Array(3, 8))
//flatMap
scala> rdd1.collect
res0: Array[Array[String]] = Array(Array(hello, world), Array(how, are, you?), Array(ni, hao), Array(hello, tom))
scala> val rdd2 = rdd1.flatMap(x=>x)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:28
scala> rdd2.collect
res1: Array[String] = Array(hello, world, how, are, you?, ni, hao, hello, tom)
scala> rdd2.flatMap(x=>x).collect
res3: Array[Char] = Array(h, e, l, l, o, w, o, r, l, d, h, o, w, a, r, e, y, o, u, ?, n, i, h, a, o, h, e, l, l, o, t, o, m)
//distinct
scala> a.collect
res7: Array[String] = Array(dog, salmon, salmon, rat, elephant)
scala> val c = a.distinct
c: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at distinct at <console>:26
scala> c.collect
res8: Array[String] = Array(rat, salmon, elephant, dog)
coalesceとrepartition:いずれもRDDパーティション数の変更、再パーティション化
def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]
def repartition ( numPartitions : Int ): RDD [T]
RDD , RDD; : , shuffle Booleean , false
RDD ,shuffle false
RDD ,shuffle true
: filter , RDD ,
// :
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd.partitions.length
res10: Int = 1
scala> val rdd1= rdd.coalesce(5,true)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at coalesce at <console>:26
scala> rdd1.partitions.length
res14: Int = 5
scala> rdd1.glom.collect
res15: Array[Array[Int]] = Array(Array(5, 10), Array(1, 6), Array(2, 7), Array(3, 8), Array(4, 9))
randomSplit:RDDランダム割当て
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
RDD RDD, RDD 。
: RDD 。
:Hadoop
// :
scala> rdd.collect
res19: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val rdd0 =rdd.randomSplit(Array(0.1,0.2,0.8))
rdd0: Array[org.apache.spark.rdd.RDD[Int]] = Array(
MapPartitionsRDD[17] at randomSplit at <console>:26,
MapPartitionsRDD[18] at randomSplit at <console>:26,
MapPartitionsRDD[19] at randomSplit at <console>:26)
scala> rdd0(0).collect
res16: Array[Int] = Array(9)
scala> rdd0(1).collect
res17: Array[Int] = Array(8)
scala> rdd0(2).collect
res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 10)
glom:各パーティションのデータ・アイテムを返します.
説明:各パーティションのデータ・アイテムを返します.一般的には、並列度でglomでテストされます.
scala> val z=sc.parallelize(1 to 15,3)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> z.glom.collect
res20: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15))
union:パラレルセット
説明:2つのRDDをマージします.注意しないでください.union後のパーティション数は2つのRDDパーティションの和です.
scala> val x= sc.parallelize(1 to 6,2)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> val y =sc.parallelize(5 to 13,3)
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> val z =x.union(y)
z: org.apache.spark.rdd.RDD[Int] = UnionRDD[24] at union at <console>:28
scala> z.glom.collect
res21: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(5, 6, 7), Array(8, 9, 10), Array(11, 12, 13))
subtrat:差分セット
注意:subtrat操作後のパーティション数は、前のRDDのパーティション数です.
scala> val z1=x.subtract(y)
z1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at subtract at <console>:28
scala> z1.glom.collect
res23: Array[Array[Int]] = Array(Array(2, 4), Array(1, 3))
intersection:交差
説明:2つのRDDを交差させ、注意を払う:intersection操作後のRDDパーティション数は前のパーティション数の大きい値である
scala> val z2 = x.intersection(y)
z2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[36] at intersection at <console>:28
scala> z2.glom.collect
res24: Array[Array[Int]] = Array(Array(6), Array(), Array(5))
mapPartitions:各パーティションを操作する
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
, 。
(Iterarator[T]), 。
[U]。 RDD。
実際の応用:RDDに対してデータベース操作を行う場合、mapPartitionsを用いて各パーティションに対してデータベース接続connオブジェクトをインスタンス化する必要がある.
// :
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
mapPartitionWithIndex
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
mappartition, 。
, 。
,
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc[T](index: Int, iter: Iterator[T]) : Iterator[String] = {
iter.map(x => index + "," + x)
}
:iter: Iterator[Int]:Iterator[T] , RDD
x.mapPartitionsWithIndex(myfunc).collect()
res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)
zip
説明:任意のパーティションのi番目の要素を組み合わせることで、2つのRDDsを接続します.得られたRDDは、2つのコンポーネント要素グループによって注意される:1.2つのRDD間のデータ型は異なることができる.2.RDD毎に同一のパーティション数を有することを要求する.RDDを必要とする各パーティションは同じデータ個数を有する
val x1 = sc.parallelize(1 to 15,3)
val y1 = sc.parallelize(11 to 25,3)
x1.zip(y1).collect
res27: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20), (11,21), (12,22), (13,23), (14,24), (15,25))
scala> val z1 = sc.parallelize(21 to 35,3)
z1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at <console>:24
scala> x1.zip(y1).zip(z1).map((x) => (x._1._1, x._1._2, x._2 )).collect
res28: Array[(Int, Int, Int)] = Array((1,11,21), (2,12,22), (3,13,23), (4,14,24), (5,15,25), (6,16,26), (7,17,27), (8,18,28), (9,19,29), (10,20,30), (11,21,31), (12,22,32), (13,23,33), (14,24,34), (15,25,35))
zipParititions
zipと同様に、各RDDに同じパーティション数が必要であることが要求される.
// :
val a = sc.parallelize(0 to 9, 3)
val b = sc.parallelize(10 to 19, 3)
val c = sc.parallelize(100 to 109, 3)
def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] =
{
var res = List[String]()
while (aiter.hasNext && biter.hasNext && citer.hasNext)
{
val x = aiter.next + " " + biter.next + " " + citer.next
res ::= x
}
res.iterator
}
a.zipPartitions(b, c)(myfunc).collect
res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)
zipWithIndex
def zipWithIndex(): RDD[(T, Long)]
既存のRDDの各要素と対応するIndexとを組み合わせて、新しいRDD[(T,Long)]を生成する.// :
val y1 = sc.parallelize(11 to 25,3)
scala> y1.zipWithIndex.collect
res29: Array[(Int, Long)] = Array((11,0), (12,1), (13,2), (14,3), (15,4), (16,5), (17,6), (18,7), (19,8), (20,9), (21,10), (22,11), (23,12), (24,13), (25,14))
val z = sc.parallelize(Array("A", "B", "C", "D"))
val r = z.zipWithIndex
r.collect
res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3))
zipWithUniqueId
// :
val z = sc.parallelize(100 to 120, 5)
val r = z.zipWithUniqueId
r.collect
res12: Array[(Int, Long)] = Array(
(100,0), (101,5), (102,10), (103,15),
(104,1),(105,6), (106,11), (107,16),
(108,2), (109,7), (110,12), (111,17),
(112,3), (113,8), (114,13), (115,18),
(116,4), (117,9), (118,14), (119,19), (120,24))
// :
step1: 0
1
2
3
4
step2: 0+5( ), 5+5( ), 10+5( )
0,5,10,15
1+5( ), 6+5( ), 11+5( )
1,6,11,16
2+5( ), 7+5( ), 12+5( )
2,7,12,17
3+5( ), 7+5( ), 12+5( )
3,8,13,18
4+5( ), 9+5( ), 14+5( ), 19+5( )
4,9,14,19,24
join
: RDD ,
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
// :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array(
(6,(salmon,salmon)), (6,(salmon,rabbit)),(6,(salmon,turkey)),
(6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)),
(3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)),
(3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
rightOuterJoin
説明:2つのRDDに対して接続操作を行い、最初のRDDのキーが必ず存在することを確保する(右外部接続)
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
res2: Array[(Int, (Option[String], String))] = Array(
(6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)),
(6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)),
(3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)),
(3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)),
(4,(None,wolf)),
(4,(None,bear))
)
leftOuterJoin
説明:2つのRDDに対して接続操作を行い、2番目のRDDのキーが必ず存在することを確保する(左外接続)
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.leftOuterJoin(d).collect
res1: Array[(Int, (String, Option[String]))] = Array(
(6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))),
(6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))),
(3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))),
(3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))),
(8,(elephant,None))
)
cogroup
説明:2つのRDDの中で同じキーを持つデータを一緒にグループ化し、完全に接続し、キーを使用して最大3つのキー値RDDを組み合わせます.
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)
val d = a.map((_, "d"))
b.cogroup(c, d).collect
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
)
val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
x.cogroup(y).collect
res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))),
(2,(ArrayBuffer(banana),ArrayBuffer())),
(3,(ArrayBuffer(orange),ArrayBuffer())),
(1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
(5,(ArrayBuffer(),ArrayBuffer(computer))))
キー値ペアに対する変換演算子
reduceByKey[Pair]
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
同じキーを持つ値をマージ// :
scala> val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
scala> val b = a.map(x=>(x.length,x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[51] at map at <console>:28
scala> b.collect
res31: Array[(Int, String)] = Array((3,dog), (3,cat), (3,owl), (3,gnu), (3,ant))
scala> b.reduceByKey((x,y)=>x+y).collect
res32: Array[(Int, String)] = Array((3,dogcatowlgnuant))
// :
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[53] at parallelize at <console>:24
scala> val b = a.map(x=>(x.length,x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[54] at map at <console>:28
scala> b.collect
res33: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))
scala> b.reduceByKey(_+_).collect
res34: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
groupByKey()[Pair]
説明:同じキーキーキーでグループ化し、戻り値はRDD[(K,Iterable[V])]
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[53] at parallelize at <console>:24
scala> val b = a.map(x=>(x.length,x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[54] at map at <console>:28
scala> b.groupByKey
res35: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[56] at groupByKey at <console>:31
scala> b.groupByKey.collect
res37: Array[(Int, Iterable[String])] = Array(
(4,CompactBuffer(lion)), (3,CompactBuffer(dog, cat)),
(7,CompactBuffer(panther)), (5,CompactBuffer(tiger, eagle)))
keyByキーとしてエレメントを設定
def keyBy[K](f: T => K): RDD[(K, T)]
説明:f関数の戻り値をKeyとし、RDDの各要素とpiarRDD{RDD[(K,T)]}を構成する
scala> a.collect
res39: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
scala> a.keyBy(x=>x.head).collect
scala> a.keyBy(_.head).collect
//
res38: Array[(Char, String)] = Array((d,dog), (t,tiger), (l,lion), (c,cat), (p,panther), (e,eagle))
keys[Pair]
def keys: RDD[K]
説明:keyを有するRDDを返すscala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.keys.collect
res2: Array[Int] = Array(3, 5, 4, 3, 7, 5)
scala> val b = a.keyBy(_.head)
b: org.apache.spark.rdd.RDD[(Char, String)] = MapPartitionsRDD[63] at keyBy at <console>:26
scala> b.keys.collect
res46: Array[Char] = Array(d, s, s, r, e)
values[Pair]
def values: RDD[V]
説明:valueを有するRDDを返すval a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.values.collect
res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
sortByKey[Pair]
def sortByKey(ascending:Boolean=true,numPartitions:Int=self.partitions.size):RDD[P]説明:keyによるソート、デフォルトascending:Boolean=true("昇順")
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
c.sortByKey(false).collect
res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
partitionBy[Pair]
def partitionBy(partitioner:Partitioner):RDD[(K,V)]説明:Partitionerを設定してRDDを再分割する
scala> val rdd = sc.parallelize(List((1,"a"),(2,"b"),(3,"c"),(4,"d")),2)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at <console>:24
scala> rdd.glom.collect
res28: Array[Array[(Int, String)]] = Array(Array((1,a), (2,b)), Array((3,c), (4,d)))
scala> val rdd1=rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[28] at partitionBy at <console>:26
scala> rdd1.glom.collect
res29: Array[Array[(Int, String)]] = Array(Array((4,d), (2,b)), Array((1,a), (3,c)))
mapValues[Pair]
2つのコンポーネント要素グループからなるRDDの値を取得し、提供された関数を適用して各値を変換する.その後、キーと変換された値を使用して新しいデュアルコンポーネント要素グループを形成し、それらを新しいRDDに格納するdef mapValues[U](f:V=>U):RDD[(K,U)]説明:RDD[(K,V)]-->RDD[(K,U)]をValueに対して操作する(f:V=>U)
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
flatMapValues[Pair]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.flatMapValues("x" + _ + "x").collect
res6: Array[(Int, Char)] = Array(
(3,x), (3,d), (3,o), (3,g), (3,x),
(5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x),
(4,x), (4,l), (4,i), (4,o), (4,n), (4,x),
(3,x), (3,c), (3,a), (3,t), (3,x),
(7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x),
(5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x))
subtractByKey[Pair]
def subtractByKey[W:ClassTag](other:RDD[(K,W)]):RDD[(K,V)]説明:RDDのキーとother RDDのキーが同じ要素を削除する
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("ant", "falcon", "squid"), 2)
val d = c.keyBy(_.length)
b.subtractByKey(d).collect
res15: Array[(Int, String)] = Array((4,lion))
combineByKey[Pair]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
説明:createCombiner:パーティション内で最初に発生したキーに遭遇した場合、この関数mergeValueがトリガーされます.パーティション内で再び発生したキーの場合、この関数mergeCombinersがトリガーされます.mergeCombiners:異なる領域で同じKeyのValueを処理し、この関数を実行します.RDD :
scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd1.combineByKey(x=>x+"_",(x:String,y:Int)=>x+"@"+y,(x:String,y:String)=>x+"$"+y)
res0: org.apache.spark.rdd.RDD[(String, String)] = ShuffledRDD[1] at combineByKey at <console>:27
scala> res0.collect
res1: Array[(String, String)] = Array((B,1_@2), (A,1_@2), (C,1_))
RDD :
scala> val rdd2 = rdd1.repartition(2)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at repartition at <console>:26
scala> rdd2.partitions.size
res2: Int = 2
scala> rdd2.glom.collect
res3: Array[Array[(String, Int)]] = Array(Array((A,1), (B,1), (C,1)), Array((A,2), (B,2)))
scala> rdd2.combineByKey(x=>x+"_",(x:String,y:Int)=>x+"@"+y,(x:String,y:String)=>x+"$"+y)
res4: org.apache.spark.rdd.RDD[(String, String)] = ShuffledRDD[7] at combineByKey at <console>:29
scala> res4.collect
res6: Array[(String, String)] = Array((B,1_$2_), (A,1_$2_), (C,1_))
RDD :
scala> val rdd3 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(3))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at partitionBy at <console>:26
scala> rdd3.partitions.size
res7: Int = 3
scala> rdd3.glom.collect
res8: Array[Array[(String, Int)]] = Array(Array((B,1), (B,2)), Array((C,1)), Array((A,1), (A,2)))
scala> rdd3.combineByKey(x=>x+"_",(x:String,y:Int)=>x+"@"+y,(x:String,y:String)=>x+"$"+y)
res9: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at combineByKey at <console>:29
scala> res9.collect
res10: Array[(String, String)] = Array((B,1_@2), (C,1_), (A,1_@2))
// :
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
foldByKey[Pair]
def foldByKey(zeroValue:V)(func:(V,V)=>V):RDD[(K,V)]説明:reduceByKeyと同様に機能する(同じキーの値をマージする)が、コリー化関数により、まずzeroValueを初期化する
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.foldByKey("")(_ + _).collect
res84: Array[(Int, String)] = Array((3,dogcatowlgnuant)
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.foldByKey("")(_ + _).collect
res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))