Google ColabでBigQueryを使う際によく使うコードスニペット


前提: 認証

from google.colab import auth
auth.authenticate_user()

ColabからBigQueryを利用するには、最初に認証を行う必要がある。

短いコードなのでわざわざ覚えるほどでもないが、コードスニペットが用意されているのでそれを使うと素早く追加できる。

クエリーの結果をDataFrameに格納する

google.cloud.bigqueryMagicコマンドを使うと、一発でクエリーの結果をPandasのDataFrameとして取得できる。

%%bigquery df --project myproject
SELECT * FROM `myproject.foo.logs`

Magicコマンドの引数で指定した名前でDataFrameが作られ、Python側から参照できるようになっている。

df.head(5)

ただ内容を確認したい時

ただ内容を確認したい時は、格納先のDataFrameを未指定にする。

%%bigquery --project myproject
SELECT * FROM `myproject.foo.logs`

また、その際に%load_ext google.colab.data_tableを実行しておくと、出力結果に対してフィルタリング・ページングができるようになって便利。

パラメータを渡す

Parameterized Queryの機能をつかって、クエリーにパラメータを渡すことができる。

%%bigquery --project myproject df --params {"user_id": 123}
SELECT * FROM `myproject.foo.logs` WHERE user_id = @user_id

パラメータは--params引数に辞書型で渡す。渡されたパラメータは@パラメータ名で参照できる。テーブル名の箇所にはパラメータは使えないので注意。また、paramsはコマンドの最後に指定しないとエラーになるため注意

パラメータには配列を渡すこともできる。

%%bigquery --project myproject df --params {"user_id": [1, 2, 3]}
SELECT * FROM `myproject.foo.logs` WHERE user_id in UNNEST(@user_id)

変数を渡したい場合

params = {"user_id": [1, 2, 3]}
%%bigquery --project myproject df --params $params
SELECT * FROM `myproject.foo.logs` WHERE user_id in UNNEST(@user_id)

結果をCSVとして保存する

import os
df.to_csv(os.path.join("output.csv"), index=False)

取得したDataFrameはto_csvでCSVファイルに書き出せる。Google Driveをマウントしておけば、Google Drive上に書き出すことができて便利。

from pathlib import Path
Path(out_dir).mkdir(parents=True, exist_ok=True)

出力先のディレクトリを用意したい時はpathlibPath.mkdirが便利。parents=Trueをセットすることで、中間ディレクトリもまとめて作ってくれたり、exist_ok=Trueをセットすることで、すでにディレクトリがある場合でもエラーにならず、処理を続行できる。

クエリーの結果からテーブルを作る

分析の内容によっては、中間テーブルを作ることで計算量を節約できたり、中間結果の可視化/チェックをできて効率を改善できる。

最新のgoogle-cloud-bigqueryでは、--destination_tableを指定することで、クエリーの実行結果からそのままテーブルを作ることができるが、2020年6月現在Google Colabでデフォルトで入っているgoogle-cloud-bigqueryのバージョンではこのオプションが利用できないため、Magicコマンドを使わずにジョブを作成するコードを書く。

from google.cloud import bigquery

client = bigquery.Client(project="myproject")
table_id = "myproject.foo.purchase_logs"

job_config = bigquery.QueryJobConfig(destination=table_id)

sql = r'''
    SELECT transaction_id, user_id, transaction_date, product_id
    FROM `myproject.foo.transactions`
    WHERE action = "purchase"
'''

query_job = client.query(sql, job_config=job_config)
query_job.result()

公式ドキュメント: https://cloud.google.com/bigquery/docs/writing-results?hl=ja

なお、クエリー内でDECLAREなどScriptingを使用している場合は、BigQueryの制約上実行結果をテーブルに保存できないので注意。
https://cloud.google.com/bigquery/docs/reference/standard-sql/scripting
https://stackoverflow.com/questions/58646725/cannot-set-destination-table-with-bigquery-python-api

参考: Notebookで使えるgoogle-cloud-bigqueryパッケージのバージョンの確認方法

!pip show google-cloud-bigquery

バージョン1.22.0以降であれば、--destination_tableを使用できる。

テーブルが存在する時に上書きする

QueryJobConfigWriteDispositionWRITE_TRUNCATEを指定すると、すでにテーブルが存在した場合に内容を破棄した上で作り直すことができる。(元のテーブルのデータは破棄されるため注意)

job_config = bigquery.QueryJobConfig(destination=table_id, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)

WRITE_APPENDを指定した場合は既存のテーブルにさらにデータが追加される。

CSVファイルの中身からテーブルを作る

CSVファイルからテーブルを作りたい場合は、bqコマンドのbg loadを使ってアップロードする。

!bq load --project_id=myproject --autodetect --source_format=CSV myproject:foo.products sample.csv

指定するパラメータの内容は、読み込むCSVの内容によって適宜変更する。

--replaceを指定すると、すでにテーブルがあった場合にその内容を破棄した上で、作り直すことができる。

その他

クエリー生成用のメソッドをheredocで書く

def build_some_query(user_id, product_id):
  return f'''
SELECT time, user_id, product_id, price FROM some_table WHERE user_id = {user_id} AND product_id = {product_id}
'''

参考