Apache Airflowのジョブの失敗をTeamsに通知する


はじめに

Apache Airflowにはタスクが失敗した場合に呼び出すことができる on_failure_callback という仕組みがある。これを使ってMicrosoft Teamsの特定のチャネルにアラートをPOSTする仕組みを作成する。

Teamsへの通知の仕組み

TeamsにメッセージをPOSTする処理は Logic App で作成し、それを呼び出すようにAirflow側で実装するのが便利そうだ。今回はTeamsにPOSTするだけの処理にしたが、これ以外の通知方法(メールなど)や、自動化処理も必要に応じて簡単に追加できるので、この部分をLogic Appに任せるというのは良いアイデアのように思う。

設定

Logic Appの設定

まずLogic Appを設定する。HTTPリクエストを受信した際に起動するようにトリガーをセットし、TeamsにメッセージをPostするというシンプルなワークフローを定義する。

HTTPリクエスト受信の設定

HTTPリクエストの受信の際に、Request Bodyとして受信するJSONスキーマを定義する。ここではAirflowから渡すことができるContextの一部の値を受け取ることを想定して3つのフィールドを定義している。

定義のJSONのTextは以下の通り。

{
    "properties": {
        "ExecDate": {
            "type": "string"
        },
        "RunID": {
            "type": "string"
        },
        "TaskInstance": {
            "type": "string"
        }
    },
    "type": "object"
}

Teamsへの通知の設定

次にTeamsの通知の設定をする。便利なことにLogic AppにはすでにTeamsを操作するための部品が用意されている。その中から今回は Post a message (V3) (Preview) を選択。
Teamを選択し、Post先のChannelを選択し、Message欄に通知したい内容を記載する。HTTP request bodyで受け取る項目を使用して下図のように組み立てた。

通知テスト用のDAGを作成

default_argson_failure_callback を指定すると、どのタスクでも失敗した場合に通知できるようにできる。post_teams_channel という関数を作成して、on_failure_callback が発火したときに呼べるようにした。ここでは成功するタスク success と失敗するタスク failure を両方動かして挙動を調べる。

import airflow
import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Logic AppにHTTP POSTリクエストを送信する。ContextからDAGの情報をrequest bodyに渡す。
def post_teams_channel(context):
    url = "https://<Logic App URL>"
    headers = {'content-type': 'application/json'}
    payload={
       'TaskInstance': str(context['task_instance_key_str']),
       'RunID': str(context['run_id']),
       'ExecDate': str(context['execution_date'])
    }
    r = requests.post(url, headers=headers, json=payload)

# on_failure_callbackをここに実装
args = {
    "owner": "airflow",
    "email": ["[email protected]"],
    "depends_on_past": False,
    "on_failure_callback": post_teams_channel,
    "start_date": airflow.utils.dates.days_ago(0)
}

# DAGの作成
dag = DAG(dag_id="teams-notify",
        default_args=args,
        schedule_interval="@daily")

# 成功するタスク "success"
t1 = BashOperator(
    task_id='success',
    bash_command='exit 0',
    dag=dag,
)

# 失敗するタスク "failure"
t2 = BashOperator(
    task_id='failure',
    bash_command='exit 1',
    dag=dag,
)

# タスクの依存関係定義
t1 >> t2

通知テスト

準備が完了したので実際にDAGをトリガーして動作をテストしてみる。以下のコマンドを実行して直ちにトリガーする。

airflow dags trigger teams-notify

しばらくすると task_id : success が成功し、task_id : failure が失敗している結果が確認できる。

TeamsにはLogic Appで定義した内容でメッセージがPostされていることが確認できる。