CloudComposer(Airflow2.0) で S3 から GCS に定期的にデータ転送するときに気にすること
背景
CloudComposerで S3 -> GCS へのファイルの定期転送を行いたい。S3 -> GCS への転送が行えるOperatorは、S3ToGCSOperator と CloudDataTransferServiceCreateJobOperator の2種類ある。 2つのOperatorの使い分けと実装方法を書いていく。
使い分け
ざっくり言うと
- データ量が多い場合はCloudDataTransferServiceCreateJobOperator
- データ量が少なく、後続処理もあり、シンプルに実装したい場合はS3ToGCSOperator
S3ToGCSOperator
CloudComposerのworkerでS3からファイルをダウンロードし、GCPにアップロードしていく。
Worker上で処理を行うため、TransferJobを使った方法に比べると処理が遅くWorkerへの負荷も大きい。
タスクの完了が転送の完了を意味するので、後続処理などがシンプルに書ける。
CloudDataTransferServiceCreateJobOperator
GCPのStorageTransferServiceを利用して転送を行う。転送をGCPフルマネージドサービスで行ってくれるので転送処理が早く、workerへの負荷がない。また、高度な転送条件設定が可能。
しかし、タスクの完了は転送の完了を意味しないので、PubSubを使った完了通知による後続処理の同期が必要になる。
どのくらいスピード違うの?
3400ファイル計15GBのファイルを S3 -> GCS に転送した場合
- S3ToGCSOperator(n1-standart-1)では転送完了に1時間程度かかった
- CloudDataTransferServiceCreateJobOperatorでは転送完了は30秒しかからなかった
実装
S3ToGCSOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': 'YOUR_PROJECT',
'retries': 2
}
with DAG(
dag_id='s3_to_gcs',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
gcp_bigquery_create_transfer = S3ToGCSOperator(
bucket='s3_bucket',
prefix='path/',
aws_conn_id='your_aws_conn_id',
dest_gcs="gs://YOUR_GCS_BUCKET/path/",
task_id="s3_to_gcs",
)
注意
- Airflowにaws_conn_idの設定が必要 Airflow接続の準備
-
apache-airflow-providers-amazon の pip ライブラリの追加が必要
- 1.4.0を追加したら動いた
CloudDataTransferServiceCreateJobOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceCreateJobOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
ALREADY_EXISTING_IN_SINK,
AWS_S3_DATA_SOURCE,
BUCKET_NAME,
DESCRIPTION,
GCS_DATA_SINK,
JOB_NAME,
PROJECT_ID,
SCHEDULE,
SCHEDULE_END_DATE,
SCHEDULE_START_DATE,
START_TIME_OF_DAY,
STATUS,
OBJECT_CONDITIONS,
TRANSFER_OPTIONS,
TRANSFER_SPEC,
GcpTransferJobsStatus,
)
GCP_PROJECT_ID = 'YOUR_PROJECT'
AWS_BUCKET = "AWS_BUCKET"
GCP_BUCKET = "GCP_BUCKET"
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': GCP_PROJECT_ID,
'retries': 2
}
with DAG(
dag_id='s3_to_gcs_2',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
aws_to_gcs_transfer_body = {
DESCRIPTION: '説明',
STATUS: GcpTransferJobsStatus.ENABLED,
PROJECT_ID: GCP_PROJECT_ID,
JOB_NAME: 'transferJobs/12345678',
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
},
TRANSFER_SPEC: {
AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET},
GCS_DATA_SINK: {BUCKET_NAME: GCP_BUCKET},
OBJECT_CONDITIONS: {
'includePrefixes': ['path/']
},
TRANSFER_OPTIONS: {
ALREADY_EXISTING_IN_SINK: True
},
},
}
create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
task_id="create_transfer_job_from_aws",
aws_conn_id='your_conn_id',
body=aws_to_gcs_transfer_body
)
注意
- bodyの設定が厄介 REST Resource: transferJobsを参考に作っていく
- JOB_NAMEは
transferJobs/${任意の数字 or 文字列}
という規則がある
- ON_DEMANDなジョブスケジューリングをしたい場合は、SCHEDULE_START_DATE と SCHEDULE_END_DATE を同じ時刻にする
- bodyのkey名がわからない場合は ここ を参照する
- GCSの権限に legacyBucket系の権限が必要なので注意する データソースへのアクセスの設定
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': 'YOUR_PROJECT',
'retries': 2
}
with DAG(
dag_id='s3_to_gcs',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
gcp_bigquery_create_transfer = S3ToGCSOperator(
bucket='s3_bucket',
prefix='path/',
aws_conn_id='your_aws_conn_id',
dest_gcs="gs://YOUR_GCS_BUCKET/path/",
task_id="s3_to_gcs",
)
注意
- Airflowにaws_conn_idの設定が必要 Airflow接続の準備
-
apache-airflow-providers-amazon の pip ライブラリの追加が必要
- 1.4.0を追加したら動いた
CloudDataTransferServiceCreateJobOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceCreateJobOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
ALREADY_EXISTING_IN_SINK,
AWS_S3_DATA_SOURCE,
BUCKET_NAME,
DESCRIPTION,
GCS_DATA_SINK,
JOB_NAME,
PROJECT_ID,
SCHEDULE,
SCHEDULE_END_DATE,
SCHEDULE_START_DATE,
START_TIME_OF_DAY,
STATUS,
OBJECT_CONDITIONS,
TRANSFER_OPTIONS,
TRANSFER_SPEC,
GcpTransferJobsStatus,
)
GCP_PROJECT_ID = 'YOUR_PROJECT'
AWS_BUCKET = "AWS_BUCKET"
GCP_BUCKET = "GCP_BUCKET"
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': GCP_PROJECT_ID,
'retries': 2
}
with DAG(
dag_id='s3_to_gcs_2',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
aws_to_gcs_transfer_body = {
DESCRIPTION: '説明',
STATUS: GcpTransferJobsStatus.ENABLED,
PROJECT_ID: GCP_PROJECT_ID,
JOB_NAME: 'transferJobs/12345678',
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
},
TRANSFER_SPEC: {
AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET},
GCS_DATA_SINK: {BUCKET_NAME: GCP_BUCKET},
OBJECT_CONDITIONS: {
'includePrefixes': ['path/']
},
TRANSFER_OPTIONS: {
ALREADY_EXISTING_IN_SINK: True
},
},
}
create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
task_id="create_transfer_job_from_aws",
aws_conn_id='your_conn_id',
body=aws_to_gcs_transfer_body
)
注意
- bodyの設定が厄介 REST Resource: transferJobsを参考に作っていく
- JOB_NAMEは
transferJobs/${任意の数字 or 文字列}
という規則がある
- ON_DEMANDなジョブスケジューリングをしたい場合は、SCHEDULE_START_DATE と SCHEDULE_END_DATE を同じ時刻にする
- bodyのkey名がわからない場合は ここ を参照する
- GCSの権限に legacyBucket系の権限が必要なので注意する データソースへのアクセスの設定
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceCreateJobOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
ALREADY_EXISTING_IN_SINK,
AWS_S3_DATA_SOURCE,
BUCKET_NAME,
DESCRIPTION,
GCS_DATA_SINK,
JOB_NAME,
PROJECT_ID,
SCHEDULE,
SCHEDULE_END_DATE,
SCHEDULE_START_DATE,
START_TIME_OF_DAY,
STATUS,
OBJECT_CONDITIONS,
TRANSFER_OPTIONS,
TRANSFER_SPEC,
GcpTransferJobsStatus,
)
GCP_PROJECT_ID = 'YOUR_PROJECT'
AWS_BUCKET = "AWS_BUCKET"
GCP_BUCKET = "GCP_BUCKET"
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': GCP_PROJECT_ID,
'retries': 2
}
with DAG(
dag_id='s3_to_gcs_2',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
aws_to_gcs_transfer_body = {
DESCRIPTION: '説明',
STATUS: GcpTransferJobsStatus.ENABLED,
PROJECT_ID: GCP_PROJECT_ID,
JOB_NAME: 'transferJobs/12345678',
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
},
TRANSFER_SPEC: {
AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET},
GCS_DATA_SINK: {BUCKET_NAME: GCP_BUCKET},
OBJECT_CONDITIONS: {
'includePrefixes': ['path/']
},
TRANSFER_OPTIONS: {
ALREADY_EXISTING_IN_SINK: True
},
},
}
create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
task_id="create_transfer_job_from_aws",
aws_conn_id='your_conn_id',
body=aws_to_gcs_transfer_body
)
- JOB_NAMEは
transferJobs/${任意の数字 or 文字列}
という規則がある - ON_DEMANDなジョブスケジューリングをしたい場合は、SCHEDULE_START_DATE と SCHEDULE_END_DATE を同じ時刻にする
Author And Source
この問題について(CloudComposer(Airflow2.0) で S3 から GCS に定期的にデータ転送するときに気にすること), 我々は、より多くの情報をここで見つけました https://qiita.com/munaita_/items/d6ae71f52b14816e6346著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .