ワークフローairflowの紹介


apache airflowとは

  • https://github.com/apache/incubator-airflow
  • pythonでタスクとタスク依存関係を書く
  • プラグインによる拡張が可能
  • ワークフロー管理のためのUI
    • ワークフローの可視化
    • 実行の監視、ログの確認(原因究明が速い)
    • 必要なタスクの再実行

apache airflowの特徴

  • pythonコードなので、ソースはgitで管理できる
  • 処理を実行するworker(Scaling Out with Celery)の分散が可能
  • 色々プラグインが用意されている
  • pythonによるプラグイン拡張ができる

apache airflowの特徴

  • 一つworkflowは一つDAG(Directed Acyclic Graph)
    • タスクの集合及び依存関係
    • 有向非巡回グラフ, cycleを持たない有向グラフのこと
    • タスクの依存関係はトポロジカルソート(topological sort)で解決できる
  • タスク間にデータの受け渡しは(‘xcom’ )
    • “pull” data from a second task (xcom_pull)
    • “push” data from one task (xcom_push)

類似ツール(workflow engine)

  • spotify luigi(python)
  • treasuredata digdag(java)
  • oozie

GCP用のプラグインが多いので、airflowを選んだ


Airflow Daemons

  • web server
  • scheduler
  • worker

Single Node Deployment:


Cluster Deployment

  • workerがスケールできる
  • web serverもスケールできる
  • schedulerがスケールできない (single failure point)
    • airflow-scheduler-failover-controller(active-standby)のプロジェクトがある

airflowスクリプト(dag)の定義

スクリプト(dag)内部のタスクとタスクの依頼関係を実現したい

タスク=Operator


airflowタスク(dag)の定義

import airflow
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import timedelta


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    schedule_interval='0 0 * * *',  # cron style
   # 'start_date': datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0)),
    dagrun_timeout=timedelta(minutes=60))

cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)

run_this = BashOperator(
    task_id='run_after_loop', bash_command='echo 1', dag=dag)
#依頼関係1: run_this_lastを実行する前にrun_thisが完了したこと
run_this >> run_this_last

for i in range(3):
    i = str(i)
    task = BashOperator(
        task_id='runme_'+i,
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        dag=dag)
    #依頼関係2: run_thisを実行する前にtask(runme_0, runme_1, rune_2)が完了したこと
    task >> run_this

bash_task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)
#依頼関係3: run_this_lastを実行する前にbash_taskが完了したこと
bash_task >> run_this_last

if __name__ == "__main__":
    dag.cli()

プラグインの書き方と種類

  • 基本クラスを継承してクラスを作成
  • 種類
    • Operator: execute関数に書かれた処理を実行する
    • Sensor: poke関数の帰り値がTrueになるまで処理を実行する
    • Hook: mysql/bigqueryなど外部システムconnectionのインターフェース
    • その他

airflowのコマンド

  • パラメーターを渡す
  • タスク一覧を正規表現でリストアップ
  • テスト
# -tp TASK_PARAMS, TAG01_v1.0: タグ名, 
# download_to_bigquery-#{app_id}: タスク名, 2017-08-09: 対象日
airflow test -tp '{"_OVERRIDE_FROM_DATE": "2017-08-01", "_OVERRIDE_TO_DATE": "2017-08-05"}' TAG01_v1.0 download_to_bigquery-#{app_id} 2017-08-09

手動実行

# 初めて過去のデータを処理する(実行順番の注意: 必ず古い順ではない)
airflow backfill -s 2017-05-30T19:00:00 -e 2017-06-31T19:00:00 -I TAG01_v1.0

# 過去のデータを再処理する(実行順番の注意: 必ず古い順ではない)
airflow clear -t 'merge_event.*75019' TAG01_v1.0

注意点

  • timezoneはutcなので、日本時間をutcに変更しないといけないです
    • 19:00PM UTC yesterday => 04:00 AM JST today
    • 昨日の4時に実行したい:datetime.combine(datetime.today() - timedelta(2), time(19, 0, 0))
  • start_dateとintervalの混乱
    • 最初のタスクが実行されたタイミングはintervalが過ぎた時: start_date + interval
    • start_dateかintervalを変更したら、dag_idを変更しましょう
  • 再集計時の実行順が選べない
    • 一気にタスクがクリアすると、並列実行されるが、でも古い順に実行したい
    • 対応:順番に依存しない再集計する別DAGを作成する

apache airflowの利用シーン

  • Datawareの事前処理
  • レポートティング
  • 機械学習(spark,dataflowとの連携)

airflow UI

Tree View
  過去タスクの一覧表示される
  過去分の一括クリアなどが出来て便利(コマンドの方が便利)
ガントチャート(Gantt)
  タスクごとの実行時間が見れて時間のかかっているタスクがわかる
  チューニングなどに使う


その他


その他

cd kube
make list-pods
kubectl get po -a --namespace airflow-prod

NAME                         READY     STATUS    RESTARTS   AGE
flower-616933508-5kbxc       1/1       Running   0          17d
postgres-727646253-pxr8b     1/1       Running   0          39d
rabbitmq-1570300244-h251r    1/1       Running   0          39d
scheduler-2020340155-1nplv   1/1       Running   179        16d
web-1384073748-qj44c         1/1       Running   0          16d
worker-1579982757-tvzg3      1/1       Running   0          16d

make list-services
kubectl get svc -a --namespace airflow-prod

NAME               CLUSTER-IP      EXTERNAL-IP      PORT(S)              AGE
flower             10.23.XXX.227   <nodes>          5555:32081/TCP       39d
postgres           10.23.XXX.130   <none>           5432/TCP             39d
rabbitmq           10.23.XXX.203   <none>           5672/TCP,15672/TCP   39d
web                10.23.XXX.195   XXX.193.207.XXX   8080:32080/TCP       39d
worker-subdomain   None            <none>           8793/TCP             39d