Pythonのastモジュールを活用してAirflowのDAG依存関係をDAG Docsに表示する
AirflowでExternalTaskSensorを使って他のDAGの完了を待ち受けるような構成の場合、Web UIにDAGの依存関係が表示されると障害対応の時などに便利です。Web UIへのドキュメントの表示はDAG Docsに記述すれば良いのですが、DAG更新のたびに手入力するのも面倒だし更新漏れも発生します。
なので、自動で依存関係を抽出してDAG Docsに表示する仕組みを考えて実装してみました。以下のように表示されるようになります。
仕組み
dagsディレクトリに以下のような依存関係を定義したJSONファイルを保存しておいて、DAGファイルから読み込んでMarkdownでいい感じ表示するだけです。
{
"dag_21": {
"upstream": [
"dag_11"
],
"downstream": []
},
"dag_11": {
"upstream": [],
"downstream": [
"dag_31",
"dag_21"
]
},
"dag_31": {
"upstream": [
"dag_11"
],
"downstream": []
}
}
DAG側ではヘルパー関数を使って、JSONをロードしてMarkdownに整形した文字列をdoc_mdに設定します。
DEPENDENCY_FILE = f'{os.path.dirname(os.path.abspath(__file__))}/dependencies.json'
def print_dependencies(dag_id):
with open(DEPENDENCY_FILE, 'r+') as f:
dependencies = json.load(f)
upstream_dag_list = "\n".join(["- " + dag for dag in dependencies[dag_id]["upstream"]])
downstream_dag_list = "\n".join(["- " + dag for dag in dependencies[dag_id]["downstream"]])
return dedent('''\
### Dependencies
#### Upstream DAGs
{upstream_dag_list}
#### Downstream DAGs
{downstream_dag_list}
'''
).format(upstream_dag_list=upstream_dag_list, downstream_dag_list=downstream_dag_list)
with DAG(
'dag_11',
...
) as dag:
...
dag.doc_md = print_dependencies(dag.dag_id)
...
肝心のdependencies.jsonの作成・更新方法ですが、こちらはPythonのastモジュールでDAGファイルをパースしてExternalTaskSensorの定義からDAGの依存関係を抽出しています。
import ast
...
DEPENDENCY_FILE = f'{os.path.dirname(os.path.abspath(__file__))}/dependencies.json'
class DependencyAnalyzer(ast.NodeVisitor):
def __init__(self, filename):
super().__init__()
self.__dependencies = set()
self.current_dag = ''
self.filename = filename
# DAGファイルのコードをパースして依存DAGをself.__dependenciesに設定する
def analyze(self):
with open(self.filename, 'r') as f:
node = ast.parse(f.read())
self.visit(node)
# ExternalTaskSensorの呼び出しから引数external_dag_idの値を抽出する
def visit_Call(self, node: ast.Call):
func = node.func
if isinstance(func, ast.Name):
if func.id == 'ExternalTaskSensor':
for kwarg in node.keywords:
if kwarg.arg == 'external_dag_id':
self.__dependencies.add(kwarg.value.value)
elif func.id == 'DAG':
self.current_dag = node.args[0].value
@property
def dependencies(self):
return (self.current_dag, self.__dependencies)
def update_dependency(dag_id, dependencies):
if os.path.exists(DEPENDENCY_FILE):
with open(DEPENDENCY_FILE, 'r+') as f:
dependencies_master_json = json.load(f)
else:
dependencies_master_json = {}
# 対象のDAGのupstreamを設定・更新
deps = dependencies_master_json.setdefault(dag_id, {'upstream': [], 'downstream': []})
deps['upstream'] = list(dependencies)
# upstreamのDAGのdownstreamに対象のDAGを設定
for dependent_dag in dependencies:
deps = dependencies_master_json.setdefault(dependent_dag, {'upstream': [], 'downstream': []})
downstream = set(deps['downstream'])
downstream.add(dag_id)
deps['downstream'] = list(downstream)
# 対象のDAGが依存していないDAGのdownstreamから対象のDAGを削除
for dag, deps in dependencies_master_json.items():
if dag not in dependencies:
deps['downstream'] = list(set(deps['downstream']) - {dag_id})
# dependencies.jsonに書き出す
with open(DEPENDENCY_FILE, 'w+') as f:
json.dump(dependencies_master_json, f)
if __name__ == '__main__':
filename = sys.argv[1] # DAGファイル名
v = DependencyAnalyzer(filename)
v.analyze()
update_dependency(*v.dependencies)
このスクリプトをCI/CDで自動実行するようにしておけばDAGファイルの更新の都度、自動でDAG Docsの依存関係ドキュメントも更新されるようになります。
# CI/CDでdependencies.jsonを更新
$ python dag_dependency.py dag_11.py
今回はupstream DAGとdownstream DAGをDAG Docsに表示するだけでしたが、dependencies.jsonがあればend-to-endでDAGの依存関係を表示することもできるかと思います。そうすることで、問題発生時の影響範囲が一目瞭然となります。シンプルな仕組みながら色々な使い方ができそうです。
Author And Source
この問題について(Pythonのastモジュールを活用してAirflowのDAG依存関係をDAG Docsに表示する), 我々は、より多くの情報をここで見つけました https://zenn.dev/koji_mats/articles/6cc9323064ac9b著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Collection and Share based on the CC protocol