Great Expectationsとdbtとairflowでロバストなデータパイプラインを構築する


これは何か

という2020年のdbt関連のセッションの動画を見て、Great Expectationsとdbtでテストしながらデータモデルを構築するパイプラインをAirflowで動かすことで、ロバストなデータパイプラインを作るのだという内容が気になったので、実際に自分でも作って動かしてみる。

一連の流れ

データパイプラインの概要は下記の通り。
①Great ExpectationsでBigQueryのテーブルに対してバリデーションチェックをかける。バリデーション結果はGCSにアップロードし、閲覧できるようにする。
②バリデーションが成功したら、dbtでデータのTransformを実行する。
③Transformationが成功したら、テストを実行する。
④①〜③をAirflowで動かす。

準備

Great ExpectationsやBigQuery周りは、基本的に前回の記事で書いたものを流用する。

FROM python:3.9-slim

RUN apt-get update -y && \
    apt-get install --no-install-recommends -y -q \
    git libpq-dev python3-dev build-essential && \ 
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*


RUN pip install --upgrade pip && \
    pip install great_expectations && \
    pip install sqlalchemy==1.4.25 && \
    pip install sqlalchemy_bigquery && \
    pip install pybigquery && \
    pip install dbt-bigquery && \
    pip install apache-airflow[gcp] && \
    pip install airflow-dbt

RUN curl -sSL https://sdk.cloud.google.com | bash

ENV PATH $PATH:/root/google-cloud-sdk/bin

ENV PYTHONIOENCODING=utf-8
ENV LANG C.UTF-8
docker-compose.yml
version: '3.9'

services:
  great_expectations:
    container_name: great_expectations
    build: .
    ports:
      - "8888:8888"
      - "8080:8080"
      - "8081:8081"
    tty: true
    working_dir: /usr/app
    environment:
      - DBT_PROFILES_DIR=/usr/app/dbt
      - AIRFLOW_HOME=/usr/app/dag
    volumes:
      - ./scr:/usr/app
      - gcloud-config:/root/.config
    secrets:
      - gcp_secret
  terraform:
    container_name: terraform
    entrypoint: ash
    image: hashicorp/terraform:latest
    working_dir: /tmp/terraform
    volumes:
      - ./scr/terraform:/tmp/terraform
      - gcloud-config:/root/.config
    tty: true
    secrets:
      - gcp_secret
  gcloud:
    container_name: gcloud
    entrypoint: "gcloud"
    image: google/cloud-sdk:alpine
    volumes:
      - gcloud-config:/root/.config
volumes:
  gcloud-config:
secrets:
  gcp_secret:
    file:
      {サービスアカウントのキーファイル}

GCSやBigQuery周りはterraformで構成した。

main.tf
provider "google" {
  project     = var.gcp_project_id
  region      = "us-central1"
  credentials = "${file("${var.GOOGLE_APPLICATION_CREDENTIALS}")}"
}

resource "google_bigquery_dataset" "bigquery_dataset" {
  dataset_id    = "sasakky_data_infra_dataset"
  friendly_name = "sasakky_data_infra_dataset"
  location      = "us-central1"
}

resource "google_storage_bucket" "cloud_storage_bucket" {
  name = "sasakky_gcs_bucket"
  location      = "us-central1"
  force_destroy = true
  website {
    main_page_suffix = "index.html"
    not_found_page   = "404.html"
  }
}
variable.tf
variable "gcp_project_id" {
  default = "{project_id}"
}
variable "GOOGLE_APPLICATION_CREDENTIALS" {
  default = "/run/secrets/gcp_secret"
}

構成

ディレクトリ構成は以下。

.
├── Dockerfile
├── docker-compose.yml
└── scr
    ├── dag(airflowの設定、DAGの配置など)
    ├── dbt(dbtプロジェクト)
    └── great_expectations(Great Expectationsプロジェクト)

Great Expectationsの設定

BigQuery内のcustomerテーブルに対してバリデーションをかける。基本は前回の記事のものを流用するが、一部修正した。
①Data DocsをGCSにアップロードするために、great_expectations.ymlに以下の内容を追記する。

great_expectations.yml
data_docs_sites:
  gs_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleGCSStoreBackend
      project: {プロジェクトID}
      bucket: sasakky_gcs_bucket
    site_index_builder:
      class_name: DefaultSiteIndexBuilder

②Suiteから不要なexpectationsを除外した。PKチェックとNullチェックだけ残す。

