ParallelizationFactorによる並列化でKinesisのレコードはLambdaにどう渡されるのか


Lambdaはこのアップデートで、ParallelizationFactor がサポートされ、ストリームソースの1つのシャードを複数のLambda呼び出しで並列処理できるようになりました。

並列化した場合に関数にはどのようにレコードが渡ってくるのか、気になる点を確認しました。

ParallelizationFactor の意義

Kinesis Data Streamsではシャード毎にレコードに振ったシーケンス番号でレコードの入力と出力の順序を保証しているため、コンシューマで、あるレコードの処理が完了せず停滞した場合、次以降のレコードはその分遅延します。

この遅延量は、Kinesisでは GetRecords.IteratorAgeMilliseconds メトリクスで1、Lambda側では IteratorAge メトリクス2で監視でき、遅延測定のポイントが違いますが、Kinesisを使う時は両方のメトリクスの増加に気をつける必要があります。

遅延はストリームのデータの供給速度より消費速度が遅ければ、徐々に蓄積していくことになります。
これまで、Kinesis Client LibraryでもLambdaでも、基本的に1つのシャードを1つのコンシューマが担当してシリアルに処理するため、シャード数を増やして遅延対策をする他ありませんでした。

しかし、Lambdaに ParallelizationFactor オプションが追加され、1つのシャードに対して、Lambda呼び出しを1〜10並列実行でき、消費側のスループットだけをスケールさせられるようになりました。
これは IteratorAge が高くなりがちなワークロードに対して非常に有効な対策となります。

実験環境

下記のように、コンシューマ側で遅延を生じる環境を作成して、DynamoDBに記録された処理結果と、CloudWatchのメトリクスを見ていきます。

  • Kinesis Data Streams
    • シャード数: 1
  • Producer
    • producer/put_records.rb をPCで実行する
    • put_records.rb
    • 15 をパーティションキーとして、各パーティションキーごとに1始まりの整数をデータとするレコードを1秒毎に30回送信する
  • Consumer
    • consumer/lambda_function.rb をトリガをKinesisとして、Lambdaで実行する
    • lambda_function.rb
    • 1呼び出しごとに、DynamoDBに受信したレコードと処理時刻を記録する
    • 1レコードの処理ごとに3秒スリープする

環境構築はこちらのterraformで行いました。

並列なし(ParallelizationFactor = 1)の場合

まず、並列なしで実行した結果です。
以下は関数から処理開始時刻、終了時刻、イベントで渡されたレコードの {パーティションキー}-{データ番号} をDynamoDBに記録したものと、CloudWatchのメトリクスです。
呼び出しは計4回シリアルに行われ、最後の呼び出し時点ではLambdaのIteratorAgeは300秒の遅延となり、トータル450秒かかりました。

$ aws dynamodb scan --table-name $TABLENAME | jq '.Items | map({start: .start.S, end: .end.S, records: [.records.L[].S] | join(",")}) | sort_by(.start)'
[
  {
    "start": "2020-01-14 06:01:10 +0000",
    "end": "2020-01-14 06:01:25 +0000",
    "records": "1-1,2-1,3-1,4-1,5-1"
  },
  {
    "start": "2020-01-14 06:01:25 +0000",
    "end": "2020-01-14 06:03:55 +0000",
    "records": "1-2,2-2,3-2,4-2,5-2,1-3,2-3,3-3,4-3,5-3,1-4,2-4,3-4,4-4,5-4,1-5,2-5,3-5,4-5,5-5,1-6,2-6,3-6,4-6,5-6,1-7,2-7,3-7,4-7,5-7,1-8,2-8,3-8,4-8,5-8,1-9,2-9,3-9,4-9,5-9,1-10,2-10,3-10,4-10,5-10,1-11,2-11,3-11,4-11,5-11"
  },
  {
    "start": "2020-01-14 06:03:55 +0000",
    "end": "2020-01-14 06:06:25 +0000",
    "records": "1-12,2-12,3-12,4-12,5-12,1-13,2-13,3-13,4-13,5-13,1-14,2-14,3-14,4-14,5-14,1-15,2-15,3-15,4-15,5-15,1-16,2-16,3-16,4-16,5-16,1-17,2-17,3-17,4-17,5-17,1-18,2-18,3-18,4-18,5-18,1-19,2-19,3-19,4-19,5-19,1-20,2-20,3-20,4-20,5-20,1-21,2-21,3-21,4-21,5-21"
  },
  {
    "start": "2020-01-14 06:06:25 +0000",
    "end": "2020-01-14 06:08:40 +0000",
    "records": "1-22,2-22,3-22,4-22,5-22,1-23,2-23,3-23,4-23,5-23,1-24,2-24,3-24,4-24,5-24,1-25,2-25,3-25,4-25,5-25,1-26,2-26,3-26,4-26,5-26,1-27,2-27,3-27,4-27,5-27,1-28,2-28,3-28,4-28,5-28,1-29,2-29,3-29,4-29,5-29,1-30,2-30,3-30,4-30,5-30"
  }
]

re:invent2019の A serverless journey: AWS Lambda under the hood (SVS405-R1)で説明されていますが、Lambdaのストリームソースの処理の内部では、Stream Tracker が、Leasing Service を通じて、起動しているシャード数に従って Poller をアレンジし、Poller がシャードをサブスクライブをしてGetRecordsを行っています。そしてLambdaの関数はPollerから呼び出されます。

ここで、Kinesisの GetRecords.IteratorAgeMilliSeconds に遅延がないのは、PollerのGetRecordsの実行には遅延がないためです。
GetRecords.IteratorAgeMilliSeconds は、タイムアウトなどLambdaの呼び出しがエラーになったことで、GetRecordsが遡って行われる場合に増加します。

ParallelizationFactor = 3の場合

次にParallelizationFactorを3にして実行します。変更はコンソールか、CLIで下記のように設定できます。

$ aws lambda update-event-source-mapping --uuid $TRIGGER_UUID --parallelization-factor 3

結果はこのようになりました。
Lambdaの呼び出しは計10回で、トータルは180秒で完了しました。IteratorAgeの最大も67秒に短縮できています。

各呼び出しで渡されたレコードを見ると、パーティションキー 1, 4 のレコード、3のレコード、2, 5のレコードのグループに分けられています。
そして呼び出しのタイムスタンプを見ると、各グループの処理は並列に呼び出されているけれども、グループごとにシリアルに実行されています。
つまり、並列化してもパーティションキー毎のレコード順序は保証されます。

こちらでも説明されていますが、PollerはLambdaのParallelizationFactorの設定をみて、フロントエンドの起動数をスケールします。
このときスケールするのはBatcherで、パーティションキー毎に割り振られるBatcherは一意に決まることで順序を維持しているようです。

[
  {
    "start": "2020-01-14 06:23:05 +0000",
    "end": "2020-01-14 06:23:17 +0000",
    "records": "1-1,4-1,1-2,4-2"
  },
  {
    "start": "2020-01-14 06:23:05 +0000",
    "end": "2020-01-14 06:23:11 +0000",
    "records": "3-1,3-2"
  },
  {
    "start": "2020-01-14 06:23:05 +0000",
    "end": "2020-01-14 06:23:17 +0000",
    "records": "2-1,5-1,2-2,5-2"
  },
  {
    "start": "2020-01-14 06:23:11 +0000",
    "end": "2020-01-14 06:23:29 +0000",
    "records": "3-3,3-4,3-5,3-6,3-7,3-8"
  },
  {
    "start": "2020-01-14 06:23:17 +0000",
    "end": "2020-01-14 06:24:23 +0000",
    "records": "2-3,5-3,2-4,5-4,2-5,5-5,2-6,5-6,2-7,5-7,2-8,5-8,2-9,5-9,2-10,5-10,2-11,5-11,2-12,5-12,2-13,5-13"
  },
  {
    "start": "2020-01-14 06:23:17 +0000",
    "end": "2020-01-14 06:24:23 +0000",
    "records": "1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7,1-8,4-8,1-9,4-9,1-10,4-10,1-11,4-11,1-12,4-12,1-13,4-13"
  },
  {
    "start": "2020-01-14 06:23:29 +0000",
    "end": "2020-01-14 06:24:17 +0000",
    "records": "3-9,3-10,3-11,3-12,3-13,3-14,3-15,3-16,3-17,3-18,3-19,3-20,3-21,3-22,3-23,3-24"
  },
  {
    "start": "2020-01-14 06:24:17 +0000",
    "end": "2020-01-14 06:24:35 +0000",
    "records": "3-25,3-26,3-27,3-28,3-29,3-30"
  },
  {
    "start": "2020-01-14 06:24:23 +0000",
    "end": "2020-01-14 06:26:05 +0000",
    "records": "2-14,5-14,2-15,5-15,2-16,5-16,2-17,5-17,2-18,5-18,2-19,5-19,2-20,5-20,2-21,5-21,2-22,5-22,2-23,5-23,2-24,5-24,2-25,5-25,2-26,5-26,2-27,5-27,2-28,5-28,2-29,5-29,2-30,5-30"
  },
  {
    "start": "2020-01-14 06:24:23 +0000",
    "end": "2020-01-14 06:26:05 +0000",
    "records": "1-14,4-14,1-15,4-15,1-16,4-16,1-17,4-17,1-18,4-18,1-19,4-19,1-20,4-20,1-21,4-21,1-22,4-22,1-23,4-23,1-24,4-24,1-25,4-25,1-26,4-26,1-27,4-27,1-28,4-28,1-29,4-29,1-30,4-30"
  }
]

呼び出しのタイミングのイメージです。

