【Airflow入門】AirflowのJobを別の強いノードで処理させたい
はじめに
Airflowはワークフローエンジンと呼ばれるもので、DAGと呼ばれる有向グラフで
Jobスケジュールを管理します。
以下に基本構成を載せます。
基本構成
本題
特定のJobを強いGPUを積んだworkerNodeに仕事をさせたい
そう思うことってありますよね。特に機械学習Jobを回すときなどはこれが必要になります。
どうやって実現するかというと、Celery Executorというのを使用します。
Celery
Celeryとは、もともとJobを分散処理をするためのミドルウェアで、
これをAirflowはいい感じに使ってくれます。そのいい感じに使うのを応用して、特定ノードに仕事をさせます。
Celery Executor 概念編
AirflowにはCeleryをうまく使うためのCelery Executorというのがあります。
基本的な概念を以下に説明します。
基本的に、WorkerNodeは複数あると想定されています。
今回はGPUインスタンス1個だけを接続するのと、MasterNode内にもWorkerがいる設定でお話します。
SchedulerとCeleryExecutorは以下のように動きます
- SchedulerからJobの司令書がQueueに発行されます
- CeleryはQueueに接続しているWorkerに無差別にJobを投げていきます。
- WorkerはDequeueして手元にあるDagFileを元にJobを実行するとともに、結果をPostgreSQLに記載していきます。
この時、特に設定をしてない場合、WorkerNodeのWorkerにJobが投げられるのか、
MasterNodeのWorkerに投げられるのかは、決めることができません。
しかし、特定のQueueにWorkerを接続するようにさせると、
その特定のQueueに流れるJobは接続したWorkerにしか行かないので、
特定のNodeに特定の作業をさせることができます。
Celery Executor 設定編
まず、MasterNodeとは別のNodeでJobを動かすためにはCeleryExecutor機能が必須になります。
そのために、以下のパッケージをインストールしてください。
pip install apache-airflow[celery]==1.10.7
versionはお好きに変更して大丈夫です。(執筆現在では1.10.7が最新安定版)
Celery Executorはairflow.cfg
ファイルから設定する必要があります。
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
#executor = SequentialExecutor
executor = CeleryExecutor
また同様に以下の項目も編集します。
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = redis://user:password@hostname:port/0 #queueを複数指定しているように見えますが、brokerは一つで大丈夫です。
# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
#result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+postgresql://airflow:password@hostname:port/airflow #ここにはSchedulerやWebServerが見てるDBを参照させる
これを、WorkerNode、MasterNodeの両方に設置します。
githubなどで共有すると良いでしょう。
Celery Executor 実行編
いよいよ実行してみましょう。
MasterNode側でairflow webserver
とairflow scheduler
をTmuxなりを使って起動させます。
そして、WorkerNode側でairflow worker -q ML
としてQueueNameにML
と指定します。
現状の図
現在の状況としてはこのような図になります。
ここで、DAG側でQueueにML
を指定したDAGを流してやれば、無事動きます。
当然、Queueにdefault
をした場合、MasterNodeでJobを実行してくれます。
おわりに
GCP関連について今度は書いてみたいですね、とりあえず勉強したことをそのまま書いている感じなので、
体系的になってないかもしれませんが、そこらへんは今度個人ブログでまとめようかと思いますのでよろしくお願いします!
Author And Source
この問題について(【Airflow入門】AirflowのJobを別の強いノードで処理させたい), 我々は、より多くの情報をここで見つけました https://qiita.com/Suguru_Toyohara/items/e0f218a6159fa1a3b99f著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .