アパッチ気流.複雑なワークフローを簡単な仕事として作る方法



イントロ
数週間前、私は、このプラットフォームで機能要求に関して働き始めました.特徴はgcp,観測,処理されるデータ数に関係していた.私は、流れを簡単できれいにするために本当に強力な何かを探していて、仕事をつくって、走らせました.そして、整合性、耐故障性、および正しいエラー処理も忘れてはいけません.
私の研究は、特にアパッチ空気流について、オーケストラの話題に私をもたらします.

なぜあなたはオーケストレーションが必要ですか?
あなたがExcelにいくつかのデータをエクスポートするだけで簡単なタスクを持っている場合、多分、あなたはすべてでオーケストレーションを使用する必要はありません.しかし、もしあなたが処理後に本当に良い利益をもたらすデータを操作している場合は、主に大量の非クリーンなデータを毎日働いて-あなたはそれについて考え始めるために適切な場所にいるようだ.
例えば、あなたの会社が大量のデータを処理して、顧客から良いアドバイスと利益を与えるならば、ほとんどすべての場合、ワークフローは同じです.S 3のバケツ/Azure Blobストレージの各夜のあなたのプロバイダの中に生データをいくつかのファイルを作成します.第二に、そのデータを収集し、構造的に集約する(例えば、bigqueryテーブルにプッシュ).さらに、複雑なSQLスクリプトで処理し、集約を行い、無効なデータをパッチします.その後、いくつかの外部APIを使用してデータをチェックする必要があります(特別なサービスでもあります)が、データサイズは、並列処理やキュー処理なしで動作を開始するには大きすぎます.そして最後に、検証の後、あなたの顧客にそれを表示するためにいくつかのデータプレビューツール(タブローダッシュボード)で最終結果を接続する必要があります.
私たちが見ることができるように、このプロセスは一見簡単ではありません.そして、実際の生活で扱う多くのケースがあります.
このため、ワークフローマネージャを持っている必要があります.そして最後の数年間、事実上、これはapace気流です.

歴史
エアフローは、2014年にAirbnbの内部プロジェクトのように生まれました.最初から、オープンソースのプロジェクトだったので、それは可能な限り高速で適切な機能を提供することでした.2016年には、プロジェクトはApacheのインキュベーターに移動し、2019年のエアフローはApacheのソフトウェア財団のトップレベルのプロジェクトになります.

コンポーネント
AirFlowはPythonプロジェクトですので、ほとんどすべての機能はPythonコードです.
エアフローでの作業を開始するには、設定を提供する必要があります.そして、構成は仕事をする平行タスクの数に強く依存します.
また、必要なコンポーネントがあります.
  • メタデータデータベース-気流は、現在および過去のタスク、法令、および結果に関するすべてのメタ情報を保存するデータベース.私はここでPostgres(より安定して効果的なワークフロー)を使用することをお勧めしますが、MySQL、MSSQL、およびSQLiteのための構成と接続もあります.
  • スケジューラ-システムコンポーネントは、パイプライン記述を使用してファイルを解析し、Executor
  • Webサーバー- Grassornを介して実行フラスコベースのアプリ.主な目標は、視覚的にパイプラインプロセスを表示し、それを制御することです.
  • Executor -コードを実行する特殊な部分(実行)
  • また、タスクに依存するコンポーネントもあります.
  • triggerer -単純な方法で-非同期演算子のイベントループです.現在、それらの多くはありませんので、あなたはTriggerer ワークフローのコンポーネント
  • 労働者-セロリリブからの労働者を修正.セロリがあなたの仕事を走らせる小さなノード.

  • 実行
    ジョブを記述するPythonコードはどこかで実行する必要があります.この部分はExecutor . エアフローは、次の種類の実行者をサポートします.
  • SequentialExecutor -ローカルフローのメインスレッドでコードを実行します.
  • LocalExecutor -ローカルのコードを実行しますが、OSのさまざまなプロセスで
  • セロリ労働者の仕事
  • Daskクラスタで
  • K 8ポッドで
  • 私の経験から、生産コードはセロリ/Kubernetes役員に基づいています.この事実を考慮する必要があります.すべてのタスクは、その分離された環境で実行され、さまざまな物理デバイス上の高い可能性を持つ.したがって、タスクのシーケンス“ディスクにファイルをダウンロード”と“クラウドストレージにファイルをアップロード”正しく動作しません.詳細な情報を見つけることができますhere
    ご覧のように、気流は非常にカスタマイズ可能です.構成は、ほとんどのカスタム方法で可能な限りの要件を閉じることができます.
    一般に、2つの最も広がったアーキテクチャがあります.single-node and multi-node :



    インストール
    Apache Airflowをインストールする方法はいくつかあります.それらをチェックしましょう.

    パッケージマネージャー
    簡単な方法ではない.まず最初に、すべての依存関係をインストールする必要がありますSequentialExecutor ). 良い練習は、Pythonの仮想envを初期化することです.
    python -m pip install apache-airflow
    airflow webserver
    airflow scheduler
    

    -分離したDocker画像
    私は、あなたが裸の金属サーバーで気流を走らせようとしているとき、これが役に立つとわかりました.
    docker run … postgres
    docker run … apache/airflow scheduler
    docker run … apache/airflow webserver
    

    - Dockerの作成
    私の意見で-きれいで簡単な方法.すべての設定でDockerを作成する必要があります.
    docker compose up
    

    -天文学者
    私はこのツールであまり働きませんでした、しかし、それは良いコミュニティを持ちます.また、彼らはエアフローと作業プロセスを簡素化するすべてのフック/演算子の内部レジストリを持っています.

    ベースコンセプト
    この物語の主なエンティティは、DAG(直接非サイクルグラフ)-タスクの家政婦です.その広がりタイトル、あなたは異なる言語でそれを満たすことができます.

    このグラフの辺はTask , これはOperator .
    一般的にすべての演算子を分割することができます.
  • アクション演算子-何らかのアクションを作成する
  • Transfer演算子-データを1つの場所から別の場所へ移動する( S 3 TokCpoperator )
  • センサー演算子
  • 各パイプラインは内部で働いているTask Instance - timespanを持つ演算子のインスタンス(この演算子が起動されるとき).また、構成することができますVariables and Connections - 環境変数で、異なる接続文字列、ログなどをWebパーツで保持する場合は、UIで設定できます.
    最後ではなくHook - 外部サービスのインターフェイスです.フックは、人気の図書館、API、DBSのまわりの包装紙です.たとえば、SQL Serverへの接続を処理する必要がある場合は、SqlServiceHookについて考えることができます(既に存在します).

    を作成する.メイン・モーメンツ
    まず、いくつかの宣言が必要です.
    import requests
    import pandas as pd
    from pathlib import Path
    from airflow.models import DAG
    from airflow.operators.python import PythonOperator
    
    次に、ダウンロードデータの2つの機能を作成し、それをピボット(これらの2つのタスクが1つの場所で実行されているかどうかを確認するには、実行者をチェックすることを忘れないでください)
    def download_data_fn():
       url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
       resp = requests.get(url)
       Path('titanic.csv').write_text(resp.content.decode())
    
    def pivot_data_fn():
       df = pd.read_csv('titanic.csv')
       df = df.pivot_table(index='Sex', columns='Pclass', values='Name', aggfunc='count')
       df.reset_index().to_csv('titanic_pivoted.csv')
    
    最後に、実行順序でDAGを作成します.
    with DAG(dag_id='titanic_dag', schedule_interval='*/9 * * * *') as dag:
       download_data = PythonOperator(
           task_id='download_data',
           python_callable=download_data_fn,
           dag=dag,
       )
    
       pivot_data = PythonOperator(
           task_id='pivot_data',
           python_callable=pivot_data_fn,
           dag=dag,
       )
    
       download_data >> pivot_data
    
    # variants:
    # pivot_data << download_data 
    # download_data.set_downstream(pivot_data)
    # pivot_data.set_upstream(download_data)
    
    
    作成されたファイルは、すべてのダグが位置しているフォルダーに配置する必要があります.デフォルトでは、それは- $ airflowpersホーム/dagsです.もしそうならば、スケジューラは実行順序にそれをします、そして、実行者は9分ごとにそれを走らせます.

    XCOM
    タスクAとタスクBの間の依存関係があります.タスクを実行するだけでなく、コンソールのパイプのような結果を渡します.この目的のために、xcomsを使うことができます.
    XCOMS(クロスタスクコミュニケーション)では、タスクはメタデータDBに特殊なメタデータを書くことができます.前の例を取り、少し修正してください.
    def download_data_fn(**context):
       filename = 'titanic.csv'
       url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
       resp = requests.get(url)
       Path(filename).write_text(resp.content.decode())
       #context['ti'].xcom_push(key='filename', value=filename) # option 1
       return filename # option 2
    
    
    def pivot_data_fn(ti, **context):
       # filename = ti.xcom_pull(task_ids=['download_data'], key='filename') # option 1
       filename = ti.xcom_pull(task_ids=['download_data'], key='return_value') # option 2
       df = pd.read_csv(filename)
       df = df.pivot_table(index='Sex', columns='Pclass', values='Name', aggfunc='count')
       df.reset_index().to_csv('titanic_pivoted.csv')
    
    with DAG(dag_id='titanic_dag', schedule_interval='*/9 * * * *') as dag:
       download_data = PythonOperator(
           task_id='download_data',
           python_callable=download_data_fn,
           provide_context=True,
       )
    
       pivot_data = PythonOperator(
           task_id='pivot_data',
           python_callable=pivot_data_fn,
           provide_context=True,
       )
    
       download_data >> pivot_data
    
    私たちが見ることができるように、XCOMオブジェクトを使用する異なった方法がありますが、データが小さくなければならないことに留意してください.データサイズが大きい場合は、データをDBにデータを保存し、メタDBの限界に達することができます.第二に、エアフローは、単にオーケストラであり、データ処理のために使用してはいけません.

    下方
    あなたが知っていると心の中で良い結果を得るために多くのことを維持する必要があります.それは複雑なツールですが、その複雑な仕事.また、ほとんどの場合デバッグやトレースする場合は、ローカルフローのインスタンスがあります.

    代替案
    どのような気流は、市場で唯一のものではない知って良いことだ.があるDagster and Spotify Luigi など.しかし、彼らは別の長所と短所を持って、あなたのタスクのための最良の適切なツールを選択する市場で良い調査を行ったことを確認してください.
    今日はこれだけです.)私はこの記事はいくつかの手がかりを与えるだろうと彼らのための基礎は、気流とオーケストレーションとの作業を開始します.ステイ!