【Airflow入門】AirflowのJobを別の強いノードで処理させたい


はじめに

Airflowはワークフローエンジンと呼ばれるもので、DAGと呼ばれる有向グラフで
Jobスケジュールを管理します。
以下に基本構成を載せます。

基本構成

本題

特定のJobを強いGPUを積んだworkerNodeに仕事をさせたい

そう思うことってありますよね。特に機械学習Jobを回すときなどはこれが必要になります。
どうやって実現するかというと、Celery Executorというのを使用します。

Celery

Celeryとは、もともとJobを分散処理をするためのミドルウェアで、
これをAirflowはいい感じに使ってくれます。そのいい感じに使うのを応用して、特定ノードに仕事をさせます。

Celery Executor 概念編

AirflowにはCeleryをうまく使うためのCelery Executorというのがあります。
基本的な概念を以下に説明します。

基本的に、WorkerNodeは複数あると想定されています。
今回はGPUインスタンス1個だけを接続するのと、MasterNode内にもWorkerがいる設定でお話します。

SchedulerとCeleryExecutorは以下のように動きます

  1. SchedulerからJobの司令書がQueueに発行されます
  2. CeleryはQueueに接続しているWorkerに無差別にJobを投げていきます。
  3. WorkerはDequeueして手元にあるDagFileを元にJobを実行するとともに、結果をPostgreSQLに記載していきます。

この時、特に設定をしてない場合、WorkerNodeのWorkerにJobが投げられるのか、
MasterNodeのWorkerに投げられるのかは、決めることができません。

しかし、特定のQueueにWorkerを接続するようにさせると、
その特定のQueueに流れるJobは接続したWorkerにしか行かないので、
特定のNodeに特定の作業をさせることができます。

Celery Executor 設定編

まず、MasterNodeとは別のNodeでJobを動かすためにはCeleryExecutor機能が必須になります。
そのために、以下のパッケージをインストールしてください。

celeryインストール
pip install apache-airflow[celery]==1.10.7

versionはお好きに変更して大丈夫です。(執筆現在では1.10.7が最新安定版)

Celery Executorはairflow.cfgファイルから設定する必要があります。

airflow.cfg
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
#executor = SequentialExecutor
executor = CeleryExecutor

また同様に以下の項目も編集します。

airflow.cfg
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = redis://user:password@hostname:port/0 #queueを複数指定しているように見えますが、brokerは一つで大丈夫です。

# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
#result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+postgresql://airflow:password@hostname:port/airflow #ここにはSchedulerやWebServerが見てるDBを参照させる

これを、WorkerNode、MasterNodeの両方に設置します。
githubなどで共有すると良いでしょう。

Celery Executor 実行編

いよいよ実行してみましょう。
MasterNode側でairflow webserverairflow schedulerをTmuxなりを使って起動させます。
そして、WorkerNode側でairflow worker -q MLとしてQueueNameにMLと指定します。

現状の図

現在の状況としてはこのような図になります。
ここで、DAG側でQueueにMLを指定したDAGを流してやれば、無事動きます。
当然、Queueにdefaultをした場合、MasterNodeでJobを実行してくれます。

おわりに

GCP関連について今度は書いてみたいですね、とりあえず勉強したことをそのまま書いている感じなので、
体系的になってないかもしれませんが、そこらへんは今度個人ブログでまとめようかと思いますのでよろしくお願いします!