BigQueryのテーブルをPythonクライアントから操作する


事前準備

1.クレデンシャルの取得

あらかじめBigQueryのAPIを利用するためのサービスアカウントを作成し、クレデンシャル(JSON)をダウンロードしておきます。
https://cloud.google.com/docs/authentication/getting-started?hl=ja

2.必要なパッケージのインストール

Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"

[requires]
python_version = "3.8"

テーブルの再作成(Drop⇒Create)

1.クライアントの初期化

migrate_table.py
import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

[PATH TO CREDENTIAL]にはクレデンシャルのJSONのパスを指定。

2.テーブルの再生成

migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

# 既存テーブルの削除
client.delete_table(table, not_found_ok=True)

# テーブルの作成
schema = [
    bigquery.SchemaField('id', 'INT64'),
    bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))

[DATASET NAME][TABLE NAME]には作成先のデータセット名とテーブル名を指定します。

既存テーブルを削除する際にnot_found_okフラグがFalseだとテーブルが存在しない場合にエラーになるのでTrueにしておきます。DDLでいうとDROP TABLE IF EXISTSみたいなイメージ。

テーブルの作成時にはスキーマ定義としてカラム名と型を指定する必要があります。デフォルトではNullableなカラムになるのでNot Nullなカラムにする場合はモードを指定して

bigquery.SchemaField('id', 'INT64', mode='REQUIRED')

とします。

3.データのインポート

初期データとしてCSVのデータをインポートします。
あらかじめスキーマ定義の型に合わせたデータをCSVで用意しておきます。

import.csv
id, name
1, hogehoge
2, fugafuga
migrate_table.py
# 初期データのインポート
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
    job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()

CSVのヘッダを読み飛ばす場合はskip_leading_rowsでスキップ行数を指定すればOK。

Viewの再生成

migrate_view.py
import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)

ビューの場合も実体テーブルとほぼ流れは同じ。
違う点は、ビューの場合はview_queryにSQLのクエリを直接指定する点。
スキーマ定義はSQLのクエリから自動的に構成されるため指定する必要はありません。

スケジュールの更新

Scheduled Queryを利用して定期的にデータの洗い替えを行うようなシーンで、実行クエリやスケジューリングの設定を変更したい場合。

migrate_schedule.py

import json

from google.cloud import bigquery
from google.oauth2 import service_account

from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery_datatransfer_v1.DataTransferServiceClient(
    credentials=credentials
)

config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "destination_dataset_id": '[DATASET NAME]',
        "display_name": '[DISPLAY NAME]',
        "data_source_id": "scheduled_query",
        "params": {
            "query": "SELECT * FROM dataset_name.table_name",
            "destination_table_name_template": '[TABLE NAME]',
            "write_disposition": "WRITE_TRUNCATE",
            "partitioning_field": "",
        },
        "schedule": "every 24 hours",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)

update_mask = {"paths": ["display_name", "params", "schedule"]}

response = client.update_transfer_config(
    config, update_mask 
) 

スケジュールの設定内容(config)とアップデートマスク(update_mask)をupdate_transfer_configに渡してあげます。
update_maskconfigの中でどのフィールドを更新するかを指定するものです。update_maskに含まれない設定値は更新の対象外となります。

configの設定値の詳細は以下の通り。

name

スケジュールのリソース名を指定します。
リソース名はBigQueryのコンソールの「スケジュールされたクエリ」からスケジュールの一覧が表示されるので、該当のスケジュールの構成情報から確認できます。

destination_dataset_id

スケジュールで実行したクエリ結果の保存先データセット名を指定します。

display_name

スケジュールの表示名なので任意な名前を付けます。

params.query

実行するクエリを指定します。

params.destination_table_name_template

スケジュールで実行したクエリ結果の保存先テーブル名を指定します。

params.write_disposition

テーブルへの保存方法を指定します。
WRITE_TRUNCATEを指定した場合、既存のテーブルは実行クエリの結果で置換されます。
WRITE_APPENDを指定した場合、既存のテーブルに対して実行クエリの結果が追加されます。

schedule

スケジュールの実行タイミングを指定します。
ex.
1時間ごとに実行・・・every 1 hour
毎日0時に実行・・・every day 00:00

おわりに

BigQueryをデータマート的に使う場合、コンソールからテーブルを作りまくると変更管理がしにくいのでコードに落としておくとgitで管理できてオススメです。