S3からBigQuery への転送をBigqueryTransferService + Cloud Composer(Airflow2.0)でやってみた
8285 ワード
背景
- s3にあるJSONファイルを、BigQueryにいい感じにロードしたい
- 外部データをBigQueryにマネージドでロードしてくれる BigqueryTranserServiceという便利サービスがあるのでそれを使いたい
- CloudComposer(ver2.0)でジョブを管理したい
方針
-
BigQueryCreateDataTransferOperatorを使う
- 実装方針はこのリンクを を参考にする
- GCPドキュメントにBQTransferServiceでのS3転送のページがあり、pythonのサンプルはないが、Javaの実装(Amazon S3 データ転送の設定) はあり、こちらも参考が参考になった
実装
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery_dts import BigQueryCreateDataTransferOperator, BigQueryDataTransferServiceStartTransferRunsOperator
schedule_options = {"disable_auto_scheduling": True}
GCP_DTS_BQ_DATASET = 'dataset_name'
GCP_DTS_BQ_TABLE = 'table_name'
BUCKET_URI = 's3://bucket/path/06/25/*'
PROJECT = 'YOUR_GCP_PROJECT'
KEY = "YOUR_AWS_KEY"
SECRET = "YOUR_AWS_SECRET"
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': PROJECT,
'retries': 2
}
PARAMS = {
"max_bad_records": "0",
"data_path": BUCKET_URI,
"destination_table_name_template": GCP_DTS_BQ_TABLE,
"access_key_id": KEY,
"secret_access_key": SECRET,
"file_format": "JSON",
"ignore_unkown_values": True,
}
TRANSFER_CONFIG = {
"destination_dataset_id": GCP_DTS_BQ_DATASET,
"display_name": "s3_to_bq_test",
"data_source_id": "amazon_s3",
"schedule_options": schedule_options,
"params": PARAMS,
}
with DAG(
dag_id='s3_to_bq_dag',
default_args=default_args,
schedule_interval=None,
catchup=False
) as dag:
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT,
task_id="create_s3_to_bq",
)
transfer_config_id = (
"{{ task_instance.xcom_pull('create_s3_to_bq', key='transfer_config_id') }}"
)
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="execute_s3_to_bq",
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
gcp_bigquery_create_transfer >> gcp_bigquery_start_transfe
注意点
- 権限でちょいちょい引っかかるの
BigQueryCreateDataTransferOperatorではジョブを作成するとタスク完了となるので、後続処理がある場合はジョブの完了を感知する必要がある. PubSubでできるらしいので今度触ってみる (Pub/Sub通知)
BQTransferServiceでは write_append のみをサポートしているので、write_truncateなジョブを行いたい場合は諦めて s3 -> gcs -> bqの流れにする
- BigQueryCreateDataTransferOperatorを使う
- 実装方針はこのリンクを を参考にする
- GCPドキュメントにBQTransferServiceでのS3転送のページがあり、pythonのサンプルはないが、Javaの実装(Amazon S3 データ転送の設定) はあり、こちらも参考が参考になった
実装
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery_dts import BigQueryCreateDataTransferOperator, BigQueryDataTransferServiceStartTransferRunsOperator
schedule_options = {"disable_auto_scheduling": True}
GCP_DTS_BQ_DATASET = 'dataset_name'
GCP_DTS_BQ_TABLE = 'table_name'
BUCKET_URI = 's3://bucket/path/06/25/*'
PROJECT = 'YOUR_GCP_PROJECT'
KEY = "YOUR_AWS_KEY"
SECRET = "YOUR_AWS_SECRET"
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': PROJECT,
'retries': 2
}
PARAMS = {
"max_bad_records": "0",
"data_path": BUCKET_URI,
"destination_table_name_template": GCP_DTS_BQ_TABLE,
"access_key_id": KEY,
"secret_access_key": SECRET,
"file_format": "JSON",
"ignore_unkown_values": True,
}
TRANSFER_CONFIG = {
"destination_dataset_id": GCP_DTS_BQ_DATASET,
"display_name": "s3_to_bq_test",
"data_source_id": "amazon_s3",
"schedule_options": schedule_options,
"params": PARAMS,
}
with DAG(
dag_id='s3_to_bq_dag',
default_args=default_args,
schedule_interval=None,
catchup=False
) as dag:
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT,
task_id="create_s3_to_bq",
)
transfer_config_id = (
"{{ task_instance.xcom_pull('create_s3_to_bq', key='transfer_config_id') }}"
)
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="execute_s3_to_bq",
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
gcp_bigquery_create_transfer >> gcp_bigquery_start_transfe
注意点
- 権限でちょいちょい引っかかるの
BigQueryCreateDataTransferOperatorではジョブを作成するとタスク完了となるので、後続処理がある場合はジョブの完了を感知する必要がある. PubSubでできるらしいので今度触ってみる (Pub/Sub通知)
BQTransferServiceでは write_append のみをサポートしているので、write_truncateなジョブを行いたい場合は諦めて s3 -> gcs -> bqの流れにする
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery_dts import BigQueryCreateDataTransferOperator, BigQueryDataTransferServiceStartTransferRunsOperator
schedule_options = {"disable_auto_scheduling": True}
GCP_DTS_BQ_DATASET = 'dataset_name'
GCP_DTS_BQ_TABLE = 'table_name'
BUCKET_URI = 's3://bucket/path/06/25/*'
PROJECT = 'YOUR_GCP_PROJECT'
KEY = "YOUR_AWS_KEY"
SECRET = "YOUR_AWS_SECRET"
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': PROJECT,
'retries': 2
}
PARAMS = {
"max_bad_records": "0",
"data_path": BUCKET_URI,
"destination_table_name_template": GCP_DTS_BQ_TABLE,
"access_key_id": KEY,
"secret_access_key": SECRET,
"file_format": "JSON",
"ignore_unkown_values": True,
}
TRANSFER_CONFIG = {
"destination_dataset_id": GCP_DTS_BQ_DATASET,
"display_name": "s3_to_bq_test",
"data_source_id": "amazon_s3",
"schedule_options": schedule_options,
"params": PARAMS,
}
with DAG(
dag_id='s3_to_bq_dag',
default_args=default_args,
schedule_interval=None,
catchup=False
) as dag:
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT,
task_id="create_s3_to_bq",
)
transfer_config_id = (
"{{ task_instance.xcom_pull('create_s3_to_bq', key='transfer_config_id') }}"
)
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="execute_s3_to_bq",
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
gcp_bigquery_create_transfer >> gcp_bigquery_start_transfe
- 権限でちょいちょい引っかかるの
BigQueryCreateDataTransferOperatorではジョブを作成するとタスク完了となるので、後続処理がある場合はジョブの完了を感知する必要がある. PubSubでできるらしいので今度触ってみる (Pub/Sub通知)
BQTransferServiceでは write_append のみをサポートしているので、write_truncateなジョブを行いたい場合は諦めて s3 -> gcs -> bqの流れにする
Author And Source
この問題について(S3からBigQuery への転送をBigqueryTransferService + Cloud Composer(Airflow2.0)でやってみた), 我々は、より多くの情報をここで見つけました https://qiita.com/munaita_/items/75c8c8b528d4ddcf5123著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .