【GCP】Google Pub/Sub の送受信確認(gcloud コマンド & python3)


GCP の Cloud Pub/Sub を使い方を、gcloud コマンドとpython で確認できたのでメモします。自分の理解を図にしたものです。

以前に同じpubsub モデルの枠組みとしてMQTTを利用したことがありますが、そのときはtopic以外に意識はしなかったのですが、ここではsubscription というモジュールを通してより細かい?制御ができるようです。また、subscriber が取得していない message は補間できるなど、思っていた以上の機能があるようです。

概要

以下に、今回理解した内容のテストの実行手順を示します。

  1. GCP に topic と subscription を作成する。
  2. メッセージを topic に publish する。
  3. subscription から pull する。
  4. (for python) publish/subscriberするための権限を証明するkey file(json)を用意します。
  5. publisher, subscriber のプログラムを各ターミナルで実行

環境は Ubuntu (WSL)、power shell でそれぞれ確認しました。ソースは以下にあります。

LINK: github/xtkd77/test-docker-python/samples/gcp-pubsub-python-20200112

少しだけCloud pubsub のメモ

Subscription には pushとpullの2つのタイプが用意されています(説明)。Pull メッセージを任意のタイミングに発行して取りに行くことができ、Polling でも実装ができるようです

。実際にpython で実装するときは、message が届いたときに発行されるcallback 関数があり、そこに処理を実装するようにAPIが設計されているので、polling をベタに実装する必要はありません。

1. Topic と Subscription の用意

Topic を作り、そのtopic を購読するための subscription を作成します。

$ export TOPIC_ID="topic0000"
$ export SUBSCRIPTION_ID="subsc0000"
$ gcloud pubsub topics create ${TOPIC_ID}
$ gcloud pubsub subscriptions create ${SUBSCRIPTION_ID} --topic=${TOPIC_ID}

作られたものを確認します。pushConfig が無いsubscriptionがpull 型というのかな?

$ gcloud pubsub subscriptions list
---
ackDeadlineSeconds: 10
expirationPolicy:
  ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/my-project-id/subscriptions/subsc0000
pushConfig: {}
topic: projects/my-project-id/topics/topic0000

$ gcloud pubsub topics list
---
name: projects/my-project-id/topics/topic0000

2. publish する

puslish コマンドを実行すると、Message IDが返ってきます。

$ gcloud pubsub topics publish ${TOPIC_ID} --message "Hello World!"
messageIds:
- '926358110170555'

また、console.cloud.google.com の Pub/Sub のtopic のページからこの topic のページを開き、そこからメッセージを発行することもできます。

json をpublish (追記)

ファイル(json とかを保存したもの)の内容をそのままpublish したいとき、以下のコマンドを使っています。

$ gcloud pubsub toipcs publish ${topic_id} --message "`cat myfile.json`"

3. subscribe する

$ gcloud pubsub subscriptions pull --auto-ack ${SUBSCRIPTION_ID}
┌──────────────┬─────────────────┬────────────┐
│     DATA     │    MESSAGE_ID   │ ATTRIBUTES │
├──────────────┼─────────────────┼────────────┤
│ Hello World! │ 926358110170555 │            │
└──────────────┴─────────────────┴────────────┘

ここまでは、google SDK で認証が通っているので、できました。

また、console.cloud.google.com の Pub/Sub のsubscriptionのページからこの subscription のページを開き、そこからメッセージをtopic に投稿されたメッセージをpull することもできます。「メッセージを表示」をクリックして出てきた画面で「pull」という青いボタンを押すと、たまっていたメッセージがpull されます。

4. pub and/or sub の権限のあるサービスアカウントのキーを用意

python で実行するとき、publish する(topic を発行する)、subscribe する (subscriptionにアクセスする)ために、認証が必要になります。python のAPIを利用する場合、それはpublisher や subscriber のインスタンスを作成するときに必要になります。

ここでは素直に console.cloud.google.com で行いました。左側のメニューで「IAMと管理」のサブ項目にある「サービスアカウント」を選択します。ここで「+サービスアカウントを作成」を選択します。

あとは流れに沿って進め、PubSub について必要な権限を付与します。
キーを発行できるので、json ファイルをダウンロードします。
ファイル名は任意で構いません。なお、default では環境変数に設定してあるキーファイルを読むようなので、プログラムで明示的にkeyファイルを指定しない場合は、環境変数を設定します。(無くても良いです。)

$ export GOOGLE_APPLICATION_CREDENTIALS="./myproject-mykey.json"

5. python application でpublish する

開発上は、python の仮想環境を用意してやるのがマナーのようです。

python モジュール google-cloud-pubsub がインストールされていれば、以下のプログラムを実行できると思います。

pub.py
import sys, os, time, datetime
from google.cloud import pubsub_v1


project_id, topic_name = sys.argv[1], sys.argv[2]
cred_file = sys.argv[3] 

publisher = pubsub_v1.publisher.Client.from_service_account_file(cred_file)
topic_path = publisher.topic_path(project_id, topic_name)

cnt = 0
while True:
    data = u"Message from test publisher {}".format(cnt) + datetime.datetime.now().isoformat(" ")
    data = data.encode("utf-8")
    print("Publish: " + data.decode("utf-8", "ignore") )
    future = publisher.publish(topic_path, data=data)
    print("return ", future.result())
    time.sleep(0.25)
    cnt = cnt + 1

もし環境変数にkeyファイルが指定してあれば、publisher のinstance は以下で作られます。

publisher = pubsub_v1.PublisherClient()

使い方は、

$ python3 pub.py my-project-id my-topic my-key.json

6. python で subscribe したメッセージを表示する

以下のプログラムを使います。

sub.py
import os, sys, time, datetime
from google.cloud import pubsub_v1

def callback(message):
    now = datetime.datetime.now()
    print( "msg = \"" + message.data.decode("utf-8") + "\"" +  "  [" + now.isoformat(" ") + "]")
    message.ack()

project_id, sub_name, cred_file = sys.argv[1],  sys.argv[2], sys.argv[3]

subscriber = pubsub_v1.subscriber.Client.from_service_account_file(cred_file)

subpath = subscriber.subscription_path(project_id, sub_name)
flow_control = pubsub_v1.types.FlowControl(max_messages=2)

subscriber.subscribe(subpath, callback=callback, flow_control = flow_control)
input()

使い方は、

$ python3 sub.py my-project-id my-subscription my-key.json

実際の動かすと、publish したときのシステムの時刻とsub.py でmessage を受信したときの時刻のずれを読めます。

msg = "Message from test publisher 262020-01-12 13:19:32.691757"  [2020-01-12 13:19:32.793407]
msg = "Message from test publisher 272020-01-12 13:19:33.009596"  [2020-01-12 13:19:33.090725]
msg = "Message from test publisher 282020-01-12 13:19:33.285470"  [2020-01-12 13:19:33.335753]
msg = "Message from test publisher 292020-01-12 13:19:33.566437"  [2020-01-12 13:19:33.619432]
msg = "Message from test publisher 302020-01-12 13:19:33.845906"  [2020-01-12 13:19:33.896490

References

Google 提供の情報

参考にさせていただいたブログ

雑感

今回、subscription のpull/push のタイプの違いがあるところや、認証が必要なところでハマりました。涙
使い方は分かったのですが、これをDocker, cloud build, Kubernetes で動かすとすると先が長いな。。。真面目に本稿を書いて疲れた。とは言え、実際に多対多でたくさんの種類のメッセージをやりとりするテストをしていないので、使う前にこれらもテストはしてみたいと思います。