[VertexAI Pipeline]#2複数コンポーネントでデータを連携_文字列、数値編

16746 ワード

こんにちは! chameleonです!

前回はVertexAIでhello-worldを出力するPipelineを作成しました。
ただ、これだけでは実用的なパイプラインの作成はできません。
実用的なパイプライン作成にはコンポーネント間のデータの受渡しが必須になってきます。

KubeflowPipelineではコンポーネント間のデータの受渡しが少し独特です。
コンポーネント間で受渡すデータは下記のようなパターンがあります。

No 連携データ
1 文字列や数値
2 ファイル
3 関数等のオブジェクト

今回はNo1の文字列や数値をコンポーネント間で受渡しする処理を実行してみようと思います。

想定する読者

・VertexAI Pipelineを使ってみたい
・KubeflowPipelineのコンポーネント間のデータの受渡し方法がわからない

ゴール

2つのコンポーネントを作成し、コンポーネントの出力を次のコンポーネントが入力として受取り処理を行うパイプラインを作成します。
generate_name:苗字と名前の2つの引数を受け取って「苗字 名前」と文字列を連結するコンポーネント
show_name:引数として受け取った文字列を出力

generate_nameが作成した「苗字 名前」文字列をshow_nameが受取って出力を行うパイプラインを作成します。

構築の大まかな流れ

0.必要なライブラリの宣言
1.処理を行うコンポーネント(関数)の作成(generate_nameとshow_nameの作成)
2.パイプラインの作成
3.コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行
4.コンパイルしたパイプラインの実行

構築スタート!!!

0.必要なライブラリの宣言

まずはパイライン作成に必要なライブラリのインポート宣言を行います。
この辺りは前回と一緒ですね。

from kfp.v2.dsl import component #コンポーネント作成用
from kfp import dsl #パイプライン作成用
from kfp.v2 import compiler #コンパイラ用
import google.cloud.aiplatform as aip

1.処理を行うコンポーネント(関数)の作成

まずは、処理を行うコンポーネント(関数)を作成していきます。
@componentというデコレータを利用してメソッド定義を行います。

1.1.generate_nameの作成

1つめのコンポーネントとしてgenerate_nameを作成します。
通常のpythonメソッドと同様に処理結果をreturnで返却します。
引数として苗字と名前という2つの文字列引数を受取り、それを連結して1つの文字列として返却します。

#苗字と名前を受け取って「苗字 名前」を出力するコンポーネント
@component(base_image="python:3.7")
def generate_name(first_name:str, last_name:str) -> str:
    return f"{first_name} {last_name}"

1.2.show_nameの作成

2つめのコンポーネントとしてshow_nameを作成します。
こちらは、文字列引数を受け取って、出力として"I am 引数文字列"として出力を行うコンポーネントです。

#文字列を受け取って表示するコンポーネント
@component(base_image="python:3.7")
def show_name(var:str):
    print(f"I am {var}.")
    pass

2.パイプラインの作成

次はメインとなるパイプライン(ワークフロー)を定義します。
パイプラインの定義は@dsl.pipelineというデコレータを利用します。

まずは、手順1で作ったgenerate_nameコンポーネントに対し、引数として"chameleon"と"太郎"を渡します。
これをout1オブジェクト代入し、この処理結果out1.outputをshow_nameコンポーネントに渡します。
ここで、generate_nameコンポーネントの出力結果はoutputというアトリビュートに格納されています。
コンポーネント間で単一のデータ(文字列、数値)を受渡す際にはoutputアトリビュートに格納されているデータを連携します。

#手順2 パイプラインの作成
@dsl.pipeline(name="tuto-pipeline2")
def tuto_pipeline():
    out1 = generate_name("chameleon","太郎")
    show_name(out1.output)
    pass

3.コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行

手順3以降は前回と同様の手順となります。
上記で作ったパイプラインをコンパイルします。
手順2で作成したpipeline関数とコンパイル結果として出力するjsonファイルの出力先を指定します。

# コンパイル
# 手順3:コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行
compiler.Compiler().compile(
    pipeline_func = tuto_pipeline,
    package_path  = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v2.json",
)

4.コンパイルしたパイプラインの実行

最後にコンパイルしたjsonファイルを実行してVertexAIのpipelineサービスでワークフローを実行します。
aip.initでGCPプロジェクトのIDと処理を行いロケーションを指定して初期化を行います。

最後にパイプラインの表示名と、手順3でコンパイルしたjsonファイルのパスを指定して実行します。

# コンパイル
# 手順3:コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行
compiler.Compiler().compile(
    pipeline_func = tuto_pipeline,
    package_path  = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v2.json",
)


# パイプラインの実行
# 手順4:コンパイルしたパイプラインの実行
aip.init(
    project="awesome-height-240603",
    location="us-central1"
)

ソース全体

ソースの全体は下記のような感じです。

from kfp.v2.dsl import component #コンポーネント作成用
from kfp import dsl #パイプライン作成用
from kfp.v2 import compiler #コンパイラ用
import google.cloud.aiplatform as aip

#やりたいこと
#複数のコンポーネントを繋げて、1つのコンポーネントの出力を2つ目のコンポーネントのインプット
#として処理したい

#手順1 コンポーネント(関数)の作成

#苗字と名前を受け取って「苗字 名前」を出力するコンポーネント
@component(base_image="python:3.7")
def generate_name(first_name:str, last_name:str) -> str:
    return f"{first_name} {last_name}"

#文字列を受け取って表示するコンポーネント
@component(base_image="python:3.7")
def show_name(var:str):
    print(f"I am {var}.")
    pass

#手順2 パイプラインの作成
@dsl.pipeline(name="tuto-pipeline2")
def tuto_pipeline():
    out1 = generate_name("chameleon","太郎")
    show_name(out1.output)
    pass

# コンパイル
# 手順3:コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行
compiler.Compiler().compile(
    pipeline_func = tuto_pipeline,
    package_path  = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v2.json",
)


# パイプラインの実行
# 手順4:コンパイルしたパイプラインの実行
aip.init(
    project="awesome-height-240603",
    location="us-central1"
)

aip.PipelineJob(
    display_name = "Pipeline基礎編",
    template_path = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v2.json",
    pipeline_root = "gs://vertex_chameleon_data"
).run()

パイプラインの実行

パイプラインの実行はGCPのクラウドシェルから実行します。
GCPコンソールの右上にあるマークをクリックするとクラウドシェルが立ち上がります。

クラウドシェルで適当なディレクトリを作成し、そこに上記で作成したpythonファイルを配置します。

ちなみに、ローカルのファイルをクラウドシェル上のパスにアップロードするには、クラウドシェルターミナルの右上にあるメニューの「アップロード」から実行することができます。

そして、pythonファイルの実行を行います。

 python pipeline_v2.py

結果確認

VertexAIのパイプライン画面に移動して結果を確認してみましょう。
上記で実行したパイプラインが表示されているはずです。

対象のパイプラインをクリックすると処理結果の詳細が確認できます。

上記のように今回作成されたコンポーネント(generate_nameとshow_name)が表示されます。
generate_nameコンポーネント処理の後続としてshow_nameコンポーネントが処理されることが視覚的に表現されてます。
show_nameをクリックするとこのコンポーネントのログが下に表示されます。
フィルタで「chameleon」と絞り込みを行いことで、今回出力したログを見つけることができます。