Airflowのwebserverと、読み込みに時間がかかるDAGの話


クリスマスも近いので、Airflowのwebserverの話をしましょう。

言いたいこと

  • 読み込みに時間がかかるDAGがあると
    • そのDAGが読み込めないかもよ
    • Airflowの管理画面にアクセス出来ないかもよ
  • タスク数だけなら割と多くても大丈夫だよ
    • Componserのデフォルト設定でも、千程度はいける?
  • 未来は明るいよ

Airflowを構成するモジュール

今回の主役のwebserverは、Airflowのモジュールの一つです。
他のモジュールには、

  • scheduler
    • DAG/Taskのスケジューリング・状態の管理
  • worker
    • タスクインスタンスを実際に実行
  • database
    • TaskInstance・DagRun・xcom・Variableなどの保持
  • executor
    • スケジュールされたタスクをworkerに配分

などがあります。
詳しくは、Astronomerさんの記事がわかりやすいです。

webserverとは

webserverは、管理画面(下図)・CLIコマンド(の一部)APIなどを担当するモジュールで、

  • DAG/task instanceなどの状況把握
  • DAG/task instanceの再実行や、ステータスの変更

などの処理を受け付けます。

内部的には、Flask+Gunicornの構成となっており、画面からのエンドポイントはここらへんに定義されています。


(図はAirflow公式ページより)

読み込みに時間がかかるDAG問題

webserverは、リクエストを受け付けるだけではなく、DAGファイルを定期的に読み込みます。

その結果、DAGファイルの読み込みに時間がかかると、

  • 該当のDAGが読み込めない
  • webserverへのアクセスが重くなる
  • アクセスすら出来ない(エラーページ表示)

ことがあり、

などでも注意喚起?されています。

読み込みに時間がかかる?

DAGの読み込みに時間がかかるのと、DAGRunの実行に時間がかかるのは、混同しそうですが別の話で、今回問題にしているのは前者です。

例を出すと、これは読み込みに時間がかかるDAGで、

    sleep(10000000)
    start = DummyOperator(task_id='start')

これはDAGRunの実行に時間がかかるDAGです。

        def hoge():
            sleep(1000000)
        slow_task = PythonOperator(
            task_id='query_' + str(i),
            python_callable=hoge,
        )

タスク数が多かったり、タスクの外で外部にアクセスしていると、読み込みが遅くなる可能性があります。

webserverがDAGをパースする流れ

細かい流れが気になる人向けに:

  1. webserver起動時に、定期的に(※)子プロセス(gunicorn worker)を再起動するように設定
  2. エンドポイントのファイルのロード時にDagBagオブジェクトが作られる
  3. DagBagオブジェクトが作られる中で、DAGファイルがパースされる

※ 具体的にはworker_refresh_interval秒

タスク数との関係

Cloud Composer(Airflow 1.10.2)・BigQueryOperatorのみのDAGで試したところ:

  • DAGあたりに1000タスクくらいでは、デフォルト設定でも表示が出来る
  • DAGあたりに3000タスクくらいになると、デフォルト設定では表示ができなくなる
    • webserverが重くてもSchedulerやWorkerは動きます(Stackdriverで確認)
    • タイムアウト(worker_refresh_interval)を伸ばしたり、読み込みを非同期(async_dagbag_loader)にするといける

なお、 Graph ViewやTree Viewだけが重い時は、default_dag_run_display_numberを変えるといいらしいです。

明るい未来の話

この「読み込みに時間がかかるDAG」に関しては、改善がいくつか提案されています。

Cloud Composerでは、webserver上のDAGの読み込みを非同期にするオプションが実装されており、Airflow1.10.4にも移植されています

まだドラフトですが、AIP-24 DAG Persistence in DB using JSON for Airflow Webserver and (optional) Schedulerという提案は、より大幅な変更で、

  • webserverでのDAGのパースはやめる
  • schedulerがDAGをパースし、シリアライズ結果をDBに入れる
  • webserverは、その結果を使う

オプションを提案しています。
(webserverが状態を持っているのが、そもそも良くないよねという話もあるらしい

Cloud Compserのwebserver

Cloud Composerのwebserverに関してのメモです:

ちなみに、Astronomer.ioの方はvCPU・メモリのサイズを変えられます。