BigQuery上のテーブル・ビュー及びスキーマ更新をコード管理する


BigQuery上のテーブルスキーマ更新は比較的簡単にできるのですが、ビューのスキーマ更新で少々躓きました。
いろいろと調べて、sqlとpython、ymlファイルで実装した内容をまとめています。

前置きが少々長いですが、興味がある方の参考になれば幸いです。

概要・前置き

以前しんゆうさんが「データ整備人(仮)」と呼んでいましたが、私自身社内で近しい業務を行っています。
https://analytics-and-intelligence.net/archives/5826

  • 社内のデータ抽出時のネックとして、テーブル構造が複雑で学習コストが高い
    例えば何か商品を購入した、というデータを出すのにテーブルをいくつも結合させる必要があったり。。

そこで、テンプレとなっている処理は、viewとしてまとめていくことになりました。

SQLに詳しい方だと処理スピードを考えて、テーブル化してバッチ処理すればいいのでは?と思われるかもしれません。
viewにした理由として、データの持たせ方が変わるかも、ということを想定していました。

この形で当分問題なさそう、という段階になればテーブル化は検討したいと思っています。

  • 今後はデータカタログを活用していきたい
    BQ上に存在するテーブル名や説明欄、スキーマ名やスキーマの説明欄を検索することができます。

データ抽出時のコスト要因の1つとして、テーブル定義書の確認があります。
BQ上にテーブルやスキーマの説明が記載されていれば、スイッチングコストも減り非常に楽になりそうです。

テーブル内のスキーマ更新は非常に簡単

CREATE文を利用時にオプションパラメータを指定することで、SQL文にてスキーマの説明内容を渡すことができます。
【BigQuery】CREATE文を使えるの知ってた?SQLでテーブル作成

似たような感じで、CREATE VIEWをすれば行けるのかなと最初は思っていたのですが、見事にエラーになりました。

そこで下記記事を参考に、pythonを利用してviewの作成及びスキーマを更新するコードを実装してみました。
[BigQuery] BigQueryのPython用APIの使い方 -テーブル作成編-

開発環境

  • windows10
  • python 3.8.2

事前の準備

  • スキーマ更新を行うため、サービスアカウントを発行し必要な権限を付与
    BigQueryのデータ編集者、閲覧者、ユーザー権限を付与しています。

  • モジュールのinstallが完了していない場合、下記を参照ください
    クイックスタート: クライアント ライブラリの使用

ファイル構成


┣ create_view.py
┣ upadte_schema.py
┣ key
┃ ┗ jsonキーファイル
┣ description
┃ ┣ import_yml.py
┃ ┗ schema_sample.yml
┗ sql
  ┗ create_view.sql

実装コード

viewなのかtableなのか迷走している部分があるのですが、よしなにリファクタしていただければ幸いです。

ファイル構成に記載した通り、スキーマの記載をymlファイル内で行っています。

参考にしたサイトと同様、スキーマ更新ファイルはpythonファイル内に指定しても良かったです。
将来的には、各サービス担当のエンジニアの方にスキーマの説明を更新してもらうことなどを見据え、
平易に修正できるymlファイルを採用しています。

create_view.sql
#StandardSQL
作成したいテーブルやビューの実行クエリを書いていただければ問題ないです

実行コマンドサンプル
python create_view.py #1.sql/実行したいSQLファイル #2.tableもしくはview #3.テーブルもしくはビュー名

create_view.py
from google.cloud import bigquery
from google.oauth2 import service_account
import sys
import codecs
# PC起動の度に環境変数を設定するのが手間なので、パスを指定して実行
credentials = service_account.Credentials.from_service_account_file(
    filename= './key/発行したサービスアカウントのJSONキーファイル',
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

args = sys.argv
# 読み込み用に開き、utfに変換しエラーの場合は文字型で渡す
file = codecs.open(args[1],  mode='r', encoding='utf-8', errors='strict') 
query = file.read()  

# TABLEはReplace可能だが、VIEWは不可能なので、既存のものがある場合は削除する処理を追加
drop_data ='DROP '+args[2]+' IF EXISTS `作成先のプロジェクト名.データセット名.'+args[3]+'`'+';'+'\n'

create_data ='CREATE '+args[2]+' IF NOT EXISTS'+'\n'+'`作成先のプロジェクト名.データセット名.'+args[3]+'`'+'\n' + 'AS' + '\n'+ query

def exec_query():
    client = bigquery.Client(
        credentials=credentials,
        project=credentials.project_id,
    )
    exec_job = client.query(
                drop_data + create_data
                )
    exec_job.result() 
if __name__ == '__main__':
    exec_query()
    file.close()
schema_sample.yml
name: テーブルもしくはビューの名称。view作成用のsqlと同じにした方が管理が楽だと思います
dataset: データセットの名称
desc: テーブルもしくはビューの説明を記載してください

desc: | 文章を途中で改行したい場合は、先頭に | を追加したこちらを利用
  https://www.task-notes.com/entry/20150922/1442890800
columns:
  - name: カラムの名称。
   クエリを書くときに使用するため、名前から分かりやすい名称が好ましいです。
    type: 型を記載してください。以下サンプルの対応表です。他の型を利用したい場合は下記を参照。
    https://cloud.google.com/bigquery/docs/reference/standard-sql/conversion_rules?hl=ja
    整数:INTEGER
    数値:FLOAT
    文字列:STRING
    日付:DATE
    時間:DATETIME
    description: カラムの中身に関する説明を記載してください。
    特定の条件だと、どんな数値が取れるのか?など記載いただけると良いと思います。
  - name: test
    type: STRING
    description: こんな感じで書けば大丈夫です
  • description配下のスキーマ説明ファイルを読み込みます。
import_yml.py
import yaml
import sys
import codecs

args = sys.argv
file_path = r'.\description'+'\\'+args[1]+'.yml'

with codecs.open(file_path, 'r', 'utf-8') as file:
    obj = yaml.safe_load(file)

schema_list = obj["columns"]

def schema():
    from google.cloud.bigquery import SchemaField
    res = []
    for i in range(len(schema_list)):
            input_schema = SchemaField(name=schema_list[i]['name'], field_type=schema_list[i]['type'], description=schema_list[i]['description'])
            res.append(input_schema)
    return res

実行コマンドサンプル
python upadte_schema.py #1.作成したymlファイル名

upadte_schema.py
from google.cloud import bigquery
from google.oauth2 import service_account
from description import import_yaml

# サービスアカウントのJSONキーファイルを利用する
# パスを通すのは手間なので、下記を利用する
credentials = service_account.Credentials.from_service_account_file(
    filename= './key/発行したサービスアカウントのJSONキーファイル',
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

project_id = "更新したいテーブル・ビューのProjectを指定"
client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

# 対象のテーブル・ビュー名
table_name = import_yaml.obj.get("name")
dataset_id = import_yaml.obj.get("dataset")
table_id = "{}.{}.{}".format(client.project, dataset_id, table_name)

# 以下、スキーマの説明更新
schema = import_yaml.schema()

view_schema = bigquery.Table(table_id, schema=schema)
client.update_table(view_schema, ["schema"])  # Make an API request.

# 以下、テーブル・ビュー本体の説明更新
table_ref = client.dataset(dataset_id).table(table_name)
view = client.get_table(table_ref) 
# 説明内容を読み込んで更新
view.description = import_yaml.obj.get("desc")
client.update_table(view, ["description"]) 

今後の展望

今は手動で更新を行っていますが、今後はCI/CDの仕組みも取り入れていきたいです。

スキーマ更新ですが、対象のテーブルやビューのカラムを全て記載する必要があります。
そのためテーブルの中身の更新後、スキーマ更新ファイルが実行される、というフローを構築していきたいです。

また何か分析を行う際に、このデータってどこにあるのか?この条件で抽出していいのか?という、サービス担当者への確認作業が発生したりします。
スキーマの充実によってデータカタログでの検索が容易になれば、分析者も自主的に作業ができ、お互いに本来やりたい業務に時間を割けるようになります。

社内で分析をしたいという方々が、必要な情報を、簡単に探せる環境づくりに繋がったらいいなと思いますし、
同じ悩みを抱えている方の参考になれば幸いです。