Airflow概要と、Kubernetes/HELM on Rancher で起動
ETLワークフローエンジン Apache Airflowを、Kubernetes on Rancherで、HELMインストールする設定メモ。
KubernetesやRancherの設定はこちら
https://qiita.com/suzukihi724/items/00b167c6f5f2ddeca718
Airflowとは?
・ビッグデータパイプラインを管理するためのワークフロー自動化およびスケジューリングを行うOSS
・2014年にAirbnbで開発され、現在はThe Apache Software Foundationのトップレベルプロジェクト
・pythonやbashでタスクを定義可能
・有向非巡回モデル (DAG) としてタスクのワークフローを作る
・複数クラスタでの並列分散実行も可能
・AdobeやGoogleなど200を超える組織で利用中
・GCPの「Cloud Composer」はマネージドAirflowサービス
・AWS機械学習基盤サービスSageMakerと統合可能で、MLワークフローでも利用が進んでいる
その他のETLツール例
・Luigi : https://github.com/spotify/luigi
・Digdag : https://www.digdag.io/
・argo : https://github.com/argoproj/argo#what-is-argo-workflows
・oozie : https://oozie.apache.org/
・NiFi : https://nifi.apache.org/
参考)
公式ドキュメント : https://airflow.apache.org/
Github : https://github.com/apache/airflow
https://news.mynavi.jp/article/20190109-753859/
今回の前提条件
各ソフトウェア
・Airflow : Version 4.0.9
・Rancher : v2.2.3
・Kubernetes : v1.13.5
設置環境
・AWS
※KubernetesはEKSではなくEC2(Amazon Linux2)
Airflowのインストール
Rancherのカタログ機能で、HELMチャートで公開されているものを利用。
とりあえずのインストールは、Rancher上から数クリックで可能。
インストールされると、「Apps」に追加される
事前にKubernetesの Persistent Volume(永続的ボリューム)の設定が必要で、別途nfs-provisionerを利用。
正常にセットアップが完了すると、以下のようなServiceが立ち上がる
> kubectl -n airflow-2 get services
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
airflow-2-flower ClusterIP 10.43.166.179 <none> 5555/TCP 28h
airflow-2-postgresql ClusterIP 10.43.143.186 <none> 5432/TCP 28h
airflow-2-redis-headless ClusterIP None <none> 6379/TCP 28h
airflow-2-redis-master ClusterIP 10.43.183.71 <none> 6379/TCP 28h
airflow-2-web ClusterIP 10.43.135.222 <none> 8080/TCP 28h
airflow-2-worker ClusterIP None <none> 8793/TCP 28h
Ingressによる外部からのアクセス設定
とりあえずインストールされても、デフォルトなので色々と事足りない。
AirflowのWEB UIの外部からアクセスするEndpointがないため、Ingressの追加が必要。
kubectlがたたける環境で下記Ingressのyamlをデプロイする。
この時、IngressはIPアドレスではなくドメイン指定のため、AirflowをデプロイするKubernetesクラスタのworkerノードのパブリックDNS名を設定。
ServiceNameにはairflowのwebのserviceとportを指定。
サンプルIngress yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: sample-ingress
spec:
rules:
- host: XXX.ap-northeast-1.compute.amazonaws.com
http:
paths:
- backend:
serviceName: airflow-2-web
servicePort: 8080
ingressがデプロイされていることを確認
$ kubectl -n airflow-2 get ingress
NAME HOSTS ADDRESS PORTS AGE
sample-ingress XXX.ap-northeast-1.compute.amazonaws.com 172.18.0.XX,172.18.0.YY 80 114m
Rancher上でも、「Airflow」AppsのEndpointが設定追加されている。
これで外部からドメイン指定でブラウザアクセスすると、airflowのGUIにアクセスできる。
例: http://XXX.ap-northeast-1.compute.amazonaws.com
DAGの追加
DAGファイルは Airflow のコンフィグファイル「airflow.cfg」内記載の「dags_folder」のパスにDAGのpythonファイルを格納すると認識される。
例:dags_folder = /usr/local/airflow/dags
実際にtutorialのDAGを格納したが、すぐには認識してくれないため調査中。
しばらくすると、下記のように追加された。
DAGの詳細も確認できる
なお、このDAGは公式tutorialから
https://airflow.apache.org/tutorial.html
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
ポイントは以下の通り
・BashOperatorを使うことで各taskの処理をbashで記述
・タスクは3種類
・task1(print_date)を実行したら、task2(sleep)とtask3(templated)を実行するというワークフロー
・タスクの順序は .set_upstream()
または >>
で定義できる
・schedule_interval
で実行周期を規定。ここでは1日おき
Airflowの構成
Airflowの構成要素は、並列実行の有無などで多少異なるが、基本的には以下のようになる
・Airflow Web Server : GUI
・Airflow Scheduler : ジョブをスケジュール
・Worker : 実際にジョブを実行
・メタデータDB : DAGスケジュールやDAG Runの情報
・Executor : ジョブの実行/キューイング
Executorにも複数種類あり
- sequential_executor : schedulerのホストで単一プロエスでジョブ実行
- local_executor : scheduler : schedulerのホストで複数プロセスで並列ジョブ実行
- celery_executor : worker用ホストで複数プロセスで並列ジョブ実行
実運用を考えるとおそらくcelery_executorに。
Celeryは複数のノードで分散して非同期でタスクキュー/ジョブキュー処理を行うためのフレームワークで、
broker(ノード間のメッセージのやり取り)にRabbitMQやRedis等のミドルウェアを使用。
参考)
https://analytics.livesense.co.jp/entry/2018/02/06/132842
celery_executorの場合の標準的な構成は以下の通り。
参考)
https://www.datacouncil.ai/hubfs/DataEngConf/Data%20Council/Slides%20SF%2019/Running%20Airflow%20reliably%20with%20Kubernetes.pdf
Airflow on Kubernetes
前述のCelery Executorの構成をそのままk8sにpodでデプロイする方法もあるが、workerの実行処理をk8s側にゆだねることで最適化した「Kubernetes Executor」がある。
Celery Executorで必要な Redis や、Worker 要素が kubernetes側でカバーしてくれるイメージ。
ただし、前述の Rancherカタログ機能でHELMインストールしたAirflowは、この最適化されたKubernetes Executorではなく、標準ではSequential Executorが設定された。
Author And Source
この問題について(Airflow概要と、Kubernetes/HELM on Rancher で起動), 我々は、より多くの情報をここで見つけました https://qiita.com/suzukihi724/items/4ab9cb62eae50138e560著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .