Cloud Pub/Sub の Push サブスクリプションの動きを試してみる


はじめに

お題の通り Cloud Pub/Sub の Push サブスクリプションの様々なケースの挙動が気になったので、主に正常系と異常系のテーマに分けて整理してみました。

登場人物

  • Publisher

    • メッセージを生成し Topic にメッセージを送信
  • Topic

    • メッセージキュー
  • Subscription

    • Topic に公開されたメッセージを受信し Subscriber にメッセージを配信
  • Subscriber

    • Subscription からメッセージを受け取って ACK もしくは NACK を返却

試してみる

Subscriber は Cloud Function(Go) を利用。

正常系

1. Subscriber から肯定応答 (ACK) が返してくる

package main

import (
    "context"

    "cloud.google.com/go/pubsub"
)

func ReceiveMessage(ctx context.Context, m *pubsub.Message) error {
	return nil
}
# 最初の試行
2022-03-27 18:33:23.857 JSTxxx Function execution started
2022-03-27 18:33:23.863 JSTxxx Function execution took 8 ms, finished with status: 'ok'

異常系

1. Subscriber から否定応答 (NACK) が返してくる

a. Subscriber の再試行が無効になっている

  • Subscription
    • Ack Deadline: 20秒
    • Retry Policy
      • Count: 5回
      • Backoff: 1秒 ~ 30秒
  • Subscriber
    • Timeout: 30秒
    • Failure Policy: 無効
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"golang.org/x/xerrors"
)

func ReceiveMessage(ctx context.Context, m *pubsub.Message) error {
	return xerrors.New("boom")
}
# 最初の試行
2022-03-27 18:43:30.825 JSTxxx Function execution started
2022-03-27 18:43:30.829 JSTxxx Function error: boom
2022-03-27 18:43:30.831 JSTxxx Function execution took 7 ms, finished with status: 'error'

b. Subscriber の再試行が有効になっている

  • Subscription
    • Ack Deadline: 20秒
    • Retry Policy
      • Count: 5回
      • Backoff: 1秒 ~ 30秒
  • Subscriber
    • Timeout: 30秒
    • Failure Policy: 有効
# 最初の試行
2022-03-27 18:56:19.207 JSTxxx Function execution started
2022-03-27 18:56:19.375 JSTxxx Function error: boom
2022-03-27 18:56:19.377 JSTxxx Function execution took 172 ms, finished with status: 'error'
# 再試行
2022-03-27 18:56:22.502 JSTxxx Function execution started
2022-03-27 18:56:22.505 JSTxxx Function error: boom
2022-03-27 18:56:22.506 JSTxxx Function execution took 5 ms, finished with status: 'error'
2022-03-27 18:56:25.211 JSTxxx Function execution started
2022-03-27 18:56:25.249 JSTxxx Function error: boom
2022-03-27 18:56:25.250 JSTxxx Function execution took 41 ms, finished with status: 'error'
2022-03-27 18:56:27.874 JSTxxx Function execution started
2022-03-27 18:56:27.950 JSTxxx Function error: boom
2022-03-27 18:56:27.951 JSTxxx Function execution took 78 ms, finished with status: 'error'
2022-03-27 18:56:30.878 JSTxxx Function execution started
2022-03-27 18:56:30.879 JSTxxx Function error: boom
2022-03-27 18:56:30.880 JSTxxx Function execution took 2 ms, finished with status: 'error'

全ての再試行が失敗したら Subscription リソースに設定した Dead-letter Topic に該当するメッセージが渡されます。

2. Subscriber がタイムアウトされる

a. Subscriber の再試行が無効になっている

  • Subscription
    • Ack Deadline: 20秒
    • Retry Policy
      • Count: 5回
      • Backoff: 1秒 ~ 30秒
  • Subscriber
    • Timeout: 10秒
    • Failure Policy: 無効
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

func ReceiveMessage(ctx context.Context, m *pubsub.Message) error {
	time.Sleep(time.Second * 15)

	return nil
}
# 最初の試行
2022-03-27 19:05:41.164 JSTxxx Function execution started
2022-03-27 19:05:51.165 JSTxxx Function execution took 10004 ms, finished with status: 'timeout'

b. Subscriber の再試行が有効になっている

  • Subscription
    • Ack Deadline: 20秒
    • Retry Policy
      • Count: 5回
      • Backoff: 1秒 ~ 30秒
  • Subscriber
    • Timeout: 10秒
    • Failure Policy: 有効
# 最初の試行
2022-03-27 19:09:51.339 JSTxxx Function execution started
2022-03-27 19:10:01.342 JSTxxx Function execution took 10005 ms, finished with status: 'timeout'
# 再試行
2022-03-27 19:10:03.931 JSTxxx Function execution started
2022-03-27 19:10:13.933 JSTxxx Function execution took 10004 ms, finished with status: 'timeout'
2022-03-27 19:10:16.549 JSTxxx Function execution started
2022-03-27 19:10:26.552 JSTxxx Function execution took 10004 ms, finished with status: 'timeout'
2022-03-27 19:10:29.216 JSTxxx Function execution started
2022-03-27 19:10:39.217 JSTxxx Function execution took 10003 ms, finished with status: 'timeout'
2022-03-27 19:10:42.785 JSTxxx Function execution started
2022-03-27 19:10:52.787 JSTxxx Function execution took 10003 ms, finished with status: 'timeout'

