Google Cloud Platform Cloud Composer の公式クイックスタートをやってみた


Google Cloud Composer ?

Google Cloud Composer は、Apache Airflow をGoogle Cloud Platform (GCP) のマネージドサービス化したもので、パイプラインの作成・スケジュール実行・監視ができます。バッチ処理のジョブの実行・監視ができるイメージのサービスです。
Apache Airflow の公式ページ には、

「Airflowは、プログラムでワークフローを作成、スケジュール設定、監視するためのプラットフォームです。」

と書かれています。
(Apache Airflow とは、Airbnb が云々・・・というのは有名のようなので省略します。)

GCP 上でバッチ処理というと、私は GCE インスタンスで実行する必要があると考えていましたが、マネージドサービスでバッチ処理ができるとは、

今更ながら、これは使える・使いたい

と思ったので、試してみました。

私は Apache Airflow についてあまり知らず、ドキュメントだけでは Cloud Composer のイメージがつかなかったので、クイックスタートを実際に行い記事にしてみました。
詳細な説明記事ではありませんが、本記事を読まれる方に Cloud Composer のイメージが伝われば幸いです。

クイックスタートをやってみた

Cloud Composer のクイックスタートは、以下の処理の実行を試すことができます。

  1. Creates a Cloud Dataproc cluster.
  2. Runs an Apache Hadoop wordcount job on the cluster, and outputs its results to Cloud Storage.
  3. Deletes the cluster.
  1. Cloud Dataproc クラスターを作成する。
  2. クラスターで Apache Hadoop 文字数カウントジョブを実行し、その結果を Cloud Storage に出力します。
  3. Cloud Dataproc クラスターを削除する。

では、クイックスタートのページを上からやっていきます。

使用するAPIを有効にする

これは簡単なので省きますが、クイックスタートで使用するCloud Composer、Cloud Dataproc、Cloud Storage のAPIを有効にします。

Cloud Composer の環境を作成する

ここでいきなり Cloud Composer の環境を作成します。設定は基本的にデフォルトとしました。以下が設定画面のイメージです。

必要な個所を入力すると、「作成」ボタンが押せるようになりますので、クリックします。
Cloud Composer の環境作成はかなり時間が掛かりますので焦らずに待ちましょう。

文字数カウントの結果を格納するバケットを作成する

名前はなんでもいいので、バケットを作成します。今回は us-central1 にリージョナルバケット、バケット名 wordcount-result を作成しました。

変数を設定する

Cloud Composer に以下の3つの変数を設定します。

キー 本記事での設定
gcp_project クイックスタートを実行するプロジェクトID GCPのproject-id
gcs_bucket 先ほど作成したバケットパス gs://wordcount-result
gce_zone Cloud Composer を作成したリージョン us-central1

設定するコマンドは以下のコマンドです。
gcloud composer environments run {Cloud Composer の環境名} --location {リージョン} variables -- --set {キー} {値}

本記事での具体的なコマンドは以下となります。

gcp-project設定
gcloud composer environments run my-environment \
    --location us-central1 variables -- --set gcp_project my-project-id
gcs_bucket設定
gcloud composer environments run my-environment \
    --location us-central1 variables -- --set gcs_bucket gs://wordcount-result
gce_zone設定
gcloud composer environments run my-environment \
    --location us-central1 variables -- --set gce_zone us-central1

設定した値を確認するには、以下のコマンドを使用します。
gcloud composer environments run {Cloud Composer の環境名}
--location {リージョン} variables -- --get {キー}

ワークフローファイルを設定する

クイックスタートに記載のサンプルワークフロー quickstart.py をCloud Composer に設定します。
quickstart.py の内容については、上記引用したものと同じですが、公式ドキュメントのクイックスタートに記載されているソースの説明を抜粋しました。ここは公式ドキュメントのソースを見て確認するほうが分かり易いと思います。

  1. DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster.
  2. DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
  3. DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.
  1. DataprocClusterCreateOperator:Cloud Dataproc クラスターを作成します。
  2. DataProcHadoopOperator:Hadoop文字数カウントジョブを実行して結果を Cloud Storage バケットに書き込みます。
  3. DataprocClusterDeleteOperator:Compute Engine の料金がかからないように、Cloud Dataproc クラスターを削除します。

DAGを Cloud Storage にアップロードする

クイックスタートに記載されているサンプルワークフロー quickstart.py を Github から取得します。(取得方法は割愛します。公式ドキュメントからコピペでも取得可能です。)
ここでは、Cloud Shell を使用してアップロード方法を記載します。

  1. Cloud Shell を起動します。
  2. DAG用フォルダを作成します。(本記事では test-dag フォルダを作成しました。) mkdir test-dag/
  3. quickstart.py を Cloud Shell にアップロードし、先ほど作成した test-dag フォルダに移動します。Cloud Shell でのファイルアップロードは以下のように簡単にできますが、ファイルはホームディレクトリにアップロードされるので、作成したフォルダに移動します。

  4. 以下のコマンドを使用し、DAGを Cloud Composer にアップロードします。

gcloud composer environments storage dags import
--environment {Cloud Composer の環境名} --location {リージョン}
--source {DAGファイルパス}

本記事では以下のコマンドを実行しました。

DAG-Import
gcloud composer environments storage dags import 
  --environment my-environment  --location us-central1 
  --source test-dag/quickstart.py

Airflow UIを開く

このクイックスタートは、Quickstart.py を Cloud Composer に設定すると、実行するようになっていますので、Cloud Composer の環境の画面にある「Airflow ウェブサーバー」の下の「Airflow」をクリックし、実行状態を確認します。

Airflow UIが開きます。以下のような感じで表示されます。
正常に終了した場合は、赤枠のようにSUCCESS(左隅の〇)が点灯します。

また、DAGをクリックすると、Tree View を表示します。
正常に完了していると、赤枠のステータスがSUCCESSとなります。マウスを緑の印に合わせるとステータスがわかります。

Graph View をクリックすると、どこかで見たことがあるジョブネットのようなイメージで表示されます。

実行結果

クイックスタートの実行結果は、作成した wordcount-result バケットに保存されています。以下がバケット内のイメージです。

作成されたファイルの一部をgsutil catコマンドで見てみるとこんな感じで出力されていました。

終わりに

本記事では、Cloud Composer のクイックスタートをなぞってみただけですが、バッチ処理で結構使えるイメージが湧きませんでしたか? Python を使ってバッチ処理が書けて、J(ピー)1のジョブネットのように実行監視ができるマネージドサービスは活用できる場面がたくさんあると思います。
私は、例えば BigQuery のバッチ処理や、クイックスタートのように Cloud Dataproc (Spark / Hadoop ) のバッチ処理など、日々ビッグデータを使用したデータ分析を行う企業の夜間バッチ処理に使いたいと思いました。

機会があれば次は Cloud Composer で BigQuery の処理を実行する記事を書こうと思います。
(もう既に偉大なる先人の記事があると思いますが・・・)

本記事を読んでいただいた方に Cloud Composer を知っていただき、ご活用いただけるキッカケになれば嬉しいです。お読みいただきありがとうございました。

参考サイト

Google Cloud Composer QuickStart
Cloud Shell