AirflowやCloud Composerでpackageを定義して使いたい
7122 ワード
背景
AirflowやCloud Composerで自分で定義したpackageを使いたい場合。例えば、slack通知を行うクラスの定義など。
Airflowのやり方
例としてslack通知を実装する(参考: Airflowのタスク/DAGが失敗した時にSlackで通知する仕組み)。
まずは、Airflowでは以下のように定義すればいい。
dir構成
ProjectRoot
.
├── dags
│ ├── slack_test.py
│ ├── packages # 任意のdir名
│ │ ├── __init__.py
│ │ └── slack.py
slack.py
import json
import requests
class Slack:
def noti(context=""):
Slack.post('Failed: {}.{}.{}'.format(
context['dag'], context['task'], context['execution_date'])
)
def post(text):
url = "https://hooks.slack.com/services/__/__/__"
requests.post(url, data=json.dumps({
'username': 'Airflow',
'channel': 'your_channel',
'attachments': [{
'title': text,
"color": 'danger'
}]
}))
__init__.py
from packages.slack import Slack
slack_test.py
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from packages import Slack
def test():
Slack.post('slack test')
return 'success'
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 8, 1),
"retries": 1,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": Slack.noti, # Dagが失敗したら通知が飛ぶ
}
dag = DAG("slack_test", default_args=default_args, schedule_interval=timedelta(1))
t1 = PythonOperator(task_id='print_package', python_callable=test, dag=dag)
じゃ、Cloud Composerの場合は?
ローカルの Python ライブラリをインストールする
ここにあるように、ディレクトリ名を dependencies
と定義する必要がある。
ディレクトリ構成は以下のようになる。
ProjectRoot
.
├── dags
│ ├── slack_test.py
│ ├── dependecies # これ
│ │ ├── __init__.py
│ │ └── slack.py
Author And Source
この問題について(AirflowやCloud Composerでpackageを定義して使いたい), 我々は、より多くの情報をここで見つけました https://qiita.com/munaita_/items/feb6e0f690ba5a6b6086著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .