GCP:Pub/SubからCloud Functions、Cloud Functions からPub/Sub、を繰り返す


以下今回のアーキテクチャイメージです。

用意するモノ

・Pub/SubでTopic2つ
 ┗「topic_1」と「topic_2」という名前とする

・Cloud Scheduler
 ┗トピックに「topic_1」を設定し、ペイロードは「hello」とする

・Cloud FunctionsでFunctionを2つ
 ┗「function_1」と「function_2」という名前とする
  「function_1」のトリガーはPub/Subの「topic_1」を設定する
  「function_2」のトリガーはPub/Subの「topic_2」を設定する

function_1の中身

以下の「event_message」ではtopic_1を経由したCloud Schedulerで設定したペイロード「hello」とかの文字列が格納されている。

N個のCloud Schedulerをペイロードだけ変えてほか同じ設定にすると、function_1 内で event_message を判定させて後続処理をごにょごにょできる

def main(event, context):
    event_message = base64.b64decode(event['data']).decode('utf-8')

次に、function_2にリストを渡すと仮定する
Pub/Subではテキストのみが渡せるのでencodeさせる必要がある

from google.cloud import pubsub_v1

PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()

topic_id = "topic_2" # 次にパスするトピックを設定

topic_path = client.topic_path(PROJECT_ID, topic_id)

pub_text = ["りんご", "ゴリラ", "ラッパ"]

data = pub_text.encode() # ここでエンコード
client.publish(topic_path, data=data) # これでtopic_2にpub_textがpushされる

上記の最終行でtopic_2へ'["りんご", "ゴリラ", "ラッパ"]'が渡されてfunction_2が発火している。

function_2の中身

以下 event_message では'["りんご", "ゴリラ", "ラッパ"]'が入っているのでevalしてpythonのリストに戻す。
後続処理はよしなに

def main(event, context):
    event_message = base64.b64decode(event['data']).decode('utf-8')
    fruit_lst = eval(event_message)

function_2を並列起動させてみる

Cloud Functionsは以下公式ドキュメントの通り、並列起動ができる
https://cloud.google.com/functions/quotas?hl=ja#scalability

そのためfunction_1を以下のようにループさせながらPub/Subへpublishさせると、function_2では3つのフルーツを取得することになる

function_1.py
from google.cloud import pubsub_v1

PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()

topic_id = "topic_2"

topic_path = client.topic_path(PROJECT_ID, topic_id)

fruit_lst = ["りんご", "ゴリラ", "ラッパ"]

for fruit in fruit_lst:
    data = fruit.encode()
    client.publish(topic_path, data=data)

参考文献

https://cloud.google.com/solutions/streaming-data-from-cloud-storage-into-bigquery-using-cloud-functions?hl=ja
https://cloud.google.com/functions/quotas?hl=ja#scalability

余談

list や dict も encode() して Pub/Sub へ渡して受け取った側で eval すれば元通りなので、複数の軽い処理を定期的に実行する、という場合に便利でした。