PythonでBigQueryを操作する方法


PythonでBigQueryを扱う方法の詳細は下記の公式サイトを確認してください。
https://googleapis.dev/python/bigquery/latest/usage/index.html

データセットを操作したい場合は上記サイトのManaging Datasets
テーブル操作したい場合はManaging Tables
に記載があります。

テーブルの作成

テーブル作成後にすぐにデータを入れようとすると実行されないことがあるので注意。

テーブルの作成
from google.cloud import bigquery

# Colabで使う場合はPJ名の指定が必要
client = bigquery.Client()
client = bigquery.Client(project=project_id) # "your-project"

schema = [
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]

# table_id = "your-project.your_dataset.your_table_name"
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.

print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

テーブル作成時のスキーマ指定方法

ステートメントの詳細についてはこちらに記載あり、スキーマの指定方法についてはBigQueryの公式サイトに詳細が書かれているので、
必要に応じてfield_typeやmodeを変更していきます。

ステートメント
SchemaField(name, field_type, mode='', description=None, fields=(), policy_tags=None)
スキーマ指定の一例
# 文字列(空白あり)
SchemaField(name, 'STRING', mode='NULLABLE')
# 整数 (空白あり)
SchemaField(name, 'INT64', mode='NULLABLE')
# 浮動小数点(空白あり)
SchemaField(name, 'FLOAT64', mode='NULLABLE')
# 日付(必須)
SchemaField(name, 'DATE', mode='REQUIRED')
# 日時(必須)
SchemaField(name, 'DATETIME', mode='REQUIRED')

queryを実行する方法

公式サイトはこちら
https://googleapis.dev/python/bigquery/latest/usage/queries.html

queryの実行
from google.cloud import bigquery

# Colabで使う場合はPJ名の指定が必要
client = bigquery.Client()
client = bigquery.Client(project=project_id) # "your-project"

# 実行したいqueryを記載
query = '''
select * from `tableID`
where ...
'''

result = client.query(query)

# データフレームにする場合
df = result.to_dataframe()

# for文でも取り出し可能
for row in result:
  # 処理したい内容

insertする方法

一度にinsertできる行数は10,000までなので注意してください。
10,000行より多いデータを入れたい場合は後述の方法で対応可能です。

from google.cloud import bigquery

# Colabで使う場合はPJ名の指定が必要
client = bigquery.Client()
client = bigquery.Client(project=project_id) # "your-project"

# table_id = "your-project.your_dataset.your_table_name"
table = client.get_table(table_id)  # Make an API request.

# リストの二次元配列
# 例ではタプルになっているがリストでも問題なし(下記はカラムが2つの場合)
# ただしスキーマの数と中のタプル(or リスト)の要素の数が異なるとエラーになるので注意
rows_to_insert = [("string", num), ("string", num)]

errors = client.insert_rows(table, rows_to_insert)  # Make an API request.

# pandasのdataflameを使う場合は
errors = client.insert_rows(table, df.values.tolist())

if errors == []:
    print("New rows have been added.")

dfにはdataflameが入っている想定で下記でリストの二次元配列へ変更が可能

df.values.tolist()

また、10,000行より大きいデータをinsertしたい場合は下記のように分割してあげてあげましょう。

rows = len(df)
count = 0
while rows > 0:
  client.insert_rows(table, df[(count * 10000): ((count+1) * 10000)].values.tolist())
  rows = rows - 10000
  count = count + 1

もっと良い書き方があるかもしれませんが、上記で10000より大きくても全て追加することが可能です。