S3からBigQuery への転送をBigqueryTransferService + Cloud Composer(Airflow2.0)でやってみた


背景

  • s3にあるJSONファイルを、BigQueryにいい感じにロードしたい
  • 外部データをBigQueryにマネージドでロードしてくれる BigqueryTranserServiceという便利サービスがあるのでそれを使いたい
  • CloudComposer(ver2.0)でジョブを管理したい

方針

実装

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery_dts import BigQueryCreateDataTransferOperator, BigQueryDataTransferServiceStartTransferRunsOperator

schedule_options = {"disable_auto_scheduling": True}
GCP_DTS_BQ_DATASET = 'dataset_name'
GCP_DTS_BQ_TABLE = 'table_name'
BUCKET_URI = 's3://bucket/path/06/25/*'
PROJECT = 'YOUR_GCP_PROJECT'
KEY = "YOUR_AWS_KEY"
SECRET = "YOUR_AWS_SECRET"

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'project_id': PROJECT,
    'retries': 2
}

PARAMS = {
    "max_bad_records": "0",
    "data_path": BUCKET_URI,
    "destination_table_name_template": GCP_DTS_BQ_TABLE,
    "access_key_id": KEY,
    "secret_access_key": SECRET,
    "file_format": "JSON",
    "ignore_unkown_values": True,
}

TRANSFER_CONFIG = {
    "destination_dataset_id": GCP_DTS_BQ_DATASET,
    "display_name": "s3_to_bq_test",
    "data_source_id": "amazon_s3",
    "schedule_options": schedule_options,
    "params": PARAMS,
}

with DAG(
    dag_id='s3_to_bq_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False
) as dag:

    gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
        transfer_config=TRANSFER_CONFIG,
        project_id=PROJECT,
        task_id="create_s3_to_bq",
    )

    transfer_config_id = (
        "{{ task_instance.xcom_pull('create_s3_to_bq', key='transfer_config_id') }}"
    )

    gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
        task_id="execute_s3_to_bq",
        transfer_config_id=transfer_config_id,
        requested_run_time={"seconds": int(time.time() + 60)},
    )

    gcp_bigquery_create_transfer >> gcp_bigquery_start_transfe

注意点

  • 権限でちょいちょい引っかかるの
  • BigQueryCreateDataTransferOperatorではジョブを作成するとタスク完了となるので、後続処理がある場合はジョブの完了を感知する必要がある. PubSubでできるらしいので今度触ってみる (Pub/Sub通知)

  • BQTransferServiceでは write_append のみをサポートしているので、write_truncateなジョブを行いたい場合は諦めて s3 -> gcs -> bqの流れにする