CloudComposer(Airflow2.0) で S3 から GCS に定期的にデータ転送するときに気にすること


背景

CloudComposerで S3 -> GCS へのファイルの定期転送を行いたい。S3 -> GCS への転送が行えるOperatorは、S3ToGCSOperatorCloudDataTransferServiceCreateJobOperator の2種類ある。 2つのOperatorの使い分けと実装方法を書いていく。

使い分け

ざっくり言うと

  • データ量が多い場合はCloudDataTransferServiceCreateJobOperator
  • データ量が少なく、後続処理もあり、シンプルに実装したい場合はS3ToGCSOperator

S3ToGCSOperator

実装例

CloudComposerのworkerでS3からファイルをダウンロードし、GCPにアップロードしていく。
Worker上で処理を行うため、TransferJobを使った方法に比べると処理が遅くWorkerへの負荷も大きい。
タスクの完了が転送の完了を意味するので、後続処理などがシンプルに書ける。

CloudDataTransferServiceCreateJobOperator

実装ヒント
実装例

GCPのStorageTransferServiceを利用して転送を行う。転送をGCPフルマネージドサービスで行ってくれるので転送処理が早く、workerへの負荷がない。また、高度な転送条件設定が可能。
しかし、タスクの完了は転送の完了を意味しないので、PubSubを使った完了通知による後続処理の同期が必要になる。

どのくらいスピード違うの?

3400ファイル計15GBのファイルを S3 -> GCS に転送した場合

  • S3ToGCSOperator(n1-standart-1)では転送完了に1時間程度かかった
  • CloudDataTransferServiceCreateJobOperatorでは転送完了は30秒しかからなかった

実装

S3ToGCSOperator

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'project_id': 'YOUR_PROJECT',
    'retries': 2
}

with DAG(
    dag_id='s3_to_gcs',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    gcp_bigquery_create_transfer = S3ToGCSOperator(
        bucket='s3_bucket',
        prefix='path/',
        aws_conn_id='your_aws_conn_id',
        dest_gcs="gs://YOUR_GCS_BUCKET/path/",
        task_id="s3_to_gcs",
    )

注意

CloudDataTransferServiceCreateJobOperator

from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceCreateJobOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    ALREADY_EXISTING_IN_SINK,
    AWS_S3_DATA_SOURCE,
    BUCKET_NAME,
    DESCRIPTION,
    GCS_DATA_SINK,
    JOB_NAME,
    PROJECT_ID,
    SCHEDULE,
    SCHEDULE_END_DATE,
    SCHEDULE_START_DATE,
    START_TIME_OF_DAY,
    STATUS,
    OBJECT_CONDITIONS,
    TRANSFER_OPTIONS,
    TRANSFER_SPEC,
    GcpTransferJobsStatus,
)

GCP_PROJECT_ID = 'YOUR_PROJECT'
AWS_BUCKET = "AWS_BUCKET"
GCP_BUCKET = "GCP_BUCKET"

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'project_id': GCP_PROJECT_ID,
    'retries': 2
}

with DAG(
    dag_id='s3_to_gcs_2',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    aws_to_gcs_transfer_body = {
        DESCRIPTION: '説明',
        STATUS: GcpTransferJobsStatus.ENABLED,
        PROJECT_ID: GCP_PROJECT_ID,
        JOB_NAME: 'transferJobs/12345678',
        SCHEDULE: {
            SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
            SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
            START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
        },
        TRANSFER_SPEC: {
            AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET},
            GCS_DATA_SINK: {BUCKET_NAME: GCP_BUCKET},
            OBJECT_CONDITIONS: {
                'includePrefixes': ['path/']
            },
            TRANSFER_OPTIONS: {
                ALREADY_EXISTING_IN_SINK: True
                },
        },
    }

    create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
        task_id="create_transfer_job_from_aws",
        aws_conn_id='your_conn_id',
        body=aws_to_gcs_transfer_body
    )

注意

  • bodyの設定が厄介 REST Resource: transferJobsを参考に作っていく
    • JOB_NAMEは transferJobs/${任意の数字 or 文字列} という規則がある
    • ON_DEMANDなジョブスケジューリングをしたい場合は、SCHEDULE_START_DATE と SCHEDULE_END_DATE を同じ時刻にする
  • bodyのkey名がわからない場合は ここ を参照する
  • GCSの権限に legacyBucket系の権限が必要なので注意する データソースへのアクセスの設定