P Cloud Composer - GCSからBigQueryへデータをロード


GCP Cloud Composerを使いこなすまでの道のりからの続き
前回はGCPのドキュメンを参考にAirflow 及び Cloud Composer の概要をまとめましたが今回は実際にCloud Composer サービスを使ったサンプルアプリケーションを実装してみます。

システム構成

DAGフローチャート

今回はstartfrom_exsample-composer-gcs_import_bigqueryendの3つのタスクを作ります。

タスクリスト

タスク ID 説明
start タスク開始を示すダミータスク
from_exsample-composer-gcs_import_bigquery GCSからBigQueryへデータを流すタスク
end タスク終了を示すダミータスク

システム構成図

サンプルアプリケーション

サンプルアプリケーションはGithubリポジトリに上がっています。

準備

サービスアカウントにIAMロール追加

ユーザー名 ロール
minarai-cloud-composer roles/composer.worker
minarai-cloud-composer ストレージ オブジェクト閲覧者
minarai-composer-admin roles/composer.admin
minarai-composer-admin BigQuery 管理者

BigQueryテーブルを作成

  • minarai-composer-adminアカウントをアクティベートして下記を実行します
bq --location asia-northeast1 mk \
     --dataset \
     --description "exsample composer GCS to BQ" \
     ${PROJECT_ID}:exsample_composer_dataset

bq --location asia-northeast1  mk -t \
      --schema ./dags/schemas/exsample_composer_table.json \
      --time_partitioning_type DAY \
      --description "example composer GCS to BQ" \
      ${PROJECT_ID}:exsample_composer_dataset.exsample_composer_table

Cloud Storageバケットを作成する

  • BigQuery Schemaを置くバケットとBiqQueryへ流し込むデータを置くバケットを作成します

Cloud Composer環境構築または環境更新

  • Cloud Composer環境構築がまだの場合は下記コマンドのupdatecreateに読み替えてください
gcloud composer environments update ${CLOUD_ENV} \
      --location asia-northeast1 \
      --update-env-variables=PROJECT_ID=${PROJECT_ID},BQ_LOCATION=${LOCATION}

DAGを登録(アップロード)

Cloud Composer環境が作成されるとGCSにCloud Composer dagsバケットが作成されます。
GCSバケットへDAGをアップロードすることでAirflow dagsディレクトリへマウントされます。
gcloudコマンドからアップロードするには下記コマンドを実行します。

gcloud composer environments storage dags import \
        --environment ${CLOUD_ENV} \
        --location ${LOCATION} \
        --source ./dags/gcs_to_bq.py