MySQL Event SchedulerをAirFlowに変換
Airflowは、データパイプラインの構築やインフラストラクチャのメンテナンスなど、さまざまな環境で使用できます.OS、DBで設定したバッチまたはスケジューラをAirFlowに変換した経験を共有します.
Tip. バッチ(=batch)とは、エンドユーザの介入を必要とせずに実行またはスケジュールされたタスクを指す.コンピュータープログラムの流れに従って資料を順番に処理する方式です.ウィキペディア
MySQLのイベントスケジューラをAirFlowに切り替えます.
Docke-Composerを実行します.(docker-compose up) Airflow、Connectionを作成します.(Mysql Connection) Airflow、DAGを作成します.(MySQL Operators)
Getting Started
Tree View Graph View Task Duration Task Tries Landing Times Gantt Details Code 次はTree Viewです.DAGのTaskは可視化されており、各Taskの実行の成功と失敗を見ることができます.
DAGの各Taskログは、次のように表示できます.問題の原因をログで分析できます.
今後の操作での経験を引き続き更新します.
githubですべてのコードを表示できます.
Tip. バッチ(=batch)とは、エンドユーザの介入を必要とせずに実行またはスケジュールされたタスクを指す.コンピュータープログラムの流れに従って資料を順番に処理する方式です.ウィキペディア
n/a.ターゲット
MySQLのイベントスケジューラをAirFlowに切り替えます.
TL;DR
Getting Started
実習環境は以下の通りである.
Docker : 20.10.2
Docker Compose : 1.27.4, build 40524192
Python : 3.9.0
Airflow : 2.0.1
MySQL : 8.0.23
File Directory with Docker-Compose
Docker-ComposeはAirflowとMySQLを次のように構成します.airflow-mysql
├── docker-compose.yaml
├── airflow
│ ├──plugins
│ ├──dags
│ │ ├── example_mysql.py
│ │ └── hello_world.py
│ ├──sql
│ │ └── example_mysql.sql
│ ├──logs
├── mysql-01
│ ├── conf
│ │ └── my.cnf
│ ├── data
│ └── log
DockerベースのAirflowとMySQLの構成については、前の記事を参照してください.
docker-compose.yaml
“docker-compose.yaml”version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.1}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__WEBSERVER__WORKERS: 1
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
<<: *airflow-common
command: scheduler
restart: always
airflow-worker:
<<: *airflow-common
command: celery worker
restart: always
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
mysql-01:
container_name: mysql-01
hostname: mysql-01
image: mysql:8.0.23
command: mysqld --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
environment:
MYSQL_ROOT_PASSWORD: root
ports:
- '3306:3306'
volumes:
- "${PWD}/mysql-01/data:/var/lib/mysql"
- "${PWD}/mysql-01/log:/var/log/mysql"
- "${PWD}/mysql-01/conf/my.cnf:/etc/mysql/conf.d/my.cnf"
volumes:
postgres-db-volume:
Tip. Dockerのメモリクォータを超えると、サービスを開始できません.メモリの使用状況を確認して調整します.
Airflow Providers, Example DAGs
Airflowは、複数のProviders Packagesをサポートします.また、MySQL、PostgreSQL、SQL Server、Oracle、ODBCもサポートされており、データベース接続後にQueryとBatch Jobを実行できます.さらに、NoSQL、MongDB、REDIS、グラフィックDB Neo 4 jなど、多くの環境を提供しています.(2021-03-04 update)
Example DAGs - MySQL
Airflowで接続とDAGを作成できます.
A.接続の作成
AirFlowでMySQL接続の接続を作成します.
「管理」>「接続」を選択します.
Conn id、Conn type、Host、Login、Password、Portを入力します.Conn idの場合は、DAQに接続を読み込むときに使用する名前です.
Tip. AirflowサーバとMySQLサーバの間の3306ポートが開いているかどうかを確認します.(ex. tcping localhost 3306)
B.DAG作成
MySQLサンプルDAGを作成します.次のdagsサブアイテムに「example mysql.py」を作成します.airflow-mysql
├── airflow
│ ├──dags
│ │ ├── example_mysql.py
「example mysql.py」を見てみましょう.
example_mysql.py
MySQLのイベントプランプログラム内容「DROP TABLE table name;」構文をDAGに変換します.DAG Pythonコードで直接Queryを実現します.クエリーはsqlファイルとして外部に格納されます.sqlファイルを使用して実装する場合は、後続のクエリーの変更が必要な場合にファイルを変更するだけで、操作に役立ちます.from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
dag = DAG(
'example_mysql',
default_args=default_args,
start_date=days_ago(2),
tags=['example'],
)
# [START howto_operator_mysql]
drop_table_mysql_task = MySqlOperator(
task_id='create_table_mysql', mysql_conn_id='mysql_conn_id', sql=r"""DROP TABLE table_name;""", dag=dag
)
# [END howto_operator_mysql]
# [START howto_operator_mysql_external_file]
mysql_task = MySqlOperator(
task_id='create_table_mysql_external_file',
mysql_conn_id='mysql_conn_id',
sql='/scripts/drop_table.sql',
dag=dag,
)
# [END howto_operator_mysql_external_file]
drop_table_mysql_task >> mysql_task
DAGSは、「example mysql」という名前のDAGを作成します.
Runsエントリには、DAGの実行状態に関する情報が表示されます.「example mysql」チェックは1回成功、6回失敗.計画で設定した繰返しスケジュールを表示したり、操作で再実行したりできます.
計画を選択すると、DAGの実行履歴が一目で表示されます.
C.DAG詳細
「example mysql」DAGを選択し、「Code Tab」を選択します.UIを使用してDAGコードを表示することもできます.
このほか、TABはDAG内のTaskを可視化し、各Taskの処理時間とログを詳細に表示することもできる.
airflow-mysql
├── docker-compose.yaml
├── airflow
│ ├──plugins
│ ├──dags
│ │ ├── example_mysql.py
│ │ └── hello_world.py
│ ├──sql
│ │ └── example_mysql.sql
│ ├──logs
├── mysql-01
│ ├── conf
│ │ └── my.cnf
│ ├── data
│ └── log
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.1}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__WEBSERVER__WORKERS: 1
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
<<: *airflow-common
command: scheduler
restart: always
airflow-worker:
<<: *airflow-common
command: celery worker
restart: always
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
mysql-01:
container_name: mysql-01
hostname: mysql-01
image: mysql:8.0.23
command: mysqld --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
environment:
MYSQL_ROOT_PASSWORD: root
ports:
- '3306:3306'
volumes:
- "${PWD}/mysql-01/data:/var/lib/mysql"
- "${PWD}/mysql-01/log:/var/log/mysql"
- "${PWD}/mysql-01/conf/my.cnf:/etc/mysql/conf.d/my.cnf"
volumes:
postgres-db-volume:
airflow-mysql
├── airflow
│ ├──dags
│ │ ├── example_mysql.py
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
dag = DAG(
'example_mysql',
default_args=default_args,
start_date=days_ago(2),
tags=['example'],
)
# [START howto_operator_mysql]
drop_table_mysql_task = MySqlOperator(
task_id='create_table_mysql', mysql_conn_id='mysql_conn_id', sql=r"""DROP TABLE table_name;""", dag=dag
)
# [END howto_operator_mysql]
# [START howto_operator_mysql_external_file]
mysql_task = MySqlOperator(
task_id='create_table_mysql_external_file',
mysql_conn_id='mysql_conn_id',
sql='/scripts/drop_table.sql',
dag=dag,
)
# [END howto_operator_mysql_external_file]
drop_table_mysql_task >> mysql_task
DAGの各Taskログは、次のように表示できます.問題の原因をログで分析できます.
今後の操作での経験を引き続き更新します.
githubですべてのコードを表示できます.
Reference
この問題について(MySQL Event SchedulerをAirFlowに変換), 我々は、より多くの情報をここで見つけました https://velog.io/@hansung/Airflow-Example-DAGs-MySQL-Providersテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol