ワークフローairflowの紹介
apache airflowとは
- https://github.com/apache/incubator-airflow
- pythonでタスクとタスク依存関係を書く
- プラグインによる拡張が可能
- ワークフロー管理のためのUI
- ワークフローの可視化
- 実行の監視、ログの確認(原因究明が速い)
- 必要なタスクの再実行
apache airflowの特徴
- pythonコードなので、ソースはgitで管理できる
- 処理を実行するworker(Scaling Out with Celery)の分散が可能
- 色々プラグインが用意されている
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperatorなど
- gcp(storage, bigquery)が直接に使える
- その以外は https://github.com/apache/incubator-airflow/tree/master/airflow/contrib
- pythonによるプラグイン拡張ができる
apache airflowの特徴
- 一つworkflowは一つDAG(Directed Acyclic Graph)
- タスクの集合及び依存関係
- 有向非巡回グラフ, cycleを持たない有向グラフのこと
- タスクの依存関係はトポロジカルソート(topological sort)で解決できる
- タスク間にデータの受け渡しは(‘xcom’ )
- “pull” data from a second task (xcom_pull)
- “push” data from one task (xcom_push)
類似ツール(workflow engine)
- spotify luigi(python)
- treasuredata digdag(java)
- oozie
- ワークフローの可視化
- 実行の監視、ログの確認(原因究明が速い)
- 必要なタスクの再実行
- pythonコードなので、ソースはgitで管理できる
- 処理を実行するworker(Scaling Out with Celery)の分散が可能
- 色々プラグインが用意されている
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperatorなど
- gcp(storage, bigquery)が直接に使える
- その以外は https://github.com/apache/incubator-airflow/tree/master/airflow/contrib
- pythonによるプラグイン拡張ができる
apache airflowの特徴
- 一つworkflowは一つDAG(Directed Acyclic Graph)
- タスクの集合及び依存関係
- 有向非巡回グラフ, cycleを持たない有向グラフのこと
- タスクの依存関係はトポロジカルソート(topological sort)で解決できる
- タスク間にデータの受け渡しは(‘xcom’ )
- “pull” data from a second task (xcom_pull)
- “push” data from one task (xcom_push)
類似ツール(workflow engine)
- spotify luigi(python)
- treasuredata digdag(java)
- oozie
- タスクの集合及び依存関係
- 有向非巡回グラフ, cycleを持たない有向グラフのこと
- タスクの依存関係はトポロジカルソート(topological sort)で解決できる
- “pull” data from a second task (xcom_pull)
- “push” data from one task (xcom_push)
- spotify luigi(python)
- treasuredata digdag(java)
- oozie
GCP用のプラグインが多いので、airflowを選んだ
Airflow Daemons
- web server
- scheduler
- worker
Single Node Deployment:
Cluster Deployment
- workerがスケールできる
- web serverもスケールできる
- schedulerがスケールできない (single failure point)
- airflow-scheduler-failover-controller(active-standby)のプロジェクトがある
airflowスクリプト(dag)の定義
スクリプト(dag)内部のタスクとタスクの依頼関係を実現したい
タスク=Operator
airflowタスク(dag)の定義
import airflow
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_bash_operator', default_args=args,
schedule_interval='0 0 * * *', # cron style
# 'start_date': datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0)),
dagrun_timeout=timedelta(minutes=60))
cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)
run_this = BashOperator(
task_id='run_after_loop', bash_command='echo 1', dag=dag)
#依頼関係1: run_this_lastを実行する前にrun_thisが完了したこと
run_this >> run_this_last
for i in range(3):
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
dag=dag)
#依頼関係2: run_thisを実行する前にtask(runme_0, runme_1, rune_2)が完了したこと
task >> run_this
bash_task = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag)
#依頼関係3: run_this_lastを実行する前にbash_taskが完了したこと
bash_task >> run_this_last
if __name__ == "__main__":
dag.cli()
プラグインの書き方と種類
- 基本クラスを継承してクラスを作成
- 種類
- Operator: execute関数に書かれた処理を実行する
- Sensor: poke関数の帰り値がTrueになるまで処理を実行する
- Hook: mysql/bigqueryなど外部システムconnectionのインターフェース
- その他
airflowのコマンド
- パラメーターを渡す
- タスク一覧を正規表現でリストアップ
- テスト
# -tp TASK_PARAMS, TAG01_v1.0: タグ名,
# download_to_bigquery-#{app_id}: タスク名, 2017-08-09: 対象日
airflow test -tp '{"_OVERRIDE_FROM_DATE": "2017-08-01", "_OVERRIDE_TO_DATE": "2017-08-05"}' TAG01_v1.0 download_to_bigquery-#{app_id} 2017-08-09
手動実行
# 初めて過去のデータを処理する(実行順番の注意: 必ず古い順ではない)
airflow backfill -s 2017-05-30T19:00:00 -e 2017-06-31T19:00:00 -I TAG01_v1.0
# 過去のデータを再処理する(実行順番の注意: 必ず古い順ではない)
airflow clear -t 'merge_event.*75019' TAG01_v1.0
注意点
- timezoneはutcなので、日本時間をutcに変更しないといけないです
- 19:00PM UTC yesterday => 04:00 AM JST today
- 昨日の4時に実行したい:datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0))
- start_dateとintervalの混乱
- 最初のタスクが実行されたタイミングはintervalが過ぎた時: start_date + interval
- start_dateかintervalを変更したら、dag_idを変更しましょう
- 再集計時の実行順が選べない
- 一気にタスクがクリアすると、並列実行されるが、でも古い順に実行したい
- 対応:順番に依存しない再集計する別DAGを作成する
apache airflowの利用シーン
- Datawareの事前処理
- レポートティング
- 機械学習(spark,dataflowとの連携)
airflow UI
import airflow
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_bash_operator', default_args=args,
schedule_interval='0 0 * * *', # cron style
# 'start_date': datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0)),
dagrun_timeout=timedelta(minutes=60))
cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)
run_this = BashOperator(
task_id='run_after_loop', bash_command='echo 1', dag=dag)
#依頼関係1: run_this_lastを実行する前にrun_thisが完了したこと
run_this >> run_this_last
for i in range(3):
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
dag=dag)
#依頼関係2: run_thisを実行する前にtask(runme_0, runme_1, rune_2)が完了したこと
task >> run_this
bash_task = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag)
#依頼関係3: run_this_lastを実行する前にbash_taskが完了したこと
bash_task >> run_this_last
if __name__ == "__main__":
dag.cli()
- 基本クラスを継承してクラスを作成
- 種類
- Operator: execute関数に書かれた処理を実行する
- Sensor: poke関数の帰り値がTrueになるまで処理を実行する
- Hook: mysql/bigqueryなど外部システムconnectionのインターフェース
- その他
airflowのコマンド
- パラメーターを渡す
- タスク一覧を正規表現でリストアップ
- テスト
# -tp TASK_PARAMS, TAG01_v1.0: タグ名,
# download_to_bigquery-#{app_id}: タスク名, 2017-08-09: 対象日
airflow test -tp '{"_OVERRIDE_FROM_DATE": "2017-08-01", "_OVERRIDE_TO_DATE": "2017-08-05"}' TAG01_v1.0 download_to_bigquery-#{app_id} 2017-08-09
手動実行
# 初めて過去のデータを処理する(実行順番の注意: 必ず古い順ではない)
airflow backfill -s 2017-05-30T19:00:00 -e 2017-06-31T19:00:00 -I TAG01_v1.0
# 過去のデータを再処理する(実行順番の注意: 必ず古い順ではない)
airflow clear -t 'merge_event.*75019' TAG01_v1.0
注意点
- timezoneはutcなので、日本時間をutcに変更しないといけないです
- 19:00PM UTC yesterday => 04:00 AM JST today
- 昨日の4時に実行したい:datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0))
- start_dateとintervalの混乱
- 最初のタスクが実行されたタイミングはintervalが過ぎた時: start_date + interval
- start_dateかintervalを変更したら、dag_idを変更しましょう
- 再集計時の実行順が選べない
- 一気にタスクがクリアすると、並列実行されるが、でも古い順に実行したい
- 対応:順番に依存しない再集計する別DAGを作成する
apache airflowの利用シーン
- Datawareの事前処理
- レポートティング
- 機械学習(spark,dataflowとの連携)
airflow UI
# -tp TASK_PARAMS, TAG01_v1.0: タグ名,
# download_to_bigquery-#{app_id}: タスク名, 2017-08-09: 対象日
airflow test -tp '{"_OVERRIDE_FROM_DATE": "2017-08-01", "_OVERRIDE_TO_DATE": "2017-08-05"}' TAG01_v1.0 download_to_bigquery-#{app_id} 2017-08-09
# 初めて過去のデータを処理する(実行順番の注意: 必ず古い順ではない)
airflow backfill -s 2017-05-30T19:00:00 -e 2017-06-31T19:00:00 -I TAG01_v1.0
# 過去のデータを再処理する(実行順番の注意: 必ず古い順ではない)
airflow clear -t 'merge_event.*75019' TAG01_v1.0
注意点
- timezoneはutcなので、日本時間をutcに変更しないといけないです
- 19:00PM UTC yesterday => 04:00 AM JST today
- 昨日の4時に実行したい:datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0))
- start_dateとintervalの混乱
- 最初のタスクが実行されたタイミングはintervalが過ぎた時: start_date + interval
- start_dateかintervalを変更したら、dag_idを変更しましょう
- 再集計時の実行順が選べない
- 一気にタスクがクリアすると、並列実行されるが、でも古い順に実行したい
- 対応:順番に依存しない再集計する別DAGを作成する
apache airflowの利用シーン
- Datawareの事前処理
- レポートティング
- 機械学習(spark,dataflowとの連携)
airflow UI
- 19:00PM UTC yesterday => 04:00 AM JST today
- 昨日の4時に実行したい:datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0))
- 最初のタスクが実行されたタイミングはintervalが過ぎた時: start_date + interval
- start_dateかintervalを変更したら、dag_idを変更しましょう
- 一気にタスクがクリアすると、並列実行されるが、でも古い順に実行したい
- 対応:順番に依存しない再集計する別DAGを作成する
- Datawareの事前処理
- レポートティング
- 機械学習(spark,dataflowとの連携)
airflow UI
Tree View
過去タスクの一覧表示される
過去分の一括クリアなどが出来て便利(コマンドの方が便利)
ガントチャート(Gantt)
タスクごとの実行時間が見れて時間のかかっているタスクがわかる
チューニングなどに使う
その他
- kubernetesでリリース
- 失敗したら、slackに通知する
その他
cd kube
make list-pods
kubectl get po -a --namespace airflow-prod
NAME READY STATUS RESTARTS AGE
flower-616933508-5kbxc 1/1 Running 0 17d
postgres-727646253-pxr8b 1/1 Running 0 39d
rabbitmq-1570300244-h251r 1/1 Running 0 39d
scheduler-2020340155-1nplv 1/1 Running 179 16d
web-1384073748-qj44c 1/1 Running 0 16d
worker-1579982757-tvzg3 1/1 Running 0 16d
make list-services
kubectl get svc -a --namespace airflow-prod
NAME CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flower 10.23.XXX.227 <nodes> 5555:32081/TCP 39d
postgres 10.23.XXX.130 <none> 5432/TCP 39d
rabbitmq 10.23.XXX.203 <none> 5672/TCP,15672/TCP 39d
web 10.23.XXX.195 XXX.193.207.XXX 8080:32080/TCP 39d
worker-subdomain None <none> 8793/TCP 39d
cd kube
make list-pods
kubectl get po -a --namespace airflow-prod
NAME READY STATUS RESTARTS AGE
flower-616933508-5kbxc 1/1 Running 0 17d
postgres-727646253-pxr8b 1/1 Running 0 39d
rabbitmq-1570300244-h251r 1/1 Running 0 39d
scheduler-2020340155-1nplv 1/1 Running 179 16d
web-1384073748-qj44c 1/1 Running 0 16d
worker-1579982757-tvzg3 1/1 Running 0 16d
make list-services
kubectl get svc -a --namespace airflow-prod
NAME CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flower 10.23.XXX.227 <nodes> 5555:32081/TCP 39d
postgres 10.23.XXX.130 <none> 5432/TCP 39d
rabbitmq 10.23.XXX.203 <none> 5672/TCP,15672/TCP 39d
web 10.23.XXX.195 XXX.193.207.XXX 8080:32080/TCP 39d
worker-subdomain None <none> 8793/TCP 39d
Author And Source
この問題について(ワークフローairflowの紹介), 我々は、より多くの情報をここで見つけました https://qiita.com/hitxiang/items/468771e60b46a23d75be著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .