Spark rdd実戦-パーティションの理解と使用

2500 ワード

概要
「spark 2原理分析-RDDのPartitioner原理分析」では、パーティションの基本概念を理解し、実際の例を通じてパーティションの概念をさらに理解し、パーティションの使用方法を学びました.
パーティションの使用シーン
パーティションは、(k,v)タイプのRDDで使用されます.ただし、パーティションを頻繁に変更しないでください.パーティションを頻繁に使用すると、より多くのshuffle操作が発生する可能性があります.
HashPartitionerパーティションの使用
  • 準備タイプ(k,v)のRDD parallelizeにより(k,v)対のRDDデータを生成し、spark-shellでは以下のようにコードされる:
  • scala> import org.apache.spark.HashPartitioner
    scala> val rdd2 = sc.parallelize(1 to 10000).map(x=>(x,x))
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at :24
    
    scala> rdd2.partitions.length
    res2: Int = 8
    
    scala> rdd2.partitioner
    res0: Option[org.apache.spark.Partitioner] = None
    

    私はlocal[*]モードで起動したので、私のマシンには8つのコアがあるので、デフォルトのパーティション数は8です.ただし、上記のコードから分かるように、パーティションはNoneです.次に、新しいパーティションを使用します.HashPartitionrを使用して、データを再パーティション化します.
  • HashPartitionerパーティションを使用して
  • を再パーティション化
    scala> val rdd3 = rdd2.partitionBy(new HashPartitioner(10))
    rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at :26
    
    scala> rdd3.partitioner
    res9: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)
    
    scala> rdd3.partitions.length
    res10: Int = 10
    

    以上の例から,独自のHashPartitioner(10)パーティションを用いてデータを再パーティション化し,パーティションを再定義すると,パーティション数が変化していることが分かる.パーティション数はHashPartitionerでパーティション化されたパーティション数になります.
    RangePartitionerパーティションの使用
  • RangePartitionerパーティションを使用して
  • を再パーティション化
    scala> import org.apache.spark.RangePartitioner
    import org.apache.spark.RangePartitioner
    
    scala> val rdd3 = rdd2.partitionBy(new RangePartitioner(20, rdd2))
    rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at :26
    
    scala> rdd3.partitioner
    res3: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@49057ee6)
    
    scala> rdd3.partitions.length
    res4: Int = 20
    

    以上のコード例から分かるように,RangePartitionerパーティションを設定すると,パーティション数は20個となる.
    各パーティションのデータを印刷
    val parts = rdd3.partitions
    for (p  
             if (index == idx) value else Iterator()
        }
        val dataPartitioned = partRdd.collect()
        println("partition id:"+ idx, "elements: " + dataPartitioned.foreach(print))
    }
    

    このコードで各パーティションのデータを印刷できます.RangePartitioner再パーティション化後のデータを印刷することで、RangePartitionerパーティション化後のデータは基本的にデータ範囲によって区切られていることがわかります.
    小結
    この文書では、パーティションの使用について説明します.パーティションの役割は、パーティションを変更することによって検証されます.