AirflowでHTTPリクエスト送るシンプルなDAGのコードとはまったこと


背景

AirflowでHTTPリクエスト送るだけのシンプルなDAGを作りたい

コード

先に動くコードを出しておく。
肝は HttpOperator ではなく、 PythonOperator を使うところ。(詳しくは後述)

import airflow
import subprocess
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
import requests

def execute_request():
    URL = "http://hoge.com"
    headers={'Authorization': 'some_token'}
    r = requests.get(URL, headers=headers)
    if r.status_code != requests.codes.ok:
        r.raise_for_status()
    return 'success'

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5)
}

dag = DAG(
    "simple_request_dag",
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

PythonOperator(
    task_id='simple_request_task',
    python_callable=execute_request,
    dag=dag,
)

はまったこと

Airflow の reference とか見ながら、httpリクエストするOperator 探すと、まず HttpOperator にたどり着くと思う。が、このOperatorはちょっと厄介なので気をつけたほうがいい。シンプルにURLを渡してリクエストできるようにはなっていない。

HttpOperatorのサンプルはここにある。
airflow/airflow/example_dags/example_http_operator.py
このコードを見ると、hostの指定がないことに気づく。
HttpOperatorではコード上でhostの指定ができない。

このサンプルコードのhostはAirflowがもつConnectionsという変数に設定されている。 これは WebUIの admin -> Connections で確認できる。上記のサンプルコードの場合はデフォルトホストである、default_host=www.google.comというものが適用される。もし、このdefault以外のhostを指定したい場合は、Connectionsに hoge_host=http://hoge.com のようなものを登録し、 http_conn_id 引数で指定する必要がある。Connectionsの設定は、WebUIで行うか、環境変数でも可能。

でも面倒。

とうことで、シンプルに作りたい場合は、PythonOperatorがおすすめ。

参考

How to access the response from Airflow SimpleHttpOperator GET request

「HttpOperatorでどうやってhost変えるの? え、そんな面倒なの? じゃ、PythonOperator使うわ。」みたいなシュールなやり取りがされている。