Airflow入門チュートリアル&例

8719 ワード

目次
  • インポートモジュール
  • デフォルトパラメータ
  • を設定
  • DAG
  • をインスタンス化
  • タスク
  • Templating with Jinja
  • 依存関係
  • を設定する.
  • 以上の内容
  • を簡単に再記述する.
  • テスト
  • モジュールのインポート

    #  
    # DAG DAG , , 
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta

    デフォルトパラメータの設定


    これらのデフォルトパラメータは、タスクを作成するときに使用できます.
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    DAGをインスタンス化


    タスクを埋め込むにはDAGオブジェクトが必要です.次のコードでは、まずDAGの一意の識別として文字列を定義し、デフォルトのパラメータ辞書(上に定義されている)に入力し、スケジュールを1日間隔で定義します.
    dag = DAG(
        'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

    タスク#タスク#


    operatorをインスタンス化すると、タスクが生成されます.operatorからインスタンス化されたオブジェクトはコンストラクタとも呼ばれ、最初のパラメータtask_idはタスクの一意の識別として機能します.
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)

    各Operator固有のパラメータ(bash_command)およびBaseOperatorから継承されたすべてのOperatorの共通パラメータ(retries)をOperatorのconstructorに渡す方法に注意してください.これは、各パラメータを各constructorに渡すよりも簡単です.もちろん,t 2が継承する汎用パラメータretriesは我々に再ロードされ,3に付与されることにも気づいた.タスクの前提条件は次のとおりです.
  • 明確な伝達パラメータ
  • 値default_Args辞書には
  • が存在する
  • operatorのデフォルト値(存在する場合)タスクにパラメータtaskを含めるか継承する必要があります.idとowner、そうでなければAirflowは異常
  • を投げ出す

    Templating with Jinja


    AirflowはJinja Templatingの強力な機能を利用して、パイプの作者に内蔵パラメータとマクロのセットを提供します.Airflowはまた,パイプ作成者に独自のパラメータ,マクロ,テンプレートを定義するフック(Hooks)を提供した.このチュートリアルでは、Airflowでテンプレートを使用して操作することはほとんどありません.このセクションの目的は、この機能の存在を理解し、カッコと最も一般的なテンプレート変数:{{ds}}(今日の日付スタンプ)を熟知させることです.
    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7) }}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    
    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)

    注意してください、templated_commandは、{%%}ブロックのコードロジックを含み、{{ds}}のようなパラメータを参照し、{{macros.ds_add(ds,7)}}の関数を呼び出し、{{params.my_param}}でユーザ定義パラメータを参照する.
    BaseOperatorのparams hookでは、パラメータおよび/またはオブジェクトの辞書をテンプレートに渡すことができます.パラメータを理解するのに少し時間がかかりますmy_paramはどのようにテンプレートを通過しますか.
    ファイルは、パイプファイルを含むディレクトリ(この例ではtutorial.py)に対してファイルの位置が相対的であるbash_commandパラメータ、例えばbash_command ='templated_command.sh'にも渡すことができる.これには、スクリプトの論理コードとパイプコードを分離したり、異なる言語で作成されたファイルで正しいコード強調表示を実行したり、パイプを構築する汎用性と柔軟性など、多くの理由があります.template_searchpathは、DAGコンストラクタ呼び出し内の任意のフォルダの場所を指すように定義することもできる.
    同じDAGコンストラクション関数呼び出しを使用すると、user_defined_macrosを定義できます.これにより、独自の変数を指定できます.たとえば、dict(foo ='bar')をこのパラメータに渡すと、テンプレートで{{foo}}を使用できます.さらに、user_defined_filtersを指定すると、独自のフィルタを登録できます.たとえば、dict(hello = lambda name:'Hello%s'%name)をこのパラメータに渡すと、カスタムフィルタの詳細については、独自のテンプレートで{{'world'|{{ 'world' | hello }}を使用できます.Jinjaドキュメントを参照してください.
    テンプレートで参照できる変数とマクロの詳細については、マクロリファレンスを参照してください.

    依存関係の設定


    互いに依存しない3つのタスクt 1,t 2,t 3がある.次に、それらの依存関係を定義する方法があります.
    t1.set_downstream(t2)
    
    #  t2 t1
    #  
    t2.set_upstream(t1)
    
    #   t2 t1  
    t1 >> t2
    
    #   t1 t2  
    t2 << t1
    
    #  
    t1 >> t2 >> t3
    
    #  , 
    t1.set_downstream([t2, t3])
    t1 >> [t2, t3]
    [t2, t3] << t1

    スクリプトを実行すると、AirflowはDAGでループまたは複数の参照依存項目を見つけたときに例外を引き起こすことに注意してください.

    以上を簡単に再記述する


    私たちはすでに非常に基礎的なDAGを持っています.あなたのコードは次のように見えます.
    """
    Code that goes along with the Airflow tutorial located at:
    https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG(
        'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
    
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    
    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    
    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
    
    t2.set_upstream(t1)
    t3.set_upstream(t1)

    テスト


    スクリプトの実行


    いくつかのテストを行う時です.まずパイプの解析に成功することを確認します.まず、上記のコードがtutorial.pyに格納されていることを確認します.ファイルの場所はairflow.cfgで指定されたdagsフォルダ内にあり、DAGsフォルダのデフォルトは~/airflow/dagsでコマンドラインで実行されます.
    python ~/airflow/dags/tutorial.py

    このスクリプトが異常を投げ出さない場合は、恐ろしい間違いを犯していないことを意味し、Airflow環境は悪くありません.

    コマンドラインメタデータ検証


    前のスクリプトをさらに検証するためのコマンドを実行しましょう
    #  DAGs  
    airflow list_dags
    
    #  dag_id   "tutorial"   
    airflow list_tasks tutorial
    
    #    tutorial DAG 
    airflow list_tasks tutorial --tree

    テスト


    テストは、特定の日付で実際のタスクインスタンスを実行することによって行います.このコンテキストで指定された日付はexecution_dateで、特定の日付+時間スケジューリング実行タスクまたはdagをシミュレートします.
    # command layout: command subcommand dag_id task_id date
    
    # testing print_date
    airflow test tutorial print_date 2015-06-01
    
    # testing sleep
    airflow test tutorial sleep 2015-06-01

    私たちが前にテンプレートでやったことを覚えていますか?このコマンドを実行して、このテンプレートの表示と実行方法について説明します.
    # testing templated
    airflow test tutorial templated 2015-06-01

    詳細なイベント・ログが表示され、最終的にbashコマンドが実行され、結果が印刷されるはずです.airflow testコマンドは、ローカルでタスク・インスタンスを実行し、そのログをstdout(画面上)に出力し、依存項目に依存せず、データベースにステータスを伝えません(実行、成功、失敗、...).単一のタスクインスタンスのみをテストできます.

    backfill


    すべてがうまく動いているように見えます.backfillを実行しましょう.backfillは依存関係に従い、ログをファイルに送信し、データベースと通信してステータスを記録します.ネットワーク・サーバがある場合は、進捗状況を追跡することもできます.backfill中に進捗状況を直感的に追跡することに興味がある場合は、airflow webserverがWebサーバを起動します.depends_on_past = Trueを使用する場合、単一のタスクインスタンスは、前のタスクインスタンスの成功に依存することに注意してください.でもこのタスクのstart_を指定したらdate、この依存関係は無視されます.
    backfill:指定した日付範囲でDAGのサブセクションを実行します.reset_dag_runオプションが使用される場合、backfillは、airflowの日付範囲内のすべての以前のbackfillおよびdag_runを消去すべきかどうかを最初にユーザに提示する.task_instancesが使用される場合、rerun_failed_tasksは、backfillの日付範囲内の以前に失敗したタスクインスタンスを自動的に再実行します.cmd airflow backfill [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE] [-m] [-l] [-x] [-i] [-I] [-sd SUBDIR] [--pool POOL] [--delay_on_limit DELAY_ON_LIMIT] [-dr] [-v] [-c CONF] [--reset_dagruns] [--rerun_failed_tasks] [-B] dag_id ```
    このコンテキストの日付範囲はstart_です.dateとオプションend_dateは、このdagのタスクインスタンスを使用して実行計画を埋め込むために使用されます.
    #  , web 
    # airflow webserver --debug &
    
    #   backfill
    airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

    転載先:https://www.cnblogs.com/wanglvtao/p/10826663.html