Airflow入門チュートリアル&例
8719 ワード
目次インポートモジュール デフォルトパラメータ を設定 DAG をインスタンス化タスク Templating with Jinja 依存関係 を設定する.以上の内容 を簡単に再記述する.テスト
これらのデフォルトパラメータは、タスクを作成するときに使用できます.
タスクを埋め込むにはDAGオブジェクトが必要です.次のコードでは、まずDAGの一意の識別として文字列を定義し、デフォルトのパラメータ辞書(上に定義されている)に入力し、スケジュールを1日間隔で定義します.
operatorをインスタンス化すると、タスクが生成されます.operatorからインスタンス化されたオブジェクトはコンストラクタとも呼ばれ、最初のパラメータ
各Operator固有のパラメータ(bash_command)およびBaseOperatorから継承されたすべてのOperatorの共通パラメータ(retries)をOperatorのconstructorに渡す方法に注意してください.これは、各パラメータを各constructorに渡すよりも簡単です.もちろん,t 2が継承する汎用パラメータretriesは我々に再ロードされ,3に付与されることにも気づいた.タスクの前提条件は次のとおりです.明確な伝達パラメータ 値default_Args辞書には が存在する operatorのデフォルト値(存在する場合)タスクにパラメータtaskを含めるか継承する必要があります.idとowner、そうでなければAirflowは異常 を投げ出す
AirflowはJinja Templatingの強力な機能を利用して、パイプの作者に内蔵パラメータとマクロのセットを提供します.Airflowはまた,パイプ作成者に独自のパラメータ,マクロ,テンプレートを定義するフック(Hooks)を提供した.このチュートリアルでは、Airflowでテンプレートを使用して操作することはほとんどありません.このセクションの目的は、この機能の存在を理解し、カッコと最も一般的なテンプレート変数:{{ds}}(今日の日付スタンプ)を熟知させることです.
注意してください、templated_commandは、
BaseOperatorのparams hookでは、パラメータおよび/またはオブジェクトの辞書をテンプレートに渡すことができます.パラメータを理解するのに少し時間がかかりますmy_paramはどのようにテンプレートを通過しますか.
ファイルは、パイプファイルを含むディレクトリ(この例ではtutorial.py)に対してファイルの位置が相対的である
同じDAGコンストラクション関数呼び出しを使用すると、
テンプレートで参照できる変数とマクロの詳細については、マクロリファレンスを参照してください.
互いに依存しない3つのタスクt 1,t 2,t 3がある.次に、それらの依存関係を定義する方法があります.
スクリプトを実行すると、AirflowはDAGでループまたは複数の参照依存項目を見つけたときに例外を引き起こすことに注意してください.
私たちはすでに非常に基礎的なDAGを持っています.あなたのコードは次のように見えます.
いくつかのテストを行う時です.まずパイプの解析に成功することを確認します.まず、上記のコードが
このスクリプトが異常を投げ出さない場合は、恐ろしい間違いを犯していないことを意味し、Airflow環境は悪くありません.
前のスクリプトをさらに検証するためのコマンドを実行しましょう
テストは、特定の日付で実際のタスクインスタンスを実行することによって行います.このコンテキストで指定された日付は
私たちが前にテンプレートでやったことを覚えていますか?このコマンドを実行して、このテンプレートの表示と実行方法について説明します.
詳細なイベント・ログが表示され、最終的にbashコマンドが実行され、結果が印刷されるはずです.
すべてがうまく動いているように見えます.backfillを実行しましょう.backfillは依存関係に従い、ログをファイルに送信し、データベースと通信してステータスを記録します.ネットワーク・サーバがある場合は、進捗状況を追跡することもできます.backfill中に進捗状況を直感的に追跡することに興味がある場合は、
backfill:指定した日付範囲でDAGのサブセクションを実行します.
このコンテキストの日付範囲はstart_です.dateとオプションend_dateは、このdagのタスクインスタンスを使用して実行計画を埋め込むために使用されます.
転載先:https://www.cnblogs.com/wanglvtao/p/10826663.html
モジュールのインポート
#
# DAG DAG , ,
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
デフォルトパラメータの設定
これらのデフォルトパラメータは、タスクを作成するときに使用できます.
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
DAGをインスタンス化
タスクを埋め込むにはDAGオブジェクトが必要です.次のコードでは、まずDAGの一意の識別として文字列を定義し、デフォルトのパラメータ辞書(上に定義されている)に入力し、スケジュールを1日間隔で定義します.
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
タスク#タスク#
operatorをインスタンス化すると、タスクが生成されます.operatorからインスタンス化されたオブジェクトはコンストラクタとも呼ばれ、最初のパラメータ
task_id
はタスクの一意の識別として機能します.t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
各Operator固有のパラメータ(bash_command)およびBaseOperatorから継承されたすべてのOperatorの共通パラメータ(retries)をOperatorのconstructorに渡す方法に注意してください.これは、各パラメータを各constructorに渡すよりも簡単です.もちろん,t 2が継承する汎用パラメータretriesは我々に再ロードされ,3に付与されることにも気づいた.タスクの前提条件は次のとおりです.
Templating with Jinja
AirflowはJinja Templatingの強力な機能を利用して、パイプの作者に内蔵パラメータとマクロのセットを提供します.Airflowはまた,パイプ作成者に独自のパラメータ,マクロ,テンプレートを定義するフック(Hooks)を提供した.このチュートリアルでは、Airflowでテンプレートを使用して操作することはほとんどありません.このセクションの目的は、この機能の存在を理解し、カッコと最も一般的なテンプレート変数:{{ds}}(今日の日付スタンプ)を熟知させることです.
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
注意してください、templated_commandは、
{%%}
ブロックのコードロジックを含み、{{ds}}
のようなパラメータを参照し、{{macros.ds_add(ds,7)}
}の関数を呼び出し、{{params.my_param}}
でユーザ定義パラメータを参照する.BaseOperatorのparams hookでは、パラメータおよび/またはオブジェクトの辞書をテンプレートに渡すことができます.パラメータを理解するのに少し時間がかかりますmy_paramはどのようにテンプレートを通過しますか.
ファイルは、パイプファイルを含むディレクトリ(この例ではtutorial.py)に対してファイルの位置が相対的である
bash_command
パラメータ、例えばbash_command ='templated_command.sh'
にも渡すことができる.これには、スクリプトの論理コードとパイプコードを分離したり、異なる言語で作成されたファイルで正しいコード強調表示を実行したり、パイプを構築する汎用性と柔軟性など、多くの理由があります.template_searchpath
は、DAGコンストラクタ呼び出し内の任意のフォルダの場所を指すように定義することもできる.同じDAGコンストラクション関数呼び出しを使用すると、
user_defined_macros
を定義できます.これにより、独自の変数を指定できます.たとえば、dict(foo ='bar')
をこのパラメータに渡すと、テンプレートで{{foo}}
を使用できます.さらに、user_defined_filters
を指定すると、独自のフィルタを登録できます.たとえば、dict(hello = lambda name:'Hello%s'%name)
をこのパラメータに渡すと、カスタムフィルタの詳細については、独自のテンプレートで{{'world'|{{ 'world' | hello }}
を使用できます.Jinjaドキュメントを参照してください.テンプレートで参照できる変数とマクロの詳細については、マクロリファレンスを参照してください.
依存関係の設定
互いに依存しない3つのタスクt 1,t 2,t 3がある.次に、それらの依存関係を定義する方法があります.
t1.set_downstream(t2)
# t2 t1
#
t2.set_upstream(t1)
# t2 t1
t1 >> t2
# t1 t2
t2 << t1
#
t1 >> t2 >> t3
# ,
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
スクリプトを実行すると、AirflowはDAGでループまたは複数の参照依存項目を見つけたときに例外を引き起こすことに注意してください.
以上を簡単に再記述する
私たちはすでに非常に基礎的なDAGを持っています.あなたのコードは次のように見えます.
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
テスト
スクリプトの実行
いくつかのテストを行う時です.まずパイプの解析に成功することを確認します.まず、上記のコードが
tutorial.py
に格納されていることを確認します.ファイルの場所はairflow.cfg
で指定されたdags
フォルダ内にあり、DAGsフォルダのデフォルトは~/airflow/dags
でコマンドラインで実行されます.python ~/airflow/dags/tutorial.py
このスクリプトが異常を投げ出さない場合は、恐ろしい間違いを犯していないことを意味し、Airflow環境は悪くありません.
コマンドラインメタデータ検証
前のスクリプトをさらに検証するためのコマンドを実行しましょう
# DAGs
airflow list_dags
# dag_id "tutorial"
airflow list_tasks tutorial
# tutorial DAG
airflow list_tasks tutorial --tree
テスト
テストは、特定の日付で実際のタスクインスタンスを実行することによって行います.このコンテキストで指定された日付は
execution_date
で、特定の日付+時間スケジューリング実行タスクまたはdagをシミュレートします.# command layout: command subcommand dag_id task_id date
# testing print_date
airflow test tutorial print_date 2015-06-01
# testing sleep
airflow test tutorial sleep 2015-06-01
私たちが前にテンプレートでやったことを覚えていますか?このコマンドを実行して、このテンプレートの表示と実行方法について説明します.
# testing templated
airflow test tutorial templated 2015-06-01
詳細なイベント・ログが表示され、最終的にbashコマンドが実行され、結果が印刷されるはずです.
airflow test
コマンドは、ローカルでタスク・インスタンスを実行し、そのログをstdout(画面上)に出力し、依存項目に依存せず、データベースにステータスを伝えません(実行、成功、失敗、...).単一のタスクインスタンスのみをテストできます.backfill
すべてがうまく動いているように見えます.backfillを実行しましょう.backfillは依存関係に従い、ログをファイルに送信し、データベースと通信してステータスを記録します.ネットワーク・サーバがある場合は、進捗状況を追跡することもできます.backfill中に進捗状況を直感的に追跡することに興味がある場合は、
airflow webserver
がWebサーバを起動します.depends_on_past = True
を使用する場合、単一のタスクインスタンスは、前のタスクインスタンスの成功に依存することに注意してください.でもこのタスクのstart_を指定したらdate、この依存関係は無視されます.backfill:指定した日付範囲でDAGのサブセクションを実行します.
reset_dag_run
オプションが使用される場合、backfill
は、airflow
の日付範囲内のすべての以前のbackfill
およびdag_run
を消去すべきかどうかを最初にユーザに提示する.task_instances
が使用される場合、rerun_failed_tasks
は、backfill
の日付範囲内の以前に失敗したタスクインスタンスを自動的に再実行します.cmd airflow backfill [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE] [-m] [-l] [-x] [-i] [-I] [-sd SUBDIR] [--pool POOL] [--delay_on_limit DELAY_ON_LIMIT] [-dr] [-v] [-c CONF] [--reset_dagruns] [--rerun_failed_tasks] [-B] dag_id ``` このコンテキストの日付範囲はstart_です.dateとオプションend_dateは、このdagのタスクインスタンスを使用して実行計画を埋め込むために使用されます.
# , web
# airflow webserver --debug &
# backfill
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
転載先:https://www.cnblogs.com/wanglvtao/p/10826663.html