エラーハンドリング

並列化した場合も、パーティションキー毎に担当されるBatcherは固定されていることがわかりました。では、あるレコードでエラーが発生した場合に、同じBatcherに割り振られた他のパーティションキーのレコードの処理はどうなるのでしょうか。
意図的にパーティションキー 1 のデータ 5 の処理でエラー終了するように関数を変更して実行しました。

[
  {
    "id": "dd0433d1-d3ea-45ad-b90f-38571fae3682@2020-01-15 06:30:21 +0000",
    "start": "2020-01-15 06:30:21 +0000",
    "end": "2020-01-15 06:30:27 +0000",
    "records": "2-1,5-1"
  },
  {
    "id": "c50dd83a-6f7e-4bb0-99dd-373f3514d41a@2020-01-15 06:30:21 +0000",
    "start": "2020-01-15 06:30:21 +0000",
    "end": "2020-01-15 06:30:24 +0000",
    "records": "3-1"
  },
  {
    "id": "22bbbc6c-393a-4cb4-9270-a32fab209970@2020-01-15 06:30:21 +0000",
    "start": "2020-01-15 06:30:21 +0000",
    "end": "2020-01-15 06:30:27 +0000",
    "records": "1-1,4-1"
  },

  ()

  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:30:27 +0000",
    "end": "2020-01-15 06:31:03 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },

  ()

  {
    "aws_request_id": "a966ff27-8569-4fbf-8594-adcf7947683f",
    "start": "2020-01-15 06:31:00 +0000",
    "end": "2020-01-15 06:31:51 +0000",
    "records": "3-14,3-15,3-16,3-17,3-18,3-19,3-20,3-21,3-22,3-23,3-24,3-25,3-26,3-27,3-28,3-29,3-30"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:31:03 +0000",
    "end": "2020-01-15 06:31:39 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "c43dbc93-16b1-4666-b565-2df99f7f68eb",
    "start": "2020-01-15 06:31:03 +0000",
    "end": "2020-01-15 06:33:21 +0000",
    "records": "2-8,5-8,2-9,5-9,2-10,5-10,2-11,5-11,2-12,5-12,2-13,5-13,2-14,5-14,2-15,5-15,2-16,5-16,2-17,5-17,2-18,5-18,2-19,5-19,2-20,5-20,2-21,5-21,2-22,5-22,2-23,5-23,2-24,5-24,2-25,5-25,2-26,5-26,2-27,5-27,2-28,5-28,2-29,5-29,2-30,5-30"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:31:40 +0000",
    "end": "2020-01-15 06:32:16 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:32:17 +0000",
    "end": "2020-01-15 06:32:53 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:32:55 +0000",
    "end": "2020-01-15 06:33:31 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:33:34 +0000",
    "end": "2020-01-15 06:34:10 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:34:17 +0000",
    "end": "2020-01-15 06:34:53 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  }
]

結果、1-5 レコードを含むLambdaの呼び出し 1ec49225-cb26-444f-b2a4-a7c7bc2eb71f はエラーとなり、他のBatcherの呼び出しが完了した後もリトライを繰り返しています。
そして、パーティションキー1, 4のレコードはこのバッチ移行処理が進まなくなり、IteratorAgeはどんどん増加しています。これは一番避けるべき事態です。

対策として、Lambdaのイベントソースマッピングのストリームソースのオプションには、エラーのリカバリのためのサポートがあるので、これを利用すべきです。

BisectBatchOnFunctionError

今回のエラーは意図的に発生させているため回復の見込みがないですが、タイムアウトの場合、処理対象のデータを減らすことは有効です。
BisectBatchOnFunctionError を有効にすると、エラーになったバッチを2分割してLambdaの呼び出しが行われるので、制限時間内に処理を完了できる可能性が高められます。

MaximumRecordAgeInSeconds, MaximumRetryAttempts と DestinationConfig

エラーによるリトライは MaximumRecordAgeInSeconds または MaximumRetryAttempts を満了するまで続きます。
1ec49225-cb26-444f-b2a4-a7c7bc2eb71f の呼び出し時刻を見ると、徐々に前回の終了時刻からのラグが増えていることがわかります。
これはエクスポネンシャルバックオフが採用されているためで、連続するエラーのリトライ回数を緩和するためです。
これらの係数を小さくすることは、リトライアウトを早めますが、ストリームのスループット維持の妨げにならない値に調整する必要があります。
そして仮にリトライアウトが発生しても、 DestinationConfig でSQSのデッドレターキューを指定することができるので、リトライでリカバリ出来ないレコードは、ストリーム処理の枠外でエラーを確認し、場合によってはリトライすることができます。

以上です。
これまでKinesisClientLibraryを使っていましたが、自前で頑張る必要があった部分が、Lambdaでほぼ機能としてサポートされているので、Kinesisの処理はLambda一択になりそうです。