spark1.6.1学習ノート02-sparkクラスタのジョブスケジューリング


1、sparkアプリケーション間のスケジューリング
各sparkアプリケーションには、sparkアプリケーションのtasksのみを実行する独立したexecutor仮想マシンセットがあります.
sparkは、複数のクラスタリソースの割り当て方法を提供します.
(1)最も簡単な方法は静的リソース割り当てである.このモードは、各sparkアプリケーションに静的最大リソース量を割り当て、sparkアプリケーションのライフサイクル全体でこれらのリソースを保持します.spark standalone、YARN、coase-graned Mesosクラスタは、このリソース割り当てモードを提供します.
(2)次に,Mesosが提供するcpuカーネル数に基づく動的共有方式である.このように、各sparkアプリケーションには固定された独立したメモリ(spark.executor.memoryで指定)がありますが、あるアプリケーションがタスクを実行していない場合、他のアプリケーションはそのアプリケーションのカーネルを使用してタスクを実行できます.このモードを使用するにはmesos://のURLを使用し、sparkを使用する必要があります.mesos.coarseをfalseに設定します.
現在、すべてのモードでは、アプリケーション間のメモリ共有機能は提供されていません.
動的リソース割当
sparkは、ワークロードベースの動的リソース割り当てを提供します.このプロパティはデフォルトでは禁止されていますが、すべてのcoarse-grainedのクラスタマネージャ(standalone mode,YARN mode、Mesos coarse-grained mode)で使用できます.
このプロパティを使用するには、次の2つの条件を満たす必要があります.
(1)sparkを設定.dynamicAllocation.enabledはtrueです.
(2)worker nodeごとにexternal shuffle serviceを起動しspark.shuffle.service.enabledはtrueに設定されています.このexternal shuffleサービスは、executorsが削除されたときにshuffleファイルを削除しないようにする役割を果たします.このサービスを開始する方法は、クラスタマネージャによって異なります.
Standaloneではspark.shuffle.service.enabledをtrueに設定します.
Mesos coarse-grainedモードでは、slaveノード毎に$SPARK_HOME/sbin/start-mesos-shuffle-service.shを実行する、sparkを実行する必要がある.shuffle.service.enabledはtrueに設定されています.この操作はMarathonで実現できます.
YARNでNodeManagerごとにshuffleサービスを開始する方法は次のとおりです.
<1>YARN profileを使用してSparkをコンパイルし、パッケージ化されたプログラムを使用するとこのステップをスキップできます.
<2>spark--yarn-shuffleを見つけた.jarファイルは、自分でsparkをコンパイルする場合は$SPARK_に配置する必要があります.HOME/network/yarn/target/scala-中;デプロイパッケージを使用する場合は、libに配置する必要があります.
<3>このjarパッケージをNodeManagerのclasspathに追加します.
<4>各ノードのyarn-site.xmlでyarnを設定.nodemanager.aux-servicesはspark_shuffle,orgを設定.apache.spark.network.yarn.YarnShuffleServiceはyarnです.nodemanager.aux-services.spark_shuffle.class.
<5>すべてのNodeManagerを再起動します.
資源配分の原則
要求リソースの原則
蓄積されたタスクが割り当てられる必要がある場合、sparkアプリケーションはクラスタマネージャにリソースを要求します.
タスクが一定時間t 1(パラメータspark.dynamicAllocation.schedulerBacklogTimeoutによって構成される)蓄積されると、sparkアプリケーションはリソースの要求を開始する.
次に、以降、パラメータspark.dynamicAllocation.sustainedSchedulerBacklogTimeoutによって構成される時間t 2毎に、sparkアプリケーションは、タスク蓄積がまだ存在するか否かを判断する.まだ存在する場合は、リソースのリクエストを続行する必要があります.
sparkアプリケーションは、要求リソースを指数関数的に増加させるたびに、例えば、1回目の要求に1つのexecutorを増加させ、2回目は2つのexecutor、次いで4つ、8つになる.
リソースの除去の原則
あるsparkアプリケーションに空きexecutorが存在し、一定時間t 3を超える(spark.dynamicAllocation.executorIdleTimeoutによって指定される).
エレガントにexecutorを解除
動的リソース割り当てを使用しない場合、sparkのexecutorは「エラー」または「関連するsparkアプリケーションの終了」の場合にのみ終了します.
動的リソース割り当てを行う場合、executorは終了時にsparkアプリケーションがまだ実行中です.したがってsparkは、executorが終了する前に関連する動作状態を優雅に保存する必要がある(後でexecutorを再起動できるようにする).
この操作はshuffleにとって特に重要です.shuffle操作では、spark executorがmapの結果をローカルハードディスクに書き込むと、executorはサーバとしてこれらのデータを必要とする他のexecutorにキャプチャサービスを提供します.ホームレスタスク(タスクの実行時間が他のタスクよりはるかに長い)が存在する場合、動的割り当てはshuffle操作が終了する前にexecutorを削除します.これにより、削除されたexecutorのshuffleファイルが再計算される必要があります(これらのファイルはexecutorの削除に伴ってホームレスタスクを実行するexecutorには到達しないため).
この問題を解決する方法は、spark 1にある外部のshuffleサービスを使用することである.2リリース.このサービスは、クラスタ内の各ノードで実行され、sparkアプリケーションおよびそれらのexecutorとは独立した長期的な実行プロセスである.このサービスがオンになると、shuffleファイルはこのサービスから取得できます.この場合、あるexecutorのshuffleファイルはexecutorのライフサイクル以外でも使用できます.
shuffleファイルに加えて、executorはハードディスクまたはメモリにデータをキャッシュします.executorが削除されると、キャッシュされたデータは達成できません.spark 1.2この問題はまだ解決されていない.以降のバージョンでは、キャッシュされたデータはスタック外メモリに格納されます.
2、sparkアプリケーション内部のスケジューリング
jobとは、1つのspark actionおよび他のタスクがactionを完了するために実行する必要があるタスクを指す.sparkアプリケーションでは、異なるスレッドでコミットされたjobが同時に実行されます.sparkのジョブスケジューリングはスレッドが安全であり,マルチリクエストの応用をサポートする.
デフォルトではspark jobスケジューリングは先進的な先頭に従います.各jobは、mapおよびreduce操作などの多くのstagesに分解され、最初のjobの優先度は2番目より高く、順次類推される.jobキューの最初のいくつかのjobがクラスタ全体を使用する必要がない場合、次のjobsはすぐに実行を開始することができます.
spark 0.8からjobs間で公平な共有を構成するスケジューリング(fair sharing)をサポートします.このモードではsparkは「round-robin」方式でタスクを割り当て、すべてのjobがクラスタのリソースを共有します.これは、大きなjobの実行時にコミットされた小さなjobもすぐにリソースに割り当てられ、すぐに応答することを意味します.このモードは、マルチユーザ設定に特に適しています.
公平なスケジューリングを開くにはsparkを設定するだけです.scheduler.modeはFAIR:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

