AirflowでGoogleCloudStorageToS3Operatorを使ってGCSのファイルをS3に送る


背景

GCSのcsvデータをS3にtransferしたい。
GCSからS3のトランスファーには GoogleCloudStorageToS3Operator を使用する。
S3のシークレット情報の管理は Airflow接続 で管理する。

Airflow接続の準備

S3へのシークレット情報はAirflow接続を使って管理する。
Airflowのコンソール画面からAirflow接続情報を登録する。
新しい Airflow 接続の作成

↓登録内容

Login: AWSのアクセスキー
Password: AWSのアクセスシークレットキー

サンプルDAG


import airflow
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "execution_timeout": timedelta(minutes=30),
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "transfer_gcs_to_s3",
    default_args=default_args,
    catchup=False,
    schedule_interval="0 0 * * *",
)

GoogleCloudStorageToS3Operator(
    task_id="transfer_gcs_to_s3",
    bucket='gcs_bucket_name',
    prefix="gcs/path/to/file.csv",
    dest_aws_conn_id='gcs_to_s3_sample', # Airflow接続に登録したid
    dest_s3_key="s3://s3-bucket-name/",
    replace=True,
    dag=dag
)

参考

使い方サンプルgit: [AIRFLOW-6147] [AIP-21] Rename GoogleCloudStorageToS3Operator

S3のシークレットのAirflow接続での扱い方: Airflow s3 connection using UI