Dataformを使ってみる(BigQuery)


Google Cloudが買収し話題になったDataformを使ってみる。
https://cloud.google.com/blog/ja/products/data-analytics/welcoming-dataform-to-bigquery

BigQueryにデータ持ってきてる前提で、BigQuery内部のテーブル、データ項目、クエリの依存関係等を管理できるようだ。

ファイルの記述は基本的にSQLXという形式で記述する(JSONとSQLを混ぜたような感じ)

前提

すでにGCPプロジェクトはある。

始めてみる

事前準備

Dataform用にBigQuery Adminロールのサービスアカウントを作り、JSONキーを発行する。
方法は割愛。

Dataformのアカウントを作る

dataformにアクセスして、START NOWをクリック。グーグルアカウントで始められる。

Dataformのプロジェクトを作る

Dataformのプロジェクト名を入力

CONNECTをクリック

GCPのプロジェクトIDを入力

BigQueryのデータセットのロケーションを選択し、事前に作ったサービスアカウントのJSONキーをアップする

TEST CONNECTION押してこんな感じで出ればOK

Dataformベストプラクティス

参考:https://docs.dataform.co/best-practices/start-your-dataform-project

フォルダ構成を整える

どうやら以下3つのフォルダを作ってファイル管理することがベストプラクティスのようである

  • Sources:(多分)テーブル情報とか正規化とか型とか色々クレンジングされてる状態にするためのファイル
  • Staging:(多分)なるべく汎用化したテーブルを作るためのファイル
  • Analytics(Reporting?):(多分)データマートを作るためのファイル

ref()を使う

SQLのテーブル名は直接でなく、ref()を使って書くのが推奨らしい。
ref()使うことによって、依存関係エラーをリアルタイムに検知できるようだ。

sample
config { type: "table" }

select * from ${ref("my_table")}

declareでソースデータを定義する

ここでソースデータ定義しておくと、ref()で使えるようになる。

sample
config {
  type: "declaration",
  database: "SNOWFLAKE_SAMPLE_DATA",
  schema: "TPCH_SF1",
  name: "CUSTOMER",
}

タグでスケジュール管理する

SQLXにタグを記述することで、スケジュール実行を管理できるようである。

まずは以下2つのタグで始めてみるのが良いらしい。

  • daily
  • hourly
sample
config { type: "table", tags: ["daily"] }

select * from ${ref("crm_data")}

必要に応じてカスタムスキーマを使う

デフォルトではdataform.jsonでスキーマが定義されているが、SQLXファイルのconfigブロックでオーバーライドできる。

sample
config {
  type: "view",
  schema: "staging",
  tags: ["staging", "daily"],
  description: "Cleaned version of the raw customer table."
}

select
  c_custkey as customer_key,
  c_name as full_name,
  ...

アサーションを使用したテスト

データの品質を確認できる。通知を設定している場合、アサーションエラーとなった場合に検知できる。
以下の3つを設定可能。

  • uniqueKey:一意確認
  • nonNull:Nullでないこと確認
  • rowConditions:カスタムの式。どこかの行でFalseになるとエラー
sample
config {
  type: "table",
  assertions: {
    nonNull: ["order_date", "order_key", "customer_key"],
    uniqueKey: ["order_key"],
    rowConditions: [
      "total_parts >= 0"
    ]
  }
}

select ...

データ内容を記述する

コード内にデータ内容(テーブル内容、カラム内容)を記述することで、コラボレーションが容易になる。他のツールへのエクスポートも可能。

sample
config {
  type: "table",
  description: "This table contains summary stats by date aggregated by country",
  columns: {
    order_date: "Date of the order",
    order_id: "ID of the order",
    customer_id: "ID of the customer in the CRM",
    order_status: "Status of the order, from Shopify",
    payment_status: "Status of payment, from Stripe",
    payment_method: "Credit card of ACH",
    item_count: "Number of items in that order",
    amount: "Amount charged for that order, in US dollars using a floating FX rate"
  }
}

select ...

include マクロを使用して計算式を再利用する

CASE文等で固定値を使ってしまってる場合とか、事前に定義しておけばなにか変更が合った際にこのファイルを修正するだけでよくなる。

sample1
module.exports = (country) => {
  return `
  case
    when ${country} in ('US', 'CA') then 'NA'
    when ${country} in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
    when ${country} in ('AU') then ${country}
    else 'Other'
  end`;
}
sample2
config { type: "table"}

select
  country as country,
  ${country_group("country")} as country_group,
...

試しにフロー作ってみる

上記までのベストプラクティス情報を参考にしながら、以下のようなフローを作ってみる。
BQの天気パブリックデータを使う。SQLは以下の記事で使ってるものをベース。
https://qiita.com/yakamazu/items/5e29208536a1374a50cd
(サンプルが適切なのかわからないが、雰囲気は掴めるだろうということで。。)

  1. 対象テーブルを定義する⇒ definitions/source
  2. 条件、計算式を定義する ⇒ includes
  3. 対象年でデータ抽出 ⇒ definitions/staging
  4. 天気データを作成 ⇒ definitions/analytics

フォルダを作成する

definitionsの下にsources, staging, analyticsのフォルダを作る
includesは最初からある

対象テーブルを定義する

definitions/Sourcesに入力元となるBigQueryテーブルのdeclarationファイルを作る

ファイル名と出力タイプを選択する。

出来たファイルを以下の内容で修正する。

definitions/Sources/ghcnd_.sqlx
config {
  type: "declaration",
  database: "bigquery-public-data",
  schema: "ghcn_d",
  name: "ghcnd_*",
}

同様に以下ファイルも作る

definitions/Sources/ghcnd_stations.sqlx
config {
  type: "declaration",
  database: "bigquery-public-data",
  schema: "ghcn_d",
  name: "ghcnd_stations",
}

条件、計算式を定義する

includes/ に以下のファイルを作成する

固定値を指定するファイル

includes/constants.js
const start_year = "'2015'";
const end_year = "'2020'";
const japan = "'JA'";

module.exports = {
  start_year,
  end_year,
  japan,
};

計算式のファイル

降雨量や最高気温等を計算する用のロジックのファイルを作成する

includes/mapping.js
function element_value(element, kind, value){
  return `
  case ${element} when ${kind} then ${value} / 10 else 0 end
  `;
}

module.exports = { element_value };

対象年でデータ抽出

definitions/Stagingにテーブル作成する
テーブルは通常版と1日前版の2つ作る

definitions/Staging/ghcd_org.sqlx
config {
  type: "table",
  name: "ghcnd_org",
  description: "天候データ",
  columns:{
             id: "id",
             date: "日付",
             element: "データの種類",
             value: "値",
  }
}

select
  id,
  date,
  element,
  value,
from 
  ${ref("ghcnd_*")} 
where
  _TABLE_SUFFIX between ${constants.start_year} and ${constants.end_year}

右側にコンパイルされたSQLが出てくるので、BQのWEB UIに貼り付けて問題ないか確認できる

同様に1日前版も作る

definitions/Staging/ghcd_1day_ago.sqlx
config {
  type: "table",
  name: "ghcnd_1day_ago",
  description: "天候データ(1日前)",
  columns:{
             id: "id",
             date: "日付(1日前)",
             element: "データの種類",
             value: "値",
  }
}

select
  id,
  date_add(date, interval -1 day) as date,
  element,
  value,
from 
  ${ref("ghcnd_*")} 
where
  _TABLE_SUFFIX between ${constants.start_year} and ${constants.end_year}

天気データを作成

stagingに作られたテーブルをベースにdefinitions/Analyticsに天気データを作成する

definitions/Analytics/tenki.sqlx
config {
  type: "table",
  description: "天候データ(完成版)",
  columns:{
             date: "日付",
             name: "観測地点",
             latitude: "緯度",
             longitude: "経度",
             prcp: "降雨量",
             snow: "降雪量",
             snwd: "積雪量",
             tmax: "最高気温",
             tmin: "最低気温",
             tavg: "平均気温",
  }
}

select 
  ghcnd.date, 
  stations.name, 
  stations.latitude,
  stations.longitude,
  max(${mapping.element_value("ghcnd.element", "'PRCP'", "ghcnd_1day_ago.value")}) as prcp, --降雨量(ミリメートル)
  max(${mapping.element_value("ghcnd.element", "'SNOW'", "ghcnd_1day_ago.value")}) as snow, --降雪量(ミリメートル) 
  max(${mapping.element_value("ghcnd.element", "'SNWD'", "ghcnd_1day_ago.value")}) as snwd, --積雪量(ミリメートル)
  max(${mapping.element_value("ghcnd.element", "'TMAX'", "ghcnd.value")}) as tmax, --最高気温
  max(${mapping.element_value("ghcnd.element", "'TMIN'", "ghcnd.value")}) as tmin, --最低気温
  max(${mapping.element_value("ghcnd.element", "'TAVG'", "ghcnd.value")}) as tavg, --平均気温
from 
  ${ref("ghcnd_org")} as ghcnd
left outer join
  ${ref("ghcnd_1day_ago")} as ghcnd_1day_ago
 on
  ghcnd.id = ghcnd_1day_ago.id and
  ghcnd.date = ghcnd_1day_ago.date and
  ghcnd.element = ghcnd_1day_ago.element
inner join 
  ${ref("ghcnd_stations")} as stations
 on 
  ghcnd.id = stations.id and 
  substr(stations.id, 1, 2) = ${constants.japan} --日本だけに絞る
group by 
  1,2,3,4

最終的に作ったファイルは以下の感じとなった。

フローを見てみる

左上のメニューの「Dependency tree」から依存関係確認できる。
ref()を入力に使うだけで勝手に依存関係作ってくれるっぽい。


実行してみる

フローの実行

右上の「START NEW RUN」からフローを実行できる

実行は3タイプから選択できる

  • プロジェクト全体実行
  • タグを設定している場合は該当タグのプログラムのみ
  • 実行プログラムの指定

今回はプロジェクト全体実行する

実行状態の確認

フローを実行していると右上に「run in progress」が出てくる

クリックすると以下画面のように進行状態を確認できる

過去の実行ログを確認

左上のメニューの「Run logs」から確認可能

実行結果

実行結果はデフォルト(dataform.jsonにて定義されている)では、dataformデータセットに作成される

configにdescriptionやcolumnsを設定しておけば、BQに説明として反映してくれるようだ

感想

SQLも知らないうちにカオスなことになることが多いので、Dataformで管理するのは便利かもしれない。(githubとも連携できるらしい)
includeで計算ロジックも一元化できるのは使い勝手が良さそう。
Airflowからもキックできそうで、結局どのようにデータパイプラン管理するのが最適なのか悩ましい。。

参考

すごいぞ Dataform
DataformでBigQueryのデータ変換を試してみた
無料のDataformでBigQueryにおけるデータ加工のDXを改善して幸せになろう