3、公平なスケジューリングプール(Fair Scheduling Pools)
フェアスケジューリングはjobを異なるプールに分類し、異なるプールに対して異なるスケジューリングオプションを設定することをサポートします.これにより、優先度の高いプールに重要なjobを実行させることができる.もう1つの使い方は、異なるユーザーのjobを分類し、すべてのユーザーに対して公平な優先度を行い、誰がjobを提出しても、誰がjobを提出しても少ないことです.この方法のインスピレーションはhadoop fair schedulerから来た.
新しくコミットjobはデフォルトのプールに入りますが、SparkContextでローカル属性sparkを設定できます.scheduler.poolパラメータは、コミットされたスケジューリングプールを指定します.
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
以降、スレッドがコミットしたjobは、このスケジューリングプールを使用します.
スレッドプールの設定を解除する方法は、次のとおりです.
sc.setLocalProperty("spark.scheduler.pool", null)

4、スケジューリングプールのデフォルト属性
デフォルトでは、各スレッドプールは平等な共有クラスタ(デフォルトプールのjobも平等にそのプールのリソースを共有する)であるが、残りの各スケジューリングプールではjobが先に出力される.
5、スケジューリングプールの構成
スケジューリングプールでは、次の3つのパラメータがサポートされています.
1、schedulingMode.FIFOまたはFAIRであってもよい.
2、weight.クラスタリソースを共有するときのスケジューリングプールの重みを制御します.デフォルトでは、各スケジューリングプールのweightは1です.プールのweightを2に指定すると、他のスケジューリングプールの2倍のリソースが得られます.
3、minShare.最小共有リソース数(CPUコア数)を指定します.フェアスケジューリングは、常に各スケジューリングプールに最小共有リソース数で指定されたリソースを提供しようとします.次に、残りのリソースを重みweightで割り当てます.minShareのデフォルト値は0です.
スケジューリングプールのプロパティは、$SPARK_と同様にXMLファイルで設定できます.HOME/conf/fairscheduler.xml.template.次にSparkContextにsparkを設定.scheduler.allocation.fileプロパティ:
conf.set("spark.scheduler.allocation.file", "/path/to/file")
XMLドキュメントの形式は次のとおりです.


  
    FAIR
    1
    2
  
  
    FIFO
    2
    3