MySQL Event SchedulerをAirFlowに変換


Airflowは、データパイプラインの構築やインフラストラクチャのメンテナンスなど、さまざまな環境で使用できます.OS、DBで設定したバッチまたはスケジューラをAirFlowに変換した経験を共有します.
Tip. バッチ(=batch)とは、エンドユーザの介入を必要とせずに実行またはスケジュールされたタスクを指す.コンピュータープログラムの流れに従って資料を順番に処理する方式です.ウィキペディア

n/a.ターゲット


MySQLのイベントスケジューラをAirFlowに切り替えます.

TL;DR

  • Docke-Composerを実行します.(docker-compose up)
  • Airflow、Connectionを作成します.(Mysql Connection)
  • Airflow、DAGを作成します.(MySQL Operators)

  • Getting Started


    実習環境は以下の通りである.
    Docker : 20.10.2
    Docker Compose : 1.27.4, build 40524192
    Python : 3.9.0
    Airflow : 2.0.1
    MySQL : 8.0.23

    File Directory with Docker-Compose


    Docker-ComposeはAirflowとMySQLを次のように構成します.
    airflow-mysql
    ├── docker-compose.yaml
    ├── airflow
    │   ├──plugins
    │   ├──dags
    │   │   ├── example_mysql.py
    │   │   └── hello_world.py
    │   ├──sql
    │   │   └── example_mysql.sql
    │   ├──logs
    ├── mysql-01
    │   ├── conf
    │   │   └── my.cnf
    │   ├── data
    │   └── log
    DockerベースのAirflowMySQLの構成については、前の記事を参照してください.

    docker-compose.yaml


    “docker-compose.yaml”
    version: '3'
    x-airflow-common:
      &airflow-common
      image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.1}
      environment:
        &airflow-common-env
        AIRFLOW__CORE__EXECUTOR: CeleryExecutor
        AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
        AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
        AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
        AIRFLOW__CORE__FERNET_KEY: ''
        AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
        AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
        AIRFLOW__WEBSERVER__WORKERS: 1
      volumes:
        - ./dags:/opt/airflow/dags
        - ./logs:/opt/airflow/logs
        - ./plugins:/opt/airflow/plugins
      user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
      depends_on:
        redis:
          condition: service_healthy
        postgres:
          condition: service_healthy
    
    services:
      postgres:
        image: postgres:13
        environment:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        volumes:
          - postgres-db-volume:/var/lib/postgresql/data
        healthcheck:
          test: ["CMD", "pg_isready", "-U", "airflow"]
          interval: 5s
          retries: 5
        restart: always
    
      redis:
        image: redis:latest
        ports:
          - 6379:6379
        healthcheck:
          test: ["CMD", "redis-cli", "ping"]
          interval: 5s
          timeout: 30s
          retries: 50
        restart: always
    
      airflow-webserver:
        <<: *airflow-common
        command: webserver
        ports:
          - 8080:8080
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
          interval: 10s
          timeout: 10s
          retries: 5
        restart: always
    
      airflow-scheduler:
        <<: *airflow-common
        command: scheduler
        restart: always
    
      airflow-worker:
        <<: *airflow-common
        command: celery worker
        restart: always
    
      airflow-init:
        <<: *airflow-common
        command: version
        environment:
          <<: *airflow-common-env
          _AIRFLOW_DB_UPGRADE: 'true'
          _AIRFLOW_WWW_USER_CREATE: 'true'
          _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
          _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    
      flower:
        <<: *airflow-common
        command: celery flower
        ports:
          - 5555:5555
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
          interval: 10s
          timeout: 10s
          retries: 5
        restart: always
    
      mysql-01:
        container_name: mysql-01
        hostname: mysql-01
        image: mysql:8.0.23
        command: mysqld --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
        environment:
          MYSQL_ROOT_PASSWORD: root
        ports:
          - '3306:3306'
        volumes:
          - "${PWD}/mysql-01/data:/var/lib/mysql"
          - "${PWD}/mysql-01/log:/var/log/mysql"
          - "${PWD}/mysql-01/conf/my.cnf:/etc/mysql/conf.d/my.cnf"
    
    volumes:
      postgres-db-volume:
    Tip. Dockerのメモリクォータを超えると、サービスを開始できません.メモリの使用状況を確認して調整します.

    Airflow Providers, Example DAGs


    Airflowは、複数のProviders Packagesをサポートします.また、MySQL、PostgreSQL、SQL Server、Oracle、ODBCもサポートされており、データベース接続後にQueryとBatch Jobを実行できます.さらに、NoSQL、MongDB、REDIS、グラフィックDB Neo 4 jなど、多くの環境を提供しています.(2021-03-04 update)

    Example DAGs - MySQL


    Airflowで接続とDAGを作成できます.

    A.接続の作成


    AirFlowでMySQL接続の接続を作成します.
    「管理」>「接続」を選択します.

    Conn id、Conn type、Host、Login、Password、Portを入力します.Conn idの場合は、DAQに接続を読み込むときに使用する名前です.

    Tip. AirflowサーバとMySQLサーバの間の3306ポートが開いているかどうかを確認します.(ex. tcping localhost 3306)

    B.DAG作成


    MySQLサンプルDAGを作成します.次のdagsサブアイテムに「example mysql.py」を作成します.
    airflow-mysql
    ├── airflow
    │   ├──dags
    │   │   ├── example_mysql.py
    「example mysql.py」を見てみましょう.

    example_mysql.py


    MySQLのイベントプランプログラム内容「DROP TABLE table name;」構文をDAGに変換します.DAG Pythonコードで直接Queryを実現します.クエリーはsqlファイルとして外部に格納されます.sqlファイルを使用して実装する場合は、後続のクエリーの変更が必要な場合にファイルを変更するだけで、操作に役立ちます.
    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        'example_mysql',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    
    # [START howto_operator_mysql]
    
    drop_table_mysql_task = MySqlOperator(
        task_id='create_table_mysql', mysql_conn_id='mysql_conn_id', sql=r"""DROP TABLE table_name;""", dag=dag
    )
    
    # [END howto_operator_mysql]
    
    # [START howto_operator_mysql_external_file]
    
    mysql_task = MySqlOperator(
        task_id='create_table_mysql_external_file',
        mysql_conn_id='mysql_conn_id',
        sql='/scripts/drop_table.sql',
        dag=dag,
    )
    
    # [END howto_operator_mysql_external_file]
    
    drop_table_mysql_task >> mysql_task
    DAGSは、「example mysql」という名前のDAGを作成します.

    Runsエントリには、DAGの実行状態に関する情報が表示されます.「example mysql」チェックは1回成功、6回失敗.計画で設定した繰返しスケジュールを表示したり、操作で再実行したりできます.
    計画を選択すると、DAGの実行履歴が一目で表示されます.

    C.DAG詳細


    「example mysql」DAGを選択し、「Code Tab」を選択します.UIを使用してDAGコードを表示することもできます.

    このほか、TABはDAG内のTaskを可視化し、各Taskの処理時間とログを詳細に表示することもできる.
  • Tree View
  • Graph View
  • Task Duration
  • Task Tries
  • Landing Times
  • Gantt
  • Details
  • Code
  • 次はTree Viewです.DAGのTaskは可視化されており、各Taskの実行の成功と失敗を見ることができます.

    DAGの各Taskログは、次のように表示できます.問題の原因をログで分析できます.

    今後の操作での経験を引き続き更新します.
    githubですべてのコードを表示できます.