AWSでのApache気流の理解

13918 ワード

アパッチ気流はクールな名前だけではありませんまた、AWS上のApache Airflow(MWAA)の管理ワークフローとして使用できる強力なワークフローオーケストレーションツールです.このポストはどのサービスが解決するか、どのように、あなたが理解する必要がある最も重要な概念を始めることができるかを説明します.最後に、例のユースケースと一緒にすべてをもたらすでしょう.
アパッチエコシステムfull of projects , そしてしばしば、プロジェクトの名前は、ツールが何を行うかを示しません(豚、カサンドラ、ハイブなど).気流はそれのもう一つの例です.あなたが最初に名前に来るとき、サービスがするもののあなたの頭で、あなたは明確な考えを持ちません.あなたが我々に類似していると思うならば、名前がポップアップし続けるまで、あなたは最初にこれらのプロジェクトを無視します.気流は我々のレーダーで現れ続けました、そして、最初のプロジェクトがやって来たとき、我々はそれの中に深く働きました.我々は、ツールはかなりクールであることを報告することができますので、我々はそれにいくつかの洞察を与えたい理由です.
AWSの生態系に精通している場合は、ステップ機能と接着剤のワークフローのミックスとして気流を考えることができます.その主な課題は、ETLのプロセスを編成することです.頭字語は迷惑ですが、ここでETLを使い続けます.変換負荷を抽出し,何らかの操作や濃縮による分析のためのデータを準備することを指す.標準工具はGlue , スパーク、弾性マップを減らす(EMR)、ラムダ、またはアテネ.ETLプロセスは、時間とともに複雑さが増加する傾向があります、そして、あなたはあなたがデータを処理するために互いに連携して異なるサービスを予定して、調整する必要があるとわかります.ここでは気流が助けることができるところです.
AirFlowは、あなたのETLワークフローをPythonコードとして記述することができる人気のオープンソースツールです.また、すべてのETLプロセスを監視する単一のインターフェイスを提供します.
AWSでは、Apache AirflowのためのMWAAAまたは管理されたワークフローは、管理された気流環境を提供します.記憶に残る名前ではないが、それは記述とよく構築されたサービスです.サービスを使用し始めるには、少なくとも2つのプライベートサブネットを持つVPCが必要です.VPCはアカウントに所有されなければなりません.さらに、我々の環境が他のサービスをトリガーするために使用できるロールを作成する必要があります.この環境は、ETLワークフローをホストする責任があります.ロールには、ワークフローで使用するすべてのサービスへのアクセスが必要です.それはセキュリティの観点から大きくないですが、我々が今これについてできることはあまりありません.戦略は、状況を改善するために将来のブログ記事のトピックになる可能性があります-あなたが興味を持っている場合教えてください.
VPCと役割があれば、デフォルトで非常に利用可能な環境を作成できます.ワークフロー定義(Dags,それ以降の詳細)を格納するS 3パスを指定する必要があります.環境が使用できる役割は別の必須の構成です.必要に応じてS 3のパスを設定することができます.インストールするパッケージとプラグインへのパスを指定するtxt.また、他の依存関係を含めることができます.また、環境で何が起こっているかについてより多くの洞察を得るために、ここでログレベルを増やすことはよいです.
それが25と30分の間を取ることができるので、今、あなたは環境がつくられる間、あなたは忍耐を必要とします、そして、後でそれを更新するならば、それはおよそ15分かかるかもしれません.あなたはあまりにも多くの回これをめちゃくちゃ避けるためにしたいと思います.環境を待ってスピンまたは再構成する楽しい経験ではありません.一旦それが準備ができている状態であるならば、我々は我々がログインするのを許す空想的な環境を持っています.たった今、これは我々のAWSビルを増やす他に何もしません.ワークフローを追加することで変更しましょう.そのためには少しの理論をカバーしなければならない.
導かれた非環式グラフ(Dags)は気流におけるワークフローを記述します.DAGはPythonで書かれ、環境設定時に設定された場所にS 3バケットに保存されます.mwaaは定期的にパスからすべてのダグを取り出し、環境に追加する.しかし、これは通常数秒の問題です.
dagsは、いつ実行されるかを定義するセットアップコードのビットから始めます.その後、ワークフローを構成するタスクのセットです.タスクには2つの種類があります.

  • Sensors 特定の条件までのDAGのブロック実行を実現します.たとえば、外部エンドポイントが特定のステータスコードを返すまで待つことができます.このことを考えるとawait 部分的にasync ... await 非同期プロセスに慣れているパターンです.

  • Operators ぼんやりしている.外部サービスをトリガーし、実行を管理します.たとえば、多くの事前に構築された演算子を使用して、Athenaでクエリを実行するか、シェルスクリプトまたはPythonコードに基づいて独自に作成できます.
  • 各タスクは、サービスに話をする接続を使用します.Connections サービスに接続する方法を定義する型と資格情報のセットで構成されます.演算子やセンサーは、特に設定されていない場合は、デフォルトの接続を使用します.この例では、環境変数を設定するときに設定されたロールを使用します.MWAA環境では、環境をスピンするとき、すでに多くの接続があります.
    タスクを定義した後、実行順序を定義する必要があります.これを使用して、それらの間の依存関係を確立し、できるだけ並列に実行します.あなたはbitshift operators ( >> ) Pythonでは、それを行うのきちんとした方法ですあなたのタスクをチェーンします.次に、簡単なDAGの例を見ます.
    以下は、アテナと対話する簡単なDAGの定義を見ることができます.最初にテーブルを削除するクエリを実行し、そのクエリがセンサーを使用して終了するまで待機します.次に、別のクエリを実行して新しいテーブルを作成し、完了するまで待機します.また、このDAGが予定されているときに定義します.この場合、手動で一度だけ実行します.
    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG
    
    # Athena Operators and Sensors, come preinstalled in MWAA
    from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
    from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
    
    from airflow.utils.dates import days_ago
    from datetime import timedelta
    import os
    
    # Naming the DAG the same as the filename
    DAG_ID = os.path.basename(__file__).replace(".py", "")
    
    # AWS variables
    S3_OUTPUT_BUCKET = "my-athena-bucket"
    ATHENA_TABLE_NAME = "athena_example"
    ATHENA_DATABASE = 'default'
    
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    DEFAULT_ARGS = {
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
    }
    
    
    # Some Athena SQL Statements, ideally shouldn't be here
    QUERY_DROP_TABLE = f'DROP TABLE IF EXISTS {ATHENA_TABLE_NAME};'
    
    QUERY_CREATE_TABLE = """
    CREATE EXTERNAL TABLE IF NOT EXISTS athena_example (
          .../* Long statement here */
    """
    
    
    with DAG(
        dag_id=DAG_ID,
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval='@once',
        tags=['athena'],
    ) as dag:
    
        drop_table = AWSAthenaOperator(
            task_id='query__drop_table',
            query=QUERY_DROP_TABLE,
            database=ATHENA_DATABASE,
            output_location=f's3://{S3_OUTPUT_BUCKET}/',
            sleep_time=30,
            max_tries=None,
        )
    
        get_drop_state = AthenaSensor(
            task_id='query__get_drop_state',
            query_execution_id=drop_table.output,
            max_retries=None,
            sleep_time=10,
        )
    
        create_table = AWSAthenaOperator(
            task_id='query__create_table',
            query=QUERY_CREATE_TABLE,
            database=ATHENA_DATABASE,
            output_location=f's3://{S3_OUTPUT_BUCKET}/',
            sleep_time=30,
            max_tries=None,
        )
    
        get_create_state = AthenaSensor(
            task_id='query__get_read_state',
            query_execution_id=create_table.output,
            max_retries=None,
            sleep_time=10,
        )
    
        drop_table >> get_drop_state >> create_table >> get_create_state
    
    一旦我々が以前に設定したS 3経路にこのDAGをアップロードするならば、環境はそれを拾います.次に、有効にする必要があります.


    ここでは、AWSサービスと相互作用する演算子の例をいくつか見ました.それはAWSの多くの利用可能な演算子とセンサーのうちの1つだけですread more about here . AWSはまた、例として使用例の完全なgithubリポジトリを提供しますavailable here . これは、空気流の概念についての詳細を学びたい場合は、気流とそのユースケースに深く飛び込む良い出発点である必要がありますthe documentation is here to help you .
    我々が要約する前に、多分、気流を使わないとき、話をしましょう.あなたがする必要があるすべてがAWSで完全に動く10未満のワークフローを調整するならば、気流は高価な解決であるかもしれません.気流の実行starts at around $月35 - 40、それは最小のバージョンです.あなたはそのようなお金のために多くのステップ関数を実行することができますが、同様にそれらを構築する必要がありますし、複雑さに応じて、数学が異なる場合があります.エアフローは、しかし、特に場合は、サードパーティ製のプロバイダで動作する必要があるため、それを行くために多くの持っているのでmany pre-built solutions .

    概要


    Apache Airflowを解決し、AWAYでMWAAAを使用してエアフロー環境を作成する方法を扱った.また、DAGと関連するコンポーネントの基本を説明し、簡単なDAG例を示しました.
    うまくいけば、あなたはこのポストから何かを学びました、そして、我々はあなたの質問、フィードバックと懸念を楽しみにしています.お気軽に私たちに私たちのBIOSに記載されてプロファイルを介して手を差し伸べる.
    Peter & モリス