DAG間の依存関係を設定したDAGをつくる


追記(2020/06/16)

こちらに最新版のまとめを書きました↓

AirflowでDAG間の依存関係の作成方法のまとめ

==追記ここまで==

背景

DAG_A と DAG_B がある場合に、DAG_A が正常終了した後に、DAG_Bが実行されるような依存関係のあるDAGを作成したい。

サンプルコード

TriggerDagRunOperator を使う。

triggering.py

from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "retry_delay": timedelta(minutes=5)
}
dag = DAG("first_dag", default_args=default_args, catchup=False, schedule_interval="0 17 * * *")
task1 = BigQueryOperator(
    task_id="task1",
    sql="./sql/task1.sql",
    use_legacy_sql=False,
    destination_dataset_table="pj.dataset.table_name",
    write_disposition="WRITE_TRUNCATE",
    allow_large_results=True,
    dag=dag
)

trigger = TriggerDagRunOperator(
    task_id="trigger",
    trigger_dag_id="second_dag", 
    dag=dag,
)

task1 >> trigger
triggered.py

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG( "second_dag", default_args=default_args, schedule_interval=None)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "success trigger"',
    schedule_interval=None,
    dag=dag,
)

trigger.pytask1 実行後、trigger タスクで triggered.pysecond_dag を実行している。
DAGをトリガーするには TriggerDagRunOperator を使う。

参考

github: airflow/airflow/example_dags/example_trigger_controller_dag.py