第3章キー値対RDDデータパーティション

28718 ワード

前編:第2章ビッグデータSparkCoreのRDDプログラミングケース(下)
一、キー値対RDDデータパーティション器
Sparkは現在HashパーティションとRangeパーティションをサポートしており、ユーザーはパーティションをカスタマイズすることもでき、Hashパーティションは現在のデフォルトパーティションであり、SparkのパーティションはRDDのパーティションの個数、RDDの各データがShuffleプロセスを経てどのパーティションとReduceに属するかを直接決定する.
注意:(1)KeyValueタイプのRDDのみパーティションがあり、非KeyValueタイプのRDDパーティションの値はNone(2)各RDDのパーティションID範囲:0~numPartitions-1であり、この値がそのパーティションに属することを決定する.
1、RDDパーティションの取得
RDDのパーティション方式は、RDDのpartitioner属性を使用して取得することができる.scala.Optionオブジェクトが返され、getメソッドで値が取得されます.関連するソースコードは次のとおりです.
def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

(1)pairRDDを作成する
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24


(2)RDDのパーティションの表示
scala> pairs.partitioner
res13: Option[org.apache.spark.Partitioner] = None


(3)HashPartitionerクラスのインポート
scala>  import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner


(4)HashPartitionerを用いてRDDを再分割する

scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[15] at partitionBy at <console>:27


(5)再パーティション化後のRDDのパーティション化器の表示
scala> partitioned.partitioner
res14: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)


2、Hashパーティション
HashPartitionerパーティションの原理:与えられたkeyに対してhashCodeを計算し、パーティションの個数で除算して余剰をとり、余剰が0未満の場合は余剰+パーティションの個数(そうでなければ0を加算)で、最後に返される値がこのkeyが属するパーティションIDである.
Hashパーティションを使用した操作
scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[16] at parallelize at <console>:25

scala> nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect
res16: Array[String] = Array("0 : ", 1 : (1,3), 2 : (1,2), 3 : (2,4), "4 : ", 5 : (2,3), 6 : (3,6), 7 : (3,8))

scala>  val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[18] at partitionBy at <console>:27

scala> hashpar.count
res17: Long = 6                                                                 

scala>  hashpar.partitioner
res18: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)

scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
res19: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

scala> nopar.partitioner
res20: Option[org.apache.spark.Partitioner] = None



3、Rangerパーティション
HashPartitionerパーティションの弊害:各パーティションのデータ量が不均一になる可能性があり、極端には一部のパーティションがRDDのすべてのデータを所有することになる.RangePartitionerの役割:ある範囲の数をあるパーティションにマッピングし、できるだけ各パーティションのデータ量の均一性を保証し、パーティションとパーティションの間に秩序があり、1つのパーティションの要素は必ず別のパーティションの要素より小さいか大きいが、パーティション内の要素は順序を保証できない.簡単に言えば、ある範囲の数をあるパーティションにマッピングします.実装手順は次のとおりです.
第一歩:まずRDD全体からサンプルデータを抽出し、サンプルデータを並べ替え、各パーティションの最大key値を計算し、Array[KEY]タイプの配列変数rangeBoundsを形成する.
第2ステップ:rangeBoundsにおけるkeyの範囲を判断し、そのkey値が次のRDDにおけるパーティションidの下付き文字を与える.このパーティションは、RDDのKEYタイプがソート可能であることを要求する
4、カスタムパーティション
カスタムパーティションを実装するには、org.apache.spark.Partitionerクラスを継承し、次の3つの方法を実装する必要があります.
(1)numPartitions:Int:作成したパーティション数を返します.
(2)getPartition(key:Any):Int:所与のキーのパーティション番号(0からnumPartitions-1)を返します.
(3)equals():Javaが等しいと判断する標準的な方法.この方法の実装は非常に重要であり、Sparkはこの方法であなたのパーティションオブジェクトが他のパーティションインスタンスと同じかどうかを確認する必要があります.これにより、Sparkは2つのRDDのパーティション方式が同じかどうかを判断することができます.需要:同じ接尾辞のデータを同じファイルに書き込み、同じ接尾辞のデータを同じパーティションにパーティション化して保存して出力することで実現します.
(1)pairRDDを作成する
scala> val data = sc.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:25


(2)カスタムパーティションクラスの定義
scala> :paste
// Entering paste mode (ctrl-D to finish)
class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{

  //     
  override def numPartitions: Int = numParts

  //         
  override def getPartition(key: Any): Int = {
    val ckey: String = key.toString
    ckey.substring(ckey.length-1).toInt%numParts
  }
}

// Exiting paste mode, now interpreting.

defined class CustomerPartitioner

(3)RDDをカスタムパーティションクラスで再パーティション化する
scala> val par = data.partitionBy(new CustomerPartitioner(2))
par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:27

(4)再分割後のデータ分布の表示
scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))

カスタムパーティションを使用するのは簡単です.パーティションBy()メソッドに渡すだけでいいです.Sparkにはjoin()やgroupByKey()のようなデータミキシングに依存する方法が多く、オプションのPartitionerオブジェクトを受信して出力データのパーティション方式を制御することもできる.