3. Subscriber の実行時間より Subscription の確認応答期限 (Ack Deadline) が短い

  • Subscription
    • Ack Deadline: 10秒
    • Retry Policy
      • Count: 5回
      • Backoff: 1秒 ~ 30秒
  • Subscriber
    • Timeout: 30秒
    • Failure Policy: 有効
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

func ReceiveMessage(ctx context.Context, m *pubsub.Message) error {
	time.Sleep(time.Second * 15)

	return nil
}
# 最初の試行
2022-03-27 19:19:16.624 JSTxxx Function execution started
# 再試行
2022-03-27 19:19:29.009 JSTxxx Function execution started
2022-03-27 19:19:31.630 JSTxxx Function execution took 15007 ms, finished with status: 'ok'
2022-03-27 19:19:41.375 JSTxxx Function execution started
2022-03-27 19:19:44.087 JSTxxx Function execution took 15079 ms, finished with status: 'ok'
2022-03-27 19:19:53.790 JSTxxx Function execution started
2022-03-27 19:19:56.377 JSTxxx Function execution took 15003 ms, finished with status: 'ok'
2022-03-27 19:20:07.109 JSTxxx Function execution started
2022-03-27 19:20:08.793 JSTxxx Function execution took 15004 ms, finished with status: 'ok'
2022-03-27 19:20:22.112 JSTxxx Function execution took 15004 ms, finished with status: 'ok'

Subscriber が肯定応答 (ACK) を返しても確認応答期限 (Ack Deadline) がそれより短いので再試行が実施されてしまい、全ての再試行が失敗したら Subscription リソースに設定した Dead-letter Topic に該当するメッセージが渡されます。

つまり、成功したメッセージを失敗したメッセージとして扱われることになるので、確認応答期限 (Ack Deadline) は Subscriber の実行時間よりもっと余裕を持って設定した方が正しいと思います。

その他

Subscriber の再試行が有効になっている場合は冪等性が大事

当たり前のことなんですが、Subscriber 内部で外部データベースへの書き込みやデータのコピーなどを行う実装が含まれている場合は再実行されても冪等性が担保されるように工夫するのが必要になると思います。

https://cloud.google.com/functions/docs/bestpractices/retries#make_retryable_event-driven_functions_idempotent

同一なメッセージなのかを判定するにはメッセージIDが使えます。

メッセージID とメッセージの発行時間を取得する

m *pubsub.Message から直接参照することはできませんが、Cloud Function のメタデータからは参照できます。

https://cloud.google.com/functions/docs/calling/pubsub#event_structure

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/functions/metadata"
	"cloud.google.com/go/pubsub"
)

func ReceiveMessage(ctx context.Context, m *pubsub.Message) error {
	meta, err := metadata.FromContext(ctx)
	if err != nil {
		return fmt.Errorf("metadata.FromContext: %v", err)
	}
	log.Printf("cloud functions metadata: %v", meta)

	return nil
}
2022-03-27 20:32:59.831 JSTxxx Function execution started
2022-03-27 20:32:59.839 JSTxxx 2022/03/27 11:32:59 cloud functions metadata: &{3954231714441720 2022-03-27 11:32:58.535 +0000 UTC google.pubsub.topic.publish 0xc0003b45c0}
2022-03-27 20:32:59.841 JSTxxx Function execution took 12 ms, finished with status: 'ok'

メッセージID が 3954231714441720 で、メッセージの発行時間が 2022-03-27 11:32:58.535 +0000 UTC です。

メッセージID は、同一のメッセージが重複で配信されるケースで活用できるデータとして、外部のデータベース(例、Memorystore もしくは Firestore)にメッセージID を格納し既に成功されているかどうかが判定できる流れを実現できます。

メッセージの発行時間は、主にデータの性質と絡んでいると思いますが、一定時間が経ったメッセージに対しては有効なデータではないっていうようなドメイン独自のルールとかがあるケースで活用できると思います。

At-least-once 配信から Exactly-once 配信に切り替える

まだ Preview 機能なんですが、1回限りの配信を保証してくれるオプションもあります。そこそこの規模じゃないと At-least-once 配信でもメッセージが重複して配信される可能性は低いと思いますが、センシティブな機能と紐づいているとかプロジェクトのドメイン(例、金融)が完璧を求めるケースなどであれば有用な機能だと思われます。

https://cloud.google.com/pubsub/docs/exactly-once-delivery

  • gcloud
$ gcloud beta pubsub subscriptions update SUBSCRIPTION \
    --enable-exactly-once-delivery
  • Terraform

まだサポートされていないですが、PR が上がっているので近いうちに利用できそうです。

https://github.com/hashicorp/terraform-provider-google/issues/11286