AWS MWAA (Amazon Managed Workflows for Apache Airflow) にて、`airflow.exceptions.AirflowTaskTimeout: DagBag import timeout`のエラー


はじめに

AWS MWAA (Amazon Managed Workflows for Apache Airflow) にてワークフロー実行中に表題のエラーに遭遇したため、その解決法について簡潔にまとめます。

何が起きたか?

ワークフローを実行するとエラーが起き、タスクのエラーログには以下のように出ていました。

Executor reports task instance <TaskInstance: hoge_dag.hoge_task manual__yyyy-mm-ddTHH:MM:SS+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?

どうやらタスクはキューされているのに、実行できていないようです。これだけではエラーの原因がわからずですが、サーバーログの方に原因がきちんと出ていました。

以下ログ

airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for hoge_dag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://***.apache.org/docs/apache-***/2.2.2/best-practices.html#top-level-python-code
* https://***.apache.org/docs/apache-***/2.2.2/best-practices.html#reducing-dag-complexity, PID: 909

ERROR - Failed to execute task Dag 'hoge_dag' could not be found; either it does not exist or it failed to parse..

DAGをimportしようとしたが、タイムアウトになってしまっているようですね。

原因

直接的な原因は上で記載した通りタイムアウトです。

以下のAirflowのBest Practicesが載っているページのReducing DAG complexityの項を見ると、タスクの数や分岐など、DAGの構造が複雑になるほどパフォーマンスに影響があることが分かります。
今回の僕のケースでは、分岐を多く含むDAGとして構築していたため、DAGをimportする時間が伸びてしまったと考えられました。

どう解決したか?

原因のところで述べたように、DAGの複雑性を低くするかタイムアウト時間を伸ばすことでエラーは解決します。
今回はタイムアウト時間を伸ばす方法をとります。
エラーログにも出ているように、デフォルトのタイムアウト設定値は30秒なので、これを伸ばしてやります。

タイムアウトの設定はcore.dagbag_import_timeoutの変数で設定されています。
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dagbag-import-timeout

MWAAの場合は、airflow.cfgが公開されておらず直接編集できないので、MWAAのリソースを作成する際に環境変数としてセットしてやる必要があります。

僕はCloud Formationにて作成していたため、template.yamlに以下のように追記しました。

  MwaaEnvironment:
    Type: AWS::MWAA::Environment
    Properties:
      
      
      AirflowConfigurationOptions:
        # dagの読み込みタイムアウト
        core.dagbag_import_timeout: 60