AirflowでGCSにあるCSVデータをBigQueryのテーブルにロードする
gcs.to_bq.GoogleCloudStorageToBigQueryOperatorを使うと簡単。
import airflow
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq
from datetime import timedelta
import pendulum
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"execution_timeout": timedelta(minutes=30),
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
dag = DAG(
"test_dag",
default_args=default_args,
catchup=True,
schedule_interval="0 0 * * *"
)
target_date = pendulum.now('Asia/Tokyo')
gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id='test_task',
bucket='your-bucket',
destination_project_dataset_table='table_name${}'.format(
target_date.strftime('%Y%m%d')
),
source_objects=['some_dir/{}/*.gz'.format(target_date.to_date_string())],
source_format='NEWLINE_DELIMITED_JSON',
compression='GZIP',
write_disposition='WRITE_TRUNCATE',
dag=dag
)
参考
Author And Source
この問題について(AirflowでGCSにあるCSVデータをBigQueryのテーブルにロードする), 我々は、より多くの情報をここで見つけました https://qiita.com/munaita_/items/79d61eceec75a806e35e著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .