Loading CSV data in GCS into a BigQuery table with Airflow


This is straightforward with gcs_to_bq.GoogleCloudStorageToBigQueryOperator from airflow.contrib.

import airflow
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq
from datetime import timedelta
import pendulum

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "execution_timeout": timedelta(minutes=30),
    "retries": 3,
    "retry_delay": timedelta(minutes=5)
}

dag = DAG(
    "test_dag",
    default_args=default_args,
    catchup=True,
    schedule_interval="0 0 * * *"
)

# NOTE: evaluated once when the DAG file is parsed, not per task run
# (see the templated variant after this example)
target_date = pendulum.now('Asia/Tokyo')

load_gcs_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='test_task',
    bucket='your-bucket',
    # destination must be '<dataset>.<table>' (optionally prefixed with '<project>.');
    # the $YYYYMMDD suffix is BigQuery's partition decorator, so each run
    # overwrites only that day's partition
    destination_project_dataset_table='your_dataset.table_name${}'.format(
        target_date.strftime('%Y%m%d')
    ),
    source_objects=['some_dir/{}/*.gz'.format(target_date.to_date_string())],
    source_format='CSV',  # CSV to match the data described above; NEWLINE_DELIMITED_JSON etc. also work
    compression='GZIP',  # BigQuery loads gzip-compressed CSV as-is
    write_disposition='WRITE_TRUNCATE',
    autodetect=True,  # no explicit schema here, so let BigQuery infer one
    dag=dag
)
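
One caveat: target_date is computed with pendulum.now() when the scheduler parses the DAG file, so with catchup=True every backfilled run would load the same (current) date. Because bucket, source_objects, and destination_project_dataset_table are template_fields on this operator, Jinja templating gives each run its own execution date instead. Here is a minimal sketch of the same task in templated form (your_dataset and the object layout are placeholders, as above):

# Same load as above, but driven by the run's execution date via Jinja macros
load_by_execution_date = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='test_task_templated',
    bucket='your-bucket',
    # {{ ds_nodash }} renders the execution date as YYYYMMDD
    destination_project_dataset_table='your_dataset.table_name${{ ds_nodash }}',
    # {{ ds }} renders it as YYYY-MM-DD, matching to_date_string() above
    source_objects=['some_dir/{{ ds }}/*.gz'],
    source_format='CSV',
    compression='GZIP',
    write_disposition='WRITE_TRUNCATE',
    autodetect=True,
    dag=dag
)

Note that the templated dates follow the DAG's schedule timezone (UTC by default), whereas pendulum.now('Asia/Tokyo') used JST.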

References

Google Cloud Storage Operators (Airflow how-to guides)