AirflowやCloud Composerでpackageを定義して使いたい


背景

AirflowやCloud Composerで自分で定義したpackageを使いたい場合。例えば、slack通知を行うクラスの定義など。

Airflowのやり方

例としてslack通知を実装する(参考: Airflowのタスク/DAGが失敗した時にSlackで通知する仕組み)。
まずは、Airflowでは以下のように定義すればいい。

dir構成


ProjectRoot
.
├── dags
│   ├── slack_test.py
│   ├── packages # 任意のdir名
│   │   ├── __init__.py
│   │   └── slack.py

slack.py


import json
import requests


class Slack:
    def noti(context=""):
        Slack.post('Failed: {}.{}.{}'.format(
            context['dag'], context['task'], context['execution_date'])
        )

    def post(text):
        url = "https://hooks.slack.com/services/__/__/__"
        requests.post(url, data=json.dumps({
            'username': 'Airflow',
            'channel': 'your_channel',
            'attachments': [{
                'title': text,
                "color": 'danger'
            }]
        }))

__init__.py

from packages.slack import Slack

slack_test.py


from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from packages import Slack


def test():
    Slack.post('slack test')
    return 'success'


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 8, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": Slack.noti, # Dagが失敗したら通知が飛ぶ
}

dag = DAG("slack_test", default_args=default_args, schedule_interval=timedelta(1))
t1 = PythonOperator(task_id='print_package', python_callable=test, dag=dag)

じゃ、Cloud Composerの場合は?

ローカルの Python ライブラリをインストールする

ここにあるように、ディレクトリ名を dependencies と定義する必要がある。
ディレクトリ構成は以下のようになる。


ProjectRoot
.
├── dags
│   ├── slack_test.py
│   ├── dependecies # これ
│   │   ├── __init__.py
│   │   └── slack.py