customer_suite.json
{
  "data_asset_type": null,
  "expectation_suite_name": "customer_suite",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {
        "column": "customer_id"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "customer_id"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "customer_name"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "gender_cd"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "gender"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "birth_day"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "age"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "postal_cd"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "address"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "application_store_cd"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "application_date"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "status_cd"
      },
      "meta": {}
    }
  ],
  "ge_cloud_id": null,
  "meta": {
    "great_expectations_version": "0.14.12"
  }
}

dbtの設定

customerテーブルから男性を抜き出したビューを作成する。

customers_men.sql
select
  *
from
  {{ source('sasakky_data_infra_dataset', 'customer') }}
where
  gender_cd = "0" -- 0はgender="男性"

customerテーブルをsource登録し、customers_menに対して正しくtransformが実行できているかテストを記述する。

schema.yml
version: 2

sources:
  - name: sasakky_data_infra_dataset
    tables:
      - name: customer

models:
  - name: customers_men
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: gender_cd
        description: 性別コード(0:男性)
        tests:
          - accepted_values:
              values: ['0']
      - name: gender
        description: 性別名
        tests:
          - accepted_values:
              values: ['男性']

airflowの設定

前回に引き続きこちらの記事を参考にさせていただき、Great ExpectationsのDAGの処理は記述した。

Airflowのウェブサーバーを立ち上げたら、connectionを作成する。

DAGファイルは以下のように記述した。Airflowでdbtを扱うために、airflow-dbtというライブラリを使用した。

dag.py
from datetime import timedelta
import os

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook
from airflow import AirflowException

from airflow_dbt.operators.dbt_operator import (
    DbtRunOperator,
    DbtTestOperator
)
from airflow.utils.dates import days_ago


default_args = {
  'start_date': days_ago(0),
  'retries': 0,
}

def validate(**context):
    from great_expectations.data_context import DataContext

    conn = BaseHook.get_connection('sasakky_bigquery')
    connection_json = conn.extra_dejson
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = connection_json['extra__google_cloud_platform__key_path']

    data_context: DataContext = DataContext(context_root_dir="/usr/app/great_expectations")

    result = data_context.run_checkpoint(
        checkpoint_name="checkpoint_customer",
        batch_request=None,
        run_name=None,
    )

    data_context.build_data_docs()

    if not result["success"]:
        raise AirflowException("Validation of the data is not successful ")

with DAG(dag_id='etl', default_args=default_args, schedule_interval='@daily') as dag:
  ge_check = PythonOperator(
        task_id='validate',
        python_callable=validate,
        provide_context=True,
    )
  
  dbt_test = DbtTestOperator(
    task_id='dbt_test',
    retries=0,
    profiles_dir='/usr/app/dbt',
    dbt_bin='/usr/local/bin/dbt',
    dir='/usr/app/dbt'
  )

  dbt_run = DbtRunOperator(
    task_id='dbt_run',
    profiles_dir='/usr/app/dbt',
    dbt_bin='/usr/local/bin/dbt',
    dir='/usr/app/dbt'
  )

  
  ge_check >> dbt_run >> dbt_test 

実行

AirflowからDAGをトリガーして実行する。

処理が最後まで成功したので、GCSを確認してアップロードしたData Docsを確認してみる。

次に、Great Expectationsにバリデーションが失敗した場合の挙動を確かめる。
birth_dayカラムをnullにして、再度DAGを実行してみる。

バリデーションが失敗したため、dbtのタスクは実行されない。
Data Docsを見ても、failedになっている。

考察

Great Expectationsとdbtでデータパイプラインのテストを実行することができた。
本番運用時にはAirflowでスケジューリングしながら、定期的に最新版のバリデーション結果を取得し、データの異常検知ができそうだ。

dbtでもデータソースに対するテストを実行できると思うので、使い分けをどうするかについては悩ましいところ。
動画内でも言われているように、実際のデータのテスト(今回で言うとNullチェックや主キーのチェック、値が範囲内に含まれているかのチェックなど)はGreat Expectationsで行い、それが担保された上でデータの変換ロジックのテスト(今回で言うと、customers_menに男性データのみを正しく抽出できているか)をdbtで実行するようにすべきか。

お互いのツールの得意不得意もありそうなので、dbt側のテストについてはもう少し深ぼって分析したい。
あとはGreat ExpectationsでBigQueryにtmpのテーブルが作成されてしまう謎があるので、その回避方法を調べたい。