Spark rdd実戦-パーティションの理解と使用
2500 ワード
概要
「spark 2原理分析-RDDのPartitioner原理分析」では、パーティションの基本概念を理解し、実際の例を通じてパーティションの概念をさらに理解し、パーティションの使用方法を学びました.
パーティションの使用シーン
パーティションは、(k,v)タイプのRDDで使用されます.ただし、パーティションを頻繁に変更しないでください.パーティションを頻繁に使用すると、より多くのshuffle操作が発生する可能性があります.
HashPartitionerパーティションの使用準備タイプ(k,v)のRDD parallelizeにより(k,v)対のRDDデータを生成し、spark-shellでは以下のようにコードされる:
私はlocal[*]モードで起動したので、私のマシンには8つのコアがあるので、デフォルトのパーティション数は8です.ただし、上記のコードから分かるように、パーティションはNoneです.次に、新しいパーティションを使用します.HashPartitionrを使用して、データを再パーティション化します. HashPartitionerパーティションを使用して を再パーティション化
以上の例から,独自のHashPartitioner(10)パーティションを用いてデータを再パーティション化し,パーティションを再定義すると,パーティション数が変化していることが分かる.パーティション数はHashPartitionerでパーティション化されたパーティション数になります.
RangePartitionerパーティションの使用 RangePartitionerパーティションを使用して を再パーティション化
以上のコード例から分かるように,RangePartitionerパーティションを設定すると,パーティション数は20個となる.
各パーティションのデータを印刷
このコードで各パーティションのデータを印刷できます.RangePartitioner再パーティション化後のデータを印刷することで、RangePartitionerパーティション化後のデータは基本的にデータ範囲によって区切られていることがわかります.
小結
この文書では、パーティションの使用について説明します.パーティションの役割は、パーティションを変更することによって検証されます.
「spark 2原理分析-RDDのPartitioner原理分析」では、パーティションの基本概念を理解し、実際の例を通じてパーティションの概念をさらに理解し、パーティションの使用方法を学びました.
パーティションの使用シーン
パーティションは、(k,v)タイプのRDDで使用されます.ただし、パーティションを頻繁に変更しないでください.パーティションを頻繁に使用すると、より多くのshuffle操作が発生する可能性があります.
HashPartitionerパーティションの使用
scala> import org.apache.spark.HashPartitioner
scala> val rdd2 = sc.parallelize(1 to 10000).map(x=>(x,x))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at :24
scala> rdd2.partitions.length
res2: Int = 8
scala> rdd2.partitioner
res0: Option[org.apache.spark.Partitioner] = None
私はlocal[*]モードで起動したので、私のマシンには8つのコアがあるので、デフォルトのパーティション数は8です.ただし、上記のコードから分かるように、パーティションはNoneです.次に、新しいパーティションを使用します.HashPartitionrを使用して、データを再パーティション化します.
scala> val rdd3 = rdd2.partitionBy(new HashPartitioner(10))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at :26
scala> rdd3.partitioner
res9: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)
scala> rdd3.partitions.length
res10: Int = 10
以上の例から,独自のHashPartitioner(10)パーティションを用いてデータを再パーティション化し,パーティションを再定義すると,パーティション数が変化していることが分かる.パーティション数はHashPartitionerでパーティション化されたパーティション数になります.
RangePartitionerパーティションの使用
scala> import org.apache.spark.RangePartitioner
import org.apache.spark.RangePartitioner
scala> val rdd3 = rdd2.partitionBy(new RangePartitioner(20, rdd2))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at :26
scala> rdd3.partitioner
res3: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@49057ee6)
scala> rdd3.partitions.length
res4: Int = 20
以上のコード例から分かるように,RangePartitionerパーティションを設定すると,パーティション数は20個となる.
各パーティションのデータを印刷
val parts = rdd3.partitions
for (p
if (index == idx) value else Iterator()
}
val dataPartitioned = partRdd.collect()
println("partition id:"+ idx, "elements: " + dataPartitioned.foreach(print))
}
このコードで各パーティションのデータを印刷できます.RangePartitioner再パーティション化後のデータを印刷することで、RangePartitionerパーティション化後のデータは基本的にデータ範囲によって区切られていることがわかります.
小結
この文書では、パーティションの使用について説明します.パーティションの役割は、パーティションを変更することによって検証されます.