[GCP]Cloud ShellでDataFlowをデプロイするまでの手順(Python利用)
はじめに
DataFlowのデプロイ方法がパッと見つからなかったため、また単純に動かせるプログラムが見当たらなかったため、備忘録としてまとめました。
手順
1. apache_beamインストール
Cloud Shellで以下のコマンドを実行。
sudo pip3 install apache_beam[gcp]
以下のインストール方法ではbeam.io.ReadFromText
でエラーがでるのでダメです。
sudo pip install apache_beam
仮想環境を用いたapache_beamインストール方法は以下の通りです。
# フォルダ作成
mkdir python2
cd python2
# 仮想環境作成
python -m virtualenv env
# アクティベート
source env/bin/activate
# apache-beamインストール
pip install apache-beam[gcp]
2. プログラム作成
今回は以下のような単純なものを作成しました。
指定したバケットの直下にあるread.txt
というファイルを読み込んで、write.txt
というファイルに出力するだけです。
実際に試したい方はPROJECTID
, JOB_NAME
, BUCKET_NAME
に適当な内容を入力してください。
# coding:utf-8
import apache_beam as beam
# ジョブ名、プロジェクトID、バケット名を指定
PROJECTID = '<PROJECTID>'
JOB_NAME = '<JOB_NAME>' # DataFlowのジョブ名を入力
BUCKET_NAME = '<BUCKET_NAME>'
# ジョブ名、プロジェクトID、一時ファイルの置き場を設定
options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(
beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = JOB_NAME
gcloud_options.project = PROJECTID
gcloud_options.staging_location = 'gs://{}/staging'.format(BUCKET_NAME)
gcloud_options.temp_location = 'gs://{}/tmp'.format(BUCKET_NAME)
# Workerの最大数や、マシンタイプ等を指定
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
# worker_options.disk_size_gb = 100
# worker_options.max_num_workers = 2
# worker_options.num_workers = 2
# worker_options.machine_type = 'n1-standard-8'
# worker_options.zone = 'asia-northeast1-a'
# 実行環境の切替
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner' # ローカルマシンで実行
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner' # Dataflow上で実行
# パイプライン
p = beam.Pipeline(options=options)
(p | 'read' >> beam.io.ReadFromText('gs://{}/read.txt'.format(BUCKET_NAME))
| 'write' >> beam.io.WriteToText('gs://{}/write.txt'.format(BUCKET_NAME))
)
p.run().wait_until_finish()
3. GCSの準備
- 上記プログラム内の
BUCKET_NAME
で指定したバケット名を作成してください - 作成したバケット直下に
staging
,tmp
というフォルダを作成してください - ローカルで
read.txt
というファイルを作成してください。中身はなんでも良いです - 作成したバケット直下に
read.txt
をアップロードしてください
4. ローカルで実行
まず上記プログラムの「実行環境の切替」で、以下のようにコメントアウトを切り替えます。
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner' # ローカルマシンで実行
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner' # Dataflow上で実行
次に以下のコマンドを実行します。
python gcs_readwrite.py
これでバケット内にwrite.txt-00000-of-00001
というファイルが作成されます。
5. デプロイ
まず上記プログラムの「実行環境の切替」で、以下のようにコメントアウトを切り替えます。
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner' # ローカルマシンで実行
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner' # Dataflow上で実行
次に以下のコマンドを実行します。
python gcs_readwrite.py
これでバケット内にwrite.txt-00000-of-00001
というファイルが作成されます。
DataFlowのGUIで作成したジョブを選択すると、read
とwrite
が「完了しました」になっていることがわかります。
おまけ(カスタムテンプレートの作成方法)
以下のような1行を追加して実行するだけでカスタムテンプレートが作成されます。
保存先やテンプレート名は自由に選択できます。
gcloud_options.template_location = 'gs://{}/template/template_name'.format(BUCKET_NAME)
カスタムテンプレートの利用は、
カスタムテンプレートからジョブを作成 -> テンプレートを選択 -> カスタムテンプレート -> テンプレートの GCS パスを指定
とするだけです。
参考
Python を使用したクイックスタート
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python?hl=ja
パイプラインの実行パラメータを指定する
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
Cloud Dataflow 超入門
https://qiita.com/hayatoy/items/987658490a69c7d24635
Author And Source
この問題について([GCP]Cloud ShellでDataFlowをデプロイするまでの手順(Python利用)), 我々は、より多くの情報をここで見つけました https://qiita.com/hanzawak/items/6e29e91b4ff6aa9d6108著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .