[GCP]Cloud ShellでDataFlowをデプロイするまでの手順(Python利用)


はじめに

DataFlowのデプロイ方法がパッと見つからなかったため、また単純に動かせるプログラムが見当たらなかったため、備忘録としてまとめました。

手順

1. apache_beamインストール

Cloud Shellで以下のコマンドを実行。

sudo pip3 install apache_beam[gcp]

以下のインストール方法ではbeam.io.ReadFromTextでエラーがでるのでダメです。

sudo pip install apache_beam

仮想環境を用いたapache_beamインストール方法は以下の通りです。

# フォルダ作成
mkdir python2
cd python2

# 仮想環境作成
python -m virtualenv env

# アクティベート
source env/bin/activate

# apache-beamインストール
pip install apache-beam[gcp]

2. プログラム作成

今回は以下のような単純なものを作成しました。
指定したバケットの直下にあるread.txtというファイルを読み込んで、write.txtというファイルに出力するだけです。

実際に試したい方はPROJECTID, JOB_NAME, BUCKET_NAME に適当な内容を入力してください。

gcs_readwrite.py
# coding:utf-8
import apache_beam as beam

# ジョブ名、プロジェクトID、バケット名を指定
PROJECTID = '<PROJECTID>'
JOB_NAME = '<JOB_NAME>'  # DataFlowのジョブ名を入力
BUCKET_NAME = '<BUCKET_NAME>'

# ジョブ名、プロジェクトID、一時ファイルの置き場を設定
options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(
    beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = JOB_NAME
gcloud_options.project = PROJECTID
gcloud_options.staging_location = 'gs://{}/staging'.format(BUCKET_NAME)
gcloud_options.temp_location = 'gs://{}/tmp'.format(BUCKET_NAME)

# Workerの最大数や、マシンタイプ等を指定
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
# worker_options.disk_size_gb = 100
# worker_options.max_num_workers = 2
# worker_options.num_workers = 2
# worker_options.machine_type = 'n1-standard-8'
# worker_options.zone = 'asia-northeast1-a'

# 実行環境の切替
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # ローカルマシンで実行
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Dataflow上で実行

# パイプライン
p = beam.Pipeline(options=options)

(p | 'read' >> beam.io.ReadFromText('gs://{}/read.txt'.format(BUCKET_NAME))
    | 'write' >> beam.io.WriteToText('gs://{}/write.txt'.format(BUCKET_NAME))
 )
p.run().wait_until_finish()

3. GCSの準備

  1. 上記プログラム内のBUCKET_NAMEで指定したバケット名を作成してください
  2. 作成したバケット直下にstaging, tmp というフォルダを作成してください
  3. ローカルでread.txtというファイルを作成してください。中身はなんでも良いです
  4. 作成したバケット直下にread.txtをアップロードしてください

4. ローカルで実行

まず上記プログラムの「実行環境の切替」で、以下のようにコメントアウトを切り替えます。

options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # ローカルマシンで実行
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Dataflow上で実行

次に以下のコマンドを実行します。

python gcs_readwrite.py

これでバケット内にwrite.txt-00000-of-00001というファイルが作成されます。

5. デプロイ

まず上記プログラムの「実行環境の切替」で、以下のようにコメントアウトを切り替えます。

# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # ローカルマシンで実行
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Dataflow上で実行

次に以下のコマンドを実行します。

python gcs_readwrite.py

これでバケット内にwrite.txt-00000-of-00001というファイルが作成されます。
DataFlowのGUIで作成したジョブを選択すると、readwriteが「完了しました」になっていることがわかります。

おまけ(カスタムテンプレートの作成方法)

以下のような1行を追加して実行するだけでカスタムテンプレートが作成されます。
保存先やテンプレート名は自由に選択できます。

gcloud_options.template_location = 'gs://{}/template/template_name'.format(BUCKET_NAME)

カスタムテンプレートの利用は、
カスタムテンプレートからジョブを作成 -> テンプレートを選択 -> カスタムテンプレート -> テンプレートの GCS パスを指定
とするだけです。

参考

Python を使用したクイックスタート
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python?hl=ja

パイプラインの実行パラメータを指定する
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params

Cloud Dataflow 超入門
https://qiita.com/hayatoy/items/987658490a69c7d24635