CloudComposer(Airflow2.0) で GCS から BigQuery に定期的にロードするときに気をつけること
背景
GCSからBigQueryに定期的にデータをロードしたい。CloudComposerでロードするためのOperatorは以下の2つがある。
1. GCSToBigQueryOperator
worker上でGCSからBigQueryへのLoad処理を実行する。
実装が比較的シンプル。後続処理の同期が楽。
2. BigQueryCreateDataTransferOperator
BigQueryTransferServiceをAPI経由で作成して、GCSからBigQueryにデータをロードする。
ロード完了処理に同期して後続処理を行いたい場合は、PubSub経由の通知を利用する必要があり、少し面倒。(BigQuery Data Transfer Service の実行通知)
使い分け
基本的にGCSToBigQueryOperator を使えばいいと思う。
どちらもworker上でのDLや処理を行わないので処理は充分早く、workerへの負荷は少ない。
実装のシンプルさや、後続処理との同期を考えると、GoogleCloudStorageToBigQueryOperator を使ったほうが恩恵が大きい。
実装
GoogleCloudStorageToBigQueryOperator の実装
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': 'YOUR_PROJECT_ID',
'retries': 2
}
with DAG(
dag_id='gcs_to_bq',
default_args=default_args,
schedule_interval=None,
catchup=False
) as dag:
GCSToBigQueryOperator(
task_id='gcs_to_gq',
bucket='bucket_name',
source_objects=['path/to/dir/*'],
schema_fields=[{'name': 'log', 'type': 'STRING', 'mode': 'NULLABLE'}],
destination_project_dataset_table='dataset.table_name',
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
autodetect=False,
ignore_unknown_values=True,
)
BigQueryCreateDataTransferOperator の実装
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': 'YOUR_PROJECT_ID',
'retries': 2
}
with DAG(
dag_id='gcs_to_bq',
default_args=default_args,
schedule_interval=None,
catchup=False
) as dag:
GCSToBigQueryOperator(
task_id='gcs_to_gq',
bucket='bucket_name',
source_objects=['path/to/dir/*'],
schema_fields=[{'name': 'log', 'type': 'STRING', 'mode': 'NULLABLE'}],
destination_project_dataset_table='dataset.table_name',
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
autodetect=False,
ignore_unknown_values=True,
)
以下が参考になる
Author And Source
この問題について(CloudComposer(Airflow2.0) で GCS から BigQuery に定期的にロードするときに気をつけること), 我々は、より多くの情報をここで見つけました https://qiita.com/munaita_/items/4abdf5385ea9d702c89e著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .