Google Cloud Platform Cloud Composer の公式クイックスタートをやってみた
Google Cloud Composer ?
Google Cloud Composer は、Apache Airflow をGoogle Cloud Platform (GCP) のマネージドサービス化したもので、パイプラインの作成・スケジュール実行・監視ができます。バッチ処理のジョブの実行・監視ができるイメージのサービスです。
Apache Airflow の公式ページ には、
「Airflowは、プログラムでワークフローを作成、スケジュール設定、監視するためのプラットフォームです。」
と書かれています。
(Apache Airflow とは、Airbnb が云々・・・というのは有名のようなので省略します。)
GCP 上でバッチ処理というと、私は GCE インスタンスで実行する必要があると考えていましたが、マネージドサービスでバッチ処理ができるとは、
今更ながら、これは使える・使いたい
と思ったので、試してみました。
私は Apache Airflow についてあまり知らず、ドキュメントだけでは Cloud Composer のイメージがつかなかったので、クイックスタートを実際に行い記事にしてみました。
詳細な説明記事ではありませんが、本記事を読まれる方に Cloud Composer のイメージが伝われば幸いです。
クイックスタートをやってみた
Cloud Composer のクイックスタートは、以下の処理の実行を試すことができます。
- Creates a Cloud Dataproc cluster.
- Runs an Apache Hadoop wordcount job on the cluster, and outputs its results to Cloud Storage.
- Deletes the cluster.
- Cloud Dataproc クラスターを作成する。
- クラスターで Apache Hadoop 文字数カウントジョブを実行し、その結果を Cloud Storage に出力します。
- Cloud Dataproc クラスターを削除する。
では、クイックスタートのページを上からやっていきます。
使用するAPIを有効にする
これは簡単なので省きますが、クイックスタートで使用するCloud Composer、Cloud Dataproc、Cloud Storage のAPIを有効にします。
Cloud Composer の環境を作成する
ここでいきなり Cloud Composer の環境を作成します。設定は基本的にデフォルトとしました。以下が設定画面のイメージです。
必要な個所を入力すると、「作成」ボタンが押せるようになりますので、クリックします。
Cloud Composer の環境作成はかなり時間が掛かりますので焦らずに待ちましょう。
文字数カウントの結果を格納するバケットを作成する
名前はなんでもいいので、バケットを作成します。今回は us-central1 にリージョナルバケット、バケット名 wordcount-result を作成しました。
変数を設定する
Cloud Composer に以下の3つの変数を設定します。
キー | 値 | 本記事での設定 |
---|---|---|
gcp_project | クイックスタートを実行するプロジェクトID | GCPのproject-id |
gcs_bucket | 先ほど作成したバケットパス | gs://wordcount-result |
gce_zone | Cloud Composer を作成したリージョン | us-central1 |
設定するコマンドは以下のコマンドです。
gcloud composer environments run {Cloud Composer の環境名} --location {リージョン} variables -- --set {キー} {値}
本記事での具体的なコマンドは以下となります。
gcloud composer environments run my-environment \
--location us-central1 variables -- --set gcp_project my-project-id
gcloud composer environments run my-environment \
--location us-central1 variables -- --set gcs_bucket gs://wordcount-result
gcloud composer environments run my-environment \
--location us-central1 variables -- --set gce_zone us-central1
設定した値を確認するには、以下のコマンドを使用します。
gcloud composer environments run {Cloud Composer の環境名}
--location {リージョン} variables -- --get {キー}
ワークフローファイルを設定する
クイックスタートに記載のサンプルワークフロー quickstart.py をCloud Composer に設定します。
quickstart.py の内容については、上記引用したものと同じですが、公式ドキュメントのクイックスタートに記載されているソースの説明を抜粋しました。ここは公式ドキュメントのソースを見て確認するほうが分かり易いと思います。
- DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster.
- DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
- DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.
- DataprocClusterCreateOperator:Cloud Dataproc クラスターを作成します。
- DataProcHadoopOperator:Hadoop文字数カウントジョブを実行して結果を Cloud Storage バケットに書き込みます。
- DataprocClusterDeleteOperator:Compute Engine の料金がかからないように、Cloud Dataproc クラスターを削除します。
DAGを Cloud Storage にアップロードする
クイックスタートに記載されているサンプルワークフロー quickstart.py を Github から取得します。(取得方法は割愛します。公式ドキュメントからコピペでも取得可能です。)
ここでは、Cloud Shell を使用してアップロード方法を記載します。
- Cloud Shell を起動します。
- DAG用フォルダを作成します。(本記事では test-dag フォルダを作成しました。) mkdir test-dag/
quickstart.py を Cloud Shell にアップロードし、先ほど作成した test-dag フォルダに移動します。Cloud Shell でのファイルアップロードは以下のように簡単にできますが、ファイルはホームディレクトリにアップロードされるので、作成したフォルダに移動します。
以下のコマンドを使用し、DAGを Cloud Composer にアップロードします。
gcloud composer environments storage dags import
--environment {Cloud Composer の環境名} --location {リージョン}
--source {DAGファイルパス}
本記事では以下のコマンドを実行しました。
gcloud composer environments storage dags import
--environment my-environment --location us-central1
--source test-dag/quickstart.py
Airflow UIを開く
このクイックスタートは、Quickstart.py を Cloud Composer に設定すると、実行するようになっていますので、Cloud Composer の環境の画面にある「Airflow ウェブサーバー」の下の「Airflow」をクリックし、実行状態を確認します。
Airflow UIが開きます。以下のような感じで表示されます。
正常に終了した場合は、赤枠のようにSUCCESS(左隅の〇)が点灯します。
また、DAGをクリックすると、Tree View を表示します。
正常に完了していると、赤枠のステータスがSUCCESSとなります。マウスを緑の印に合わせるとステータスがわかります。
Graph View をクリックすると、どこかで見たことがあるジョブネットのようなイメージで表示されます。
実行結果
クイックスタートの実行結果は、作成した wordcount-result バケットに保存されています。以下がバケット内のイメージです。
作成されたファイルの一部をgsutil cat
コマンドで見てみるとこんな感じで出力されていました。
終わりに
本記事では、Cloud Composer のクイックスタートをなぞってみただけですが、バッチ処理で結構使えるイメージが湧きませんでしたか? Python を使ってバッチ処理が書けて、J(ピー)1のジョブネットのように実行監視ができるマネージドサービスは活用できる場面がたくさんあると思います。
私は、例えば BigQuery のバッチ処理や、クイックスタートのように Cloud Dataproc (Spark / Hadoop ) のバッチ処理など、日々ビッグデータを使用したデータ分析を行う企業の夜間バッチ処理に使いたいと思いました。
機会があれば次は Cloud Composer で BigQuery の処理を実行する記事を書こうと思います。
(もう既に偉大なる先人の記事があると思いますが・・・)
本記事を読んでいただいた方に Cloud Composer を知っていただき、ご活用いただけるキッカケになれば嬉しいです。お読みいただきありがとうございました。
参考サイト
Author And Source
この問題について(Google Cloud Platform Cloud Composer の公式クイックスタートをやってみた), 我々は、より多くの情報をここで見つけました https://qiita.com/AkiQ/items/237cd9e119b3c5ed83f8著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .