Spark RDDのデフォルトパーティション数:(spark 2.1.0)
简书-私は亮兄です:
この文書はSpark 2.1.0バージョンに基づいています
初心者はまずいくつかの構成を理解しなければなりません.
spark.default.parallelism:(デフォルトのコンカレント数)
1.ローカルモード
(executorは起動せず、SparkSubmitプロセスによって指定数のスレッド数を生成して同時実行する):spark-shell spark.default.parallelism = 1 spark-shell –master local[N] spark.default.parallelism=N(N個の核を用いる)spark-shell–master local spark.default.parallelism = 1
2.擬似クラスタモード
(xは本機で起動するexecutor数、yはexecutor毎に使用するcore数、zはexecutor毎に使用するメモリ)spark-shell–master local-cluster[x,y,z]spark.default.parallelism = x * y
3.mesos細粒度モード
Mesos fine grained mode spark.default.parallelism = 8
4.その他のモード(ここでは主にyarnモードを指すが、もちろんstandaloneも同様)
Others: total number of cores on all executor nodes or 2, whichever is larger spark.default.parallelism=max(すべてのexecutorで使用されるcoreの総数、2)
上のルールでsparkを特定できます.default.parallelismのデフォルト値(プロファイルspark-default.confに表示されない構成が前提で、構成されている場合spark.default.parallelism=構成の値)
もう一つの構成が重要ですfiles.maxPartitionBytes=128 M(デフォルト)
The maximum number of bytes to pack into a single partition when reading files.
rddの1つのパーティションがデータを格納できる最大バイト数を表し、400 mのファイルが2つの領域に分かれている場合、actionでエラーが発生します.
1つのsparkアプリケーションが実行するとsparkが生成される.contextは、上記のsparkから得る2つのパラメータを同時に生成する.default.parallelismはこの2つのパラメータの値を導出した.
sc.defaultParallelism = spark.default.parallelism sc.defaultMinPartitions = min(spark.default.parallelism,2)
sc.defaultParallelismとsc.defaultMinPartitionsが最終的に確認するとrddのパーティション数を推定できます.
rddを生成する方法は2つある:1.scala集合方式parallelizeによりrddを生成する.
たとえば、val rdd=sc.parallelize(1 to 10)のように、parallelize操作時にパーティション数が指定されていない場合、rddのパーティション数=sc.defaultParallelism
2.textFile方式で生成するrddは、
たとえば、val rdd=sc.textFile(「path/file」)には、a、ローカルファイルfile://から生成されたrdd、操作時にパーティション数が指定されていない場合、デフォルトのパーティション数ルールは、(公式サイトの説明によると、ローカルfileのスライスルールは、hdfsのblockサイズで区切るべきだが、実測の結果、32 Mでスライスするのは固定されており、バグかもしれないが、sparkはすべてのhadoopインタフェースでサポートされているストレージシステムを使用できるため、spark textFileはhadoopインタフェースを使用してローカルファイルにアクセスする場合とhdfsにアクセスする場合では異なる)rddのパーティション数=maxである.(ローカルfileのスライス数、sc.defaultMinPartitions)b、hdfs分散ファイルシステムhdfs://から生成されたrdd、操作時にパーティション数が指定されていない場合、デフォルトのパーティション数ルールは、rddのパーティション数=max(hdfsファイルのblock数、sc.defaultMinPartitions)
補足:
1、HBAseのデータテーブルからRDDに変換する場合、このRDDのパーティション数はTableのregion数となる.
String tableName =”pic_test2”; conf.set(TableInputFormat.INPUT_TABLE,tableName); conf.set(TableInputFormat.SCAN,convertScanToString(scan)); JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class); Hbase Table:pic_test 2のregionが10であればhBaseRDDのパーティション数も10である.
2.json(またはparquetなど)ファイルを取得してDataFrameに変換する場合、そのDataFrameのパーティション数は、ファイルシステムに格納されているBlockの数に対応する.
Dataset df = spark.read().json(“examples/src/main/resources/people.json”); people.jsonサイズが300 MでHDFSに2ブロックを占有している場合、このDataFramedfパーティション数は2となる.
3,Spark StreamingはKafkaメッセージに対応するパーティション数を取得し,本明細書では論じない.
この文書はSpark 2.1.0バージョンに基づいています
初心者はまずいくつかの構成を理解しなければなりません.
spark.default.parallelism:(デフォルトのコンカレント数)
spark-default.conf , :
1.ローカルモード
(executorは起動せず、SparkSubmitプロセスによって指定数のスレッド数を生成して同時実行する):spark-shell spark.default.parallelism = 1 spark-shell –master local[N] spark.default.parallelism=N(N個の核を用いる)spark-shell–master local spark.default.parallelism = 1
2.擬似クラスタモード
(xは本機で起動するexecutor数、yはexecutor毎に使用するcore数、zはexecutor毎に使用するメモリ)spark-shell–master local-cluster[x,y,z]spark.default.parallelism = x * y
3.mesos細粒度モード
Mesos fine grained mode spark.default.parallelism = 8
4.その他のモード(ここでは主にyarnモードを指すが、もちろんstandaloneも同様)
Others: total number of cores on all executor nodes or 2, whichever is larger spark.default.parallelism=max(すべてのexecutorで使用されるcoreの総数、2)
上のルールでsparkを特定できます.default.parallelismのデフォルト値(プロファイルspark-default.confに表示されない構成が前提で、構成されている場合spark.default.parallelism=構成の値)
もう一つの構成が重要ですfiles.maxPartitionBytes=128 M(デフォルト)
The maximum number of bytes to pack into a single partition when reading files.
rddの1つのパーティションがデータを格納できる最大バイト数を表し、400 mのファイルが2つの領域に分かれている場合、actionでエラーが発生します.
1つのsparkアプリケーションが実行するとsparkが生成される.contextは、上記のsparkから得る2つのパラメータを同時に生成する.default.parallelismはこの2つのパラメータの値を導出した.
sc.defaultParallelism = spark.default.parallelism sc.defaultMinPartitions = min(spark.default.parallelism,2)
sc.defaultParallelismとsc.defaultMinPartitionsが最終的に確認するとrddのパーティション数を推定できます.
rddを生成する方法は2つある:1.scala集合方式parallelizeによりrddを生成する.
たとえば、val rdd=sc.parallelize(1 to 10)のように、parallelize操作時にパーティション数が指定されていない場合、rddのパーティション数=sc.defaultParallelism
2.textFile方式で生成するrddは、
たとえば、val rdd=sc.textFile(「path/file」)には、a、ローカルファイルfile://から生成されたrdd、操作時にパーティション数が指定されていない場合、デフォルトのパーティション数ルールは、(公式サイトの説明によると、ローカルfileのスライスルールは、hdfsのblockサイズで区切るべきだが、実測の結果、32 Mでスライスするのは固定されており、バグかもしれないが、sparkはすべてのhadoopインタフェースでサポートされているストレージシステムを使用できるため、spark textFileはhadoopインタフェースを使用してローカルファイルにアクセスする場合とhdfsにアクセスする場合では異なる)rddのパーティション数=maxである.(ローカルfileのスライス数、sc.defaultMinPartitions)b、hdfs分散ファイルシステムhdfs://から生成されたrdd、操作時にパーティション数が指定されていない場合、デフォルトのパーティション数ルールは、rddのパーティション数=max(hdfsファイルのblock数、sc.defaultMinPartitions)
補足:
1、HBAseのデータテーブルからRDDに変換する場合、このRDDのパーティション数はTableのregion数となる.
String tableName =”pic_test2”; conf.set(TableInputFormat.INPUT_TABLE,tableName); conf.set(TableInputFormat.SCAN,convertScanToString(scan)); JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class); Hbase Table:pic_test 2のregionが10であればhBaseRDDのパーティション数も10である.
2.json(またはparquetなど)ファイルを取得してDataFrameに変換する場合、そのDataFrameのパーティション数は、ファイルシステムに格納されているBlockの数に対応する.
Dataset df = spark.read().json(“examples/src/main/resources/people.json”); people.jsonサイズが300 MでHDFSに2ブロックを占有している場合、このDataFramedfパーティション数は2となる.
3,Spark StreamingはKafkaメッセージに対応するパーティション数を取得し,本明細書では論じない.