アパッチ気流.複雑なワークフローを簡単な仕事として作る方法
イントロ
数週間前、私は、このプラットフォームで機能要求に関して働き始めました.特徴はgcp,観測,処理されるデータ数に関係していた.私は、流れを簡単できれいにするために本当に強力な何かを探していて、仕事をつくって、走らせました.そして、整合性、耐故障性、および正しいエラー処理も忘れてはいけません.
私の研究は、特にアパッチ空気流について、オーケストラの話題に私をもたらします.
なぜあなたはオーケストレーションが必要ですか?
あなたがExcelにいくつかのデータをエクスポートするだけで簡単なタスクを持っている場合、多分、あなたはすべてでオーケストレーションを使用する必要はありません.しかし、もしあなたが処理後に本当に良い利益をもたらすデータを操作している場合は、主に大量の非クリーンなデータを毎日働いて-あなたはそれについて考え始めるために適切な場所にいるようだ.
例えば、あなたの会社が大量のデータを処理して、顧客から良いアドバイスと利益を与えるならば、ほとんどすべての場合、ワークフローは同じです.S 3のバケツ/Azure Blobストレージの各夜のあなたのプロバイダの中に生データをいくつかのファイルを作成します.第二に、そのデータを収集し、構造的に集約する(例えば、bigqueryテーブルにプッシュ).さらに、複雑なSQLスクリプトで処理し、集約を行い、無効なデータをパッチします.その後、いくつかの外部APIを使用してデータをチェックする必要があります(特別なサービスでもあります)が、データサイズは、並列処理やキュー処理なしで動作を開始するには大きすぎます.そして最後に、検証の後、あなたの顧客にそれを表示するためにいくつかのデータプレビューツール(タブローダッシュボード)で最終結果を接続する必要があります.
私たちが見ることができるように、このプロセスは一見簡単ではありません.そして、実際の生活で扱う多くのケースがあります.
このため、ワークフローマネージャを持っている必要があります.そして最後の数年間、事実上、これはapace気流です.
歴史
エアフローは、2014年にAirbnbの内部プロジェクトのように生まれました.最初から、オープンソースのプロジェクトだったので、それは可能な限り高速で適切な機能を提供することでした.2016年には、プロジェクトはApacheのインキュベーターに移動し、2019年のエアフローはApacheのソフトウェア財団のトップレベルのプロジェクトになります.
コンポーネント
AirFlowはPythonプロジェクトですので、ほとんどすべての機能はPythonコードです.
エアフローでの作業を開始するには、設定を提供する必要があります.そして、構成は仕事をする平行タスクの数に強く依存します.
また、必要なコンポーネントがあります.
Executor
Triggerer
ワークフローのコンポーネント実行
ジョブを記述するPythonコードはどこかで実行する必要があります.この部分は
Executor
. エアフローは、次の種類の実行者をサポートします.ご覧のように、気流は非常にカスタマイズ可能です.構成は、ほとんどのカスタム方法で可能な限りの要件を閉じることができます.
一般に、2つの最も広がったアーキテクチャがあります.single-node and multi-node :
インストール
Apache Airflowをインストールする方法はいくつかあります.それらをチェックしましょう.
パッケージマネージャー
簡単な方法ではない.まず最初に、すべての依存関係をインストールする必要があります
SequentialExecutor
). 良い練習は、Pythonの仮想envを初期化することです.python -m pip install apache-airflow
airflow webserver
airflow scheduler
-分離したDocker画像
私は、あなたが裸の金属サーバーで気流を走らせようとしているとき、これが役に立つとわかりました.
docker run … postgres
docker run … apache/airflow scheduler
docker run … apache/airflow webserver
- Dockerの作成
私の意見で-きれいで簡単な方法.すべての設定でDockerを作成する必要があります.
docker compose up
-天文学者
私はこのツールであまり働きませんでした、しかし、それは良いコミュニティを持ちます.また、彼らはエアフローと作業プロセスを簡素化するすべてのフック/演算子の内部レジストリを持っています.
ベースコンセプト
この物語の主なエンティティは、DAG(直接非サイクルグラフ)-タスクの家政婦です.その広がりタイトル、あなたは異なる言語でそれを満たすことができます.
このグラフの辺は
Task
, これはOperator
. 一般的にすべての演算子を分割することができます.
Task Instance
- timespanを持つ演算子のインスタンス(この演算子が起動されるとき).また、構成することができますVariables
and Connections
- 環境変数で、異なる接続文字列、ログなどをWebパーツで保持する場合は、UIで設定できます.最後ではなく
Hook
- 外部サービスのインターフェイスです.フックは、人気の図書館、API、DBSのまわりの包装紙です.たとえば、SQL Serverへの接続を処理する必要がある場合は、SqlServiceHookについて考えることができます(既に存在します).を作成する.メイン・モーメンツ
まず、いくつかの宣言が必要です.
import requests
import pandas as pd
from pathlib import Path
from airflow.models import DAG
from airflow.operators.python import PythonOperator
次に、ダウンロードデータの2つの機能を作成し、それをピボット(これらの2つのタスクが1つの場所で実行されているかどうかを確認するには、実行者をチェックすることを忘れないでください)def download_data_fn():
url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
resp = requests.get(url)
Path('titanic.csv').write_text(resp.content.decode())
def pivot_data_fn():
df = pd.read_csv('titanic.csv')
df = df.pivot_table(index='Sex', columns='Pclass', values='Name', aggfunc='count')
df.reset_index().to_csv('titanic_pivoted.csv')
最後に、実行順序でDAGを作成します.with DAG(dag_id='titanic_dag', schedule_interval='*/9 * * * *') as dag:
download_data = PythonOperator(
task_id='download_data',
python_callable=download_data_fn,
dag=dag,
)
pivot_data = PythonOperator(
task_id='pivot_data',
python_callable=pivot_data_fn,
dag=dag,
)
download_data >> pivot_data
# variants:
# pivot_data << download_data
# download_data.set_downstream(pivot_data)
# pivot_data.set_upstream(download_data)
作成されたファイルは、すべてのダグが位置しているフォルダーに配置する必要があります.デフォルトでは、それは- $ airflowpersホーム/dagsです.もしそうならば、スケジューラは実行順序にそれをします、そして、実行者は9分ごとにそれを走らせます.XCOM
タスクAとタスクBの間の依存関係があります.タスクを実行するだけでなく、コンソールのパイプのような結果を渡します.この目的のために、xcomsを使うことができます.
XCOMS(クロスタスクコミュニケーション)では、タスクはメタデータDBに特殊なメタデータを書くことができます.前の例を取り、少し修正してください.
def download_data_fn(**context):
filename = 'titanic.csv'
url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
resp = requests.get(url)
Path(filename).write_text(resp.content.decode())
#context['ti'].xcom_push(key='filename', value=filename) # option 1
return filename # option 2
def pivot_data_fn(ti, **context):
# filename = ti.xcom_pull(task_ids=['download_data'], key='filename') # option 1
filename = ti.xcom_pull(task_ids=['download_data'], key='return_value') # option 2
df = pd.read_csv(filename)
df = df.pivot_table(index='Sex', columns='Pclass', values='Name', aggfunc='count')
df.reset_index().to_csv('titanic_pivoted.csv')
with DAG(dag_id='titanic_dag', schedule_interval='*/9 * * * *') as dag:
download_data = PythonOperator(
task_id='download_data',
python_callable=download_data_fn,
provide_context=True,
)
pivot_data = PythonOperator(
task_id='pivot_data',
python_callable=pivot_data_fn,
provide_context=True,
)
download_data >> pivot_data
私たちが見ることができるように、XCOMオブジェクトを使用する異なった方法がありますが、データが小さくなければならないことに留意してください.データサイズが大きい場合は、データをDBにデータを保存し、メタDBの限界に達することができます.第二に、エアフローは、単にオーケストラであり、データ処理のために使用してはいけません.下方
あなたが知っていると心の中で良い結果を得るために多くのことを維持する必要があります.それは複雑なツールですが、その複雑な仕事.また、ほとんどの場合デバッグやトレースする場合は、ローカルフローのインスタンスがあります.
代替案
どのような気流は、市場で唯一のものではない知って良いことだ.があるDagster and Spotify Luigi など.しかし、彼らは別の長所と短所を持って、あなたのタスクのための最良の適切なツールを選択する市場で良い調査を行ったことを確認してください.
今日はこれだけです.)私はこの記事はいくつかの手がかりを与えるだろうと彼らのための基礎は、気流とオーケストレーションとの作業を開始します.ステイ!
Reference
この問題について(アパッチ気流.複雑なワークフローを簡単な仕事として作る方法), 我々は、より多くの情報をここで見つけました https://dev.to/leefrost/apache-airflow-how-to-make-the-complex-workflow-as-an-easy-job-4a0pテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol