Spark RDDパーティション数の詳細

3072 ワード

基礎知識


spark.default.parallelism:(デフォルトのコンカレント数)=2
プロファイルspark-default.confに表示されない構成は、次のルールに従って値を取ります.
1.ローカルモード(executorは起動せず、SparkSubmitプロセスによって指定された数のスレッド数を生成して同時実行する):
    spark-shell       spark.default.parallelism = 1
    spark-shell --master local[N] spark.default.parallelism=N(N個のコアを使用)
    spark-shell --master local       spark.default.parallelism = 1
2、擬似クラスタモード(xは自機で起動したexecutor数、yは各executorで使用したcore数、zは各executorで使用したメモリ)
    spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y
3、その他のモード(ここでは主にyarnモードを指すが、もちろんstandaloneもそうである)
    Others: total number of cores on all executor nodes or 2, whichever is larger
    spark.default.parallelism=max(すべてのexecutorで使用されるcoreの総数、2)
上のルールでsparkを特定できます.default.parallelismのデフォルト値(プロファイルspark-default.confに表示されない構成が前提で、構成されている場合spark.default.parallelism=構成の値)
もう一つの構成が重要ですfiles.maxPartitionBytes=128 M(デフォルト)The maximum number of bytes to pack into a single partition when reading files.rddの1つのパーティションがデータを格納できる最大バイト数を表し、400 mのファイルが2つの領域に分かれている場合、actionでエラーが発生します.
1つのsparkアプリケーションが実行するとsparkが生成される.contextは、上記のsparkから得る2つのパラメータを同時に生成する.default.parallelismはこの2つのパラメータの値を導出した.
sc.defaultParallelism     = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism,2)
sc.defaultParallelismとsc.defaultMinPartitionsが最終的に確認するとrddのパーティション数を推定できます.

rddを生成するいくつかの方法:


1、scala集合方式parallelizeによりrddを生成する
たとえばval rdd=sc.parallelize(1 to 10)
このようにparallelize操作時にパーティション数が指定されていない場合、
rddのパーティション数=sc.defaultParallelism
2.textFile方式で生成されたrdd
たとえば、val rdd=sc.textFile(「path/file」)
次の2つのケースがあります.
a、ローカルファイルfile://から生成されたrdd.操作時にパーティション数が指定されていない場合、デフォルトのパーティション数規則は以下の通りである.
rddのパーティション数=max(ローカルfileのスライス数、sc.defaultMinPartitions)
b、hdfs分散ファイルシステムhdfs://から生成されたrdd.操作時にパーティション数が指定されていない場合、デフォルトのパーティション数規則は以下の通りである.
rddのパーティション数=max(hdfsファイルのblock数、sc.defaultMinPartitions)
3.HBAseのデータテーブルからRDDに変換すると、そのRDDのパーティション数はこのTableのregion数となる.
String tableName ="pic_test2";
conf.set(TableInputFormat.INPUT_TABLE,tableName);
conf.set(TableInputFormat.SCAN,convertScanToString(scan));
JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf,TableInputFormat.class,ImmutableBytesWritable.class,Result.class);
Hbase Table:pic_test2 region 10, hBaseRDD 10。

4、json(またはparquetなど)ファイルを取得してDataFrameに変換すると、そのDataFrameのパーティション数は、ファイルシステムに格納されているBlockの数に対応する.
Dataset df = spark.read().json("examples/src/main/resources/people.json");
people.json 300M, HDFS 2 blocks, DataFrame df 2。

5、Spark Streaming取得Kafkaメッセージに対応するパーティション数
a.Receiverによるデータ受信
Receiver方式ではSparkのpartitionとkafkaのpartitionは関連していないのでtopicあたりのpartition数を増やすと
単一Receiverによって消費されるトピックを処理するためにスレッドを追加するだけです.しかし,これは処理データ上のSparkの並列度を増加させるものではない.
b、direct直結方式によるkafkaデータの読み出し
Sparkは、Kafka partitionと同様に多くのRDD partitionを作成し、Kafkaからデータを並列に読み込みます.
したがって,Kafka partitionとRDD partitionの間には,1対1のマッピング関係がある.