AWSブログのEMRでSparkを使う際のベストプラクティスを試してみる


概要

先日、AWSブログで「Amazon EMR で Apache Spark アプリケーションのメモリをうまく管理するためのベストプラクティス」という記事が公開されました。
恥ずかしながら、普段、EMRの設定はmaximizeResourceAllocationを利用しており、あまりしっかりとやったことがなかったので、こちらの内容を参照し、具体的にどの値がどのように変わるのか、またどのくらい効果がありそうなのかを確認しました。

検証

検証内容

同じ構成のEMRクラスタにおいて、maximizeResourceAllocationを有効にした場合とベストプラクティスの方針を実施した場合とで、リソースに関連する各設定値がどのように変わるかを比較する。

それぞれの方針で設定したEMRを、便宜上maximizeResourceAllocationベストプラクティスと呼ぶこととします。

maximizeResourceAllocationについて

このオプションは、AWSが提供しているEMR立ち上げ時のオプションで、awsの公式では、以下のようにこのオプションを説明しています。

クラスターの作成時に、spark 設定分類を使用して maximizeResourceAllocation オプションを true にすることで、クラスター内の各ノードでリソース最大限に使用できるようにエグゼキュターを設定することができます。この EMR 固有のオプションは、コアインスタンスグループのインスタンスのエグゼキューターで利用可能な最大のコンピューティングとメモリリソースを計算します。次に、この情報に基づいて対応する spark-defaults が設定されます。

一方、件のAWSブログでは、maximizeResourceAllocationによる設定は不十分であると主張し、ベストプラクティスによる設定方法を記載しています。

この設定を使っても、ほとんどの場合デフォルトの数は少なく、アプリケーションはクラスターの力を完全に使用しません。例えば、並列性は規模が大きいクラスターのために高くできるにもかかわらず、spark.default.parallelism のデフォルトは利用可能な仮想コアの数の 2 倍に留まります。

検証用クラスタ

  • master node
    • r4.4xlarge
      • vCPU: 16
      • Memory: 122GB
  • core node
    • r4.4xlarge * 5

リソース関連の設定値の比較

設定 maximizeResourceAllocation ベストプラクティス
spark.executor.cores 16 5
spark.executor.memory 105309M(=103G) 39G
spark.executor.memoryOverhead1 10530M(=10G)
(default: executor memory * 0.1)
5G
spark.driver.memory 106124M(=103G) 39G
spark.driver.cores 1
(default)
5
spark.executor.instances 5 14
spark.default.parallelism 160 140
spark.sql.shuffle.partitions 200 140

実行の比較

各方針で立ち上げたEMRクラスタに対して、同じSparkプログラムをstepにより実行しました。

元々maximizeResourceAllocationを設定していたクラスタでは、メモリエラーが発生していたケースでも、ベストプラクティスでは記事の主旨通り正常終了を確認できました。

また、インスタンスの数・性能を上げて、元々正常終了する構成においても、ベストプラクティスを実施することで実行時間は60~65%ほどになりました。

それぞれConfigureへの設定は、以下のようにしています。

maximizeResourceAllocation有効Configure
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]
ベストプラクティスConfigure
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "false"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.memory": "39G",
      "spark.driver.cores": "5",
      "spark.executor.memory": "39G",
      "spark.executor.cores": "5",
      "spark.executor.instances": "14",
      "spark.executor.memoryOverhead": "5G",
      "spark.driver.memoryOverhead": "5G",
      "spark.default.parallelism": "140",
      "spark.sql.shuffle.partitions": "140",

      "spark.network.timeout": "800s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.dynamicAllocation.enabled": "false",
      "spark.memory.fraction": "0.80",
      "spark.memory.storageFraction": "0.30",
      "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.yarn.scheduler.reporterThread.maxFailures": "5",
      "spark.storage.level": "MEMORY_AND_DISK_SER",
      "spark.rdd.compress": "true",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
  }
]

考察

maximizeResourceAllocationを有効にした場合と比較して、ベストプラクティスの設定値は、エグゼキュータ数(spark.executor.instances)を多くして並列性を上げる分、各エグゼキュータで使用するコア数(spark.executor.cores)とメモリ(spark.executor.memory)が減っていることがわかります。一般に、並列度を上げるとメモリエラーが起こりやすくなるという認識ですが、今回の検証ではエラーも回避できました。また、並列数が上がったためか、一定の高速化効果も確認できました。

ベストプラクティスの方針は、並列数を適切にした上でそれぞれに割り当てるメモリを決定しています。そのため、クラスタに対して過剰なデータを扱わなければあまり問題は起きなさそうです。実行するプログラムと利用するクラスタ構成に応じて最適ではないケースもあるかと思いますが、取っ掛かりとして使う設定としては十分だと思います。

これらの設定値による影響の確認は今回できてません。記事内で推奨されているcompress周りのオプションやGC等々の設定による影響も気になります。これらの値の変化に対して、Gangliaを使ってモニタリングしてみるのも面白そうです。

参考


  1. コメントより、spark.yarn.executor.memroyOverheadは将来的にspark.executor.memoryOverheadとのことです。