BigQueryのテーブルをPythonクライアントから操作する
事前準備
1.クレデンシャルの取得
あらかじめBigQueryのAPIを利用するためのサービスアカウントを作成し、クレデンシャル(JSON)をダウンロードしておきます。
https://cloud.google.com/docs/authentication/getting-started?hl=ja
2.必要なパッケージのインストール
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"
[requires]
python_version = "3.8"
テーブルの再作成(Drop⇒Create)
1.クライアントの初期化
migrate_table.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
[PATH TO CREDENTIAL]
にはクレデンシャルのJSONのパスを指定。
2.テーブルの再生成
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
# 既存テーブルの削除
client.delete_table(table, not_found_ok=True)
# テーブルの作成
schema = [
bigquery.SchemaField('id', 'INT64'),
bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))
[DATASET NAME]
と[TABLE NAME]
には作成先のデータセット名とテーブル名を指定します。
既存テーブルを削除する際にnot_found_ok
フラグがFalse
だとテーブルが存在しない場合にエラーになるのでTrue
にしておきます。DDLでいうとDROP TABLE IF EXISTS
みたいなイメージ。
テーブルの作成時にはスキーマ定義としてカラム名と型を指定する必要があります。デフォルトではNullableなカラムになるのでNot Nullなカラムにする場合はモードを指定して
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
とします。
3.データのインポート
初期データとしてCSVのデータをインポートします。
あらかじめスキーマ定義の型に合わせたデータをCSVで用意しておきます。
id, name
1, hogehoge
2, fugafuga
# 初期データのインポート
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()
CSVのヘッダを読み飛ばす場合はskip_leading_rows
でスキップ行数を指定すればOK。
Viewの再生成
migrate_view.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
ビューの場合も実体テーブルとほぼ流れは同じ。
違う点は、ビューの場合はview_query
にSQLのクエリを直接指定する点。
スキーマ定義はSQLのクエリから自動的に構成されるため指定する必要はありません。
スケジュールの更新
Scheduled Queryを利用して定期的にデータの洗い替えを行うようなシーンで、実行クエリやスケジューリングの設定を変更したい場合。
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
{
"name": '[RESOURCE NAME]',
"destination_dataset_id": '[DATASET NAME]',
"display_name": '[DISPLAY NAME]',
"data_source_id": "scheduled_query",
"params": {
"query": "SELECT * FROM dataset_name.table_name",
"destination_table_name_template": '[TABLE NAME]',
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
"schedule": "every 24 hours",
},
bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
config, update_mask
)
スケジュールの設定内容(config
)とアップデートマスク(update_mask
)をupdate_transfer_config
に渡してあげます。
update_mask
はconfig
の中でどのフィールドを更新するかを指定するものです。update_mask
に含まれない設定値は更新の対象外となります。
config
の設定値の詳細は以下の通り。
name
スケジュールのリソース名を指定します。
リソース名はBigQueryのコンソールの「スケジュールされたクエリ」からスケジュールの一覧が表示されるので、該当のスケジュールの構成情報から確認できます。
destination_dataset_id
スケジュールで実行したクエリ結果の保存先データセット名を指定します。
display_name
スケジュールの表示名なので任意な名前を付けます。
params.query
実行するクエリを指定します。
params.destination_table_name_template
スケジュールで実行したクエリ結果の保存先テーブル名を指定します。
params.write_disposition
テーブルへの保存方法を指定します。
WRITE_TRUNCATE
を指定した場合、既存のテーブルは実行クエリの結果で置換されます。
WRITE_APPEND
を指定した場合、既存のテーブルに対して実行クエリの結果が追加されます。
schedule
スケジュールの実行タイミングを指定します。
ex.
1時間ごとに実行・・・every 1 hour
毎日0時に実行・・・every day 00:00
おわりに
BigQueryをデータマート的に使う場合、コンソールからテーブルを作りまくると変更管理がしにくいのでコードに落としておくとgitで管理できてオススメです。
Author And Source
この問題について(BigQueryのテーブルをPythonクライアントから操作する), 我々は、より多くの情報をここで見つけました https://qiita.com/tikamoto/items/c269adcc9a3332484686著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .