CloudComposer(Airflow2.0) で GCS から BigQuery に定期的にロードするときに気をつけること


背景

GCSからBigQueryに定期的にデータをロードしたい。CloudComposerでロードするためのOperatorは以下の2つがある。

1. GCSToBigQueryOperator

worker上でGCSからBigQueryへのLoad処理を実行する。
実装が比較的シンプル。後続処理の同期が楽。

2. BigQueryCreateDataTransferOperator

BigQueryTransferServiceをAPI経由で作成して、GCSからBigQueryにデータをロードする。
ロード完了処理に同期して後続処理を行いたい場合は、PubSub経由の通知を利用する必要があり、少し面倒。(BigQuery Data Transfer Service の実行通知)

使い分け

基本的にGCSToBigQueryOperator を使えばいいと思う。
どちらもworker上でのDLや処理を行わないので処理は充分早く、workerへの負荷は少ない。
実装のシンプルさや、後続処理との同期を考えると、GoogleCloudStorageToBigQueryOperator を使ったほうが恩恵が大きい。

実装

GoogleCloudStorageToBigQueryOperator の実装

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'project_id': 'YOUR_PROJECT_ID',
    'retries': 2
}

with DAG(
    dag_id='gcs_to_bq',
    default_args=default_args,
    schedule_interval=None,
    catchup=False
) as dag:


    GCSToBigQueryOperator(
        task_id='gcs_to_gq',
        bucket='bucket_name',
        source_objects=['path/to/dir/*'],
        schema_fields=[{'name': 'log', 'type': 'STRING', 'mode': 'NULLABLE'}],
        destination_project_dataset_table='dataset.table_name',
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        autodetect=False,
        ignore_unknown_values=True,
    )

BigQueryCreateDataTransferOperator の実装

以下が参考になる