ParallelizationFactorによる並列化でKinesisのレコードはLambdaにどう渡されるのか
Lambdaはこのアップデートで、ParallelizationFactor
がサポートされ、ストリームソースの1つのシャードを複数のLambda呼び出しで並列処理できるようになりました。
並列化した場合に関数にはどのようにレコードが渡ってくるのか、気になる点を確認しました。
ParallelizationFactor の意義
Kinesis Data Streamsではシャード毎にレコードに振ったシーケンス番号でレコードの入力と出力の順序を保証しているため、コンシューマで、あるレコードの処理が完了せず停滞した場合、次以降のレコードはその分遅延します。
この遅延量は、Kinesisでは GetRecords.IteratorAgeMilliseconds
メトリクスで1、Lambda側では IteratorAge
メトリクス2で監視でき、遅延測定のポイントが違いますが、Kinesisを使う時は両方のメトリクスの増加に気をつける必要があります。
遅延はストリームのデータの供給速度より消費速度が遅ければ、徐々に蓄積していくことになります。
これまで、Kinesis Client LibraryでもLambdaでも、基本的に1つのシャードを1つのコンシューマが担当してシリアルに処理するため、シャード数を増やして遅延対策をする他ありませんでした。
しかし、Lambdaに ParallelizationFactor
オプションが追加され、1つのシャードに対して、Lambda呼び出しを1〜10並列実行でき、消費側のスループットだけをスケールさせられるようになりました。
これは IteratorAge が高くなりがちなワークロードに対して非常に有効な対策となります。
実験環境
下記のように、コンシューマ側で遅延を生じる環境を作成して、DynamoDBに記録された処理結果と、CloudWatchのメトリクスを見ていきます。
- Kinesis Data Streams
- シャード数: 1
- Producer
- producer/put_records.rb をPCで実行する
- put_records.rb
-
1
〜5
をパーティションキーとして、各パーティションキーごとに1始まりの整数をデータとするレコードを1秒毎に30回送信する
- Consumer
- consumer/lambda_function.rb をトリガをKinesisとして、Lambdaで実行する
- lambda_function.rb
- 1呼び出しごとに、DynamoDBに受信したレコードと処理時刻を記録する
- 1レコードの処理ごとに3秒スリープする
環境構築はこちらのterraformで行いました。
並列なし(ParallelizationFactor = 1)の場合
まず、並列なしで実行した結果です。
以下は関数から処理開始時刻、終了時刻、イベントで渡されたレコードの {パーティションキー}-{データ番号}
をDynamoDBに記録したものと、CloudWatchのメトリクスです。
呼び出しは計4回シリアルに行われ、最後の呼び出し時点ではLambdaのIteratorAgeは300秒の遅延となり、トータル450秒かかりました。
$ aws dynamodb scan --table-name $TABLENAME | jq '.Items | map({start: .start.S, end: .end.S, records: [.records.L[].S] | join(",")}) | sort_by(.start)'
[
{
"start": "2020-01-14 06:01:10 +0000",
"end": "2020-01-14 06:01:25 +0000",
"records": "1-1,2-1,3-1,4-1,5-1"
},
{
"start": "2020-01-14 06:01:25 +0000",
"end": "2020-01-14 06:03:55 +0000",
"records": "1-2,2-2,3-2,4-2,5-2,1-3,2-3,3-3,4-3,5-3,1-4,2-4,3-4,4-4,5-4,1-5,2-5,3-5,4-5,5-5,1-6,2-6,3-6,4-6,5-6,1-7,2-7,3-7,4-7,5-7,1-8,2-8,3-8,4-8,5-8,1-9,2-9,3-9,4-9,5-9,1-10,2-10,3-10,4-10,5-10,1-11,2-11,3-11,4-11,5-11"
},
{
"start": "2020-01-14 06:03:55 +0000",
"end": "2020-01-14 06:06:25 +0000",
"records": "1-12,2-12,3-12,4-12,5-12,1-13,2-13,3-13,4-13,5-13,1-14,2-14,3-14,4-14,5-14,1-15,2-15,3-15,4-15,5-15,1-16,2-16,3-16,4-16,5-16,1-17,2-17,3-17,4-17,5-17,1-18,2-18,3-18,4-18,5-18,1-19,2-19,3-19,4-19,5-19,1-20,2-20,3-20,4-20,5-20,1-21,2-21,3-21,4-21,5-21"
},
{
"start": "2020-01-14 06:06:25 +0000",
"end": "2020-01-14 06:08:40 +0000",
"records": "1-22,2-22,3-22,4-22,5-22,1-23,2-23,3-23,4-23,5-23,1-24,2-24,3-24,4-24,5-24,1-25,2-25,3-25,4-25,5-25,1-26,2-26,3-26,4-26,5-26,1-27,2-27,3-27,4-27,5-27,1-28,2-28,3-28,4-28,5-28,1-29,2-29,3-29,4-29,5-29,1-30,2-30,3-30,4-30,5-30"
}
]
re:invent2019の A serverless journey: AWS Lambda under the hood (SVS405-R1)で説明されていますが、Lambdaのストリームソースの処理の内部では、Stream Tracker
が、Leasing Service
を通じて、起動しているシャード数に従って Poller
をアレンジし、Poller
がシャードをサブスクライブをしてGetRecordsを行っています。そしてLambdaの関数はPollerから呼び出されます。
ここで、Kinesisの GetRecords.IteratorAgeMilliSeconds
に遅延がないのは、PollerのGetRecordsの実行には遅延がないためです。
GetRecords.IteratorAgeMilliSeconds
は、タイムアウトなどLambdaの呼び出しがエラーになったことで、GetRecordsが遡って行われる場合に増加します。
ParallelizationFactor = 3の場合
次にParallelizationFactorを3にして実行します。変更はコンソールか、CLIで下記のように設定できます。
$ aws lambda update-event-source-mapping --uuid $TRIGGER_UUID --parallelization-factor 3
結果はこのようになりました。
Lambdaの呼び出しは計10回で、トータルは180秒で完了しました。IteratorAgeの最大も67秒に短縮できています。
各呼び出しで渡されたレコードを見ると、パーティションキー 1, 4 のレコード、3のレコード、2, 5のレコードのグループに分けられています。
そして呼び出しのタイムスタンプを見ると、各グループの処理は並列に呼び出されているけれども、グループごとにシリアルに実行されています。
つまり、並列化してもパーティションキー毎のレコード順序は保証されます。
こちらでも説明されていますが、PollerはLambdaのParallelizationFactorの設定をみて、フロントエンドの起動数をスケールします。
このときスケールするのはBatcherで、パーティションキー毎に割り振られるBatcherは一意に決まることで順序を維持しているようです。
[
{
"start": "2020-01-14 06:23:05 +0000",
"end": "2020-01-14 06:23:17 +0000",
"records": "1-1,4-1,1-2,4-2"
},
{
"start": "2020-01-14 06:23:05 +0000",
"end": "2020-01-14 06:23:11 +0000",
"records": "3-1,3-2"
},
{
"start": "2020-01-14 06:23:05 +0000",
"end": "2020-01-14 06:23:17 +0000",
"records": "2-1,5-1,2-2,5-2"
},
{
"start": "2020-01-14 06:23:11 +0000",
"end": "2020-01-14 06:23:29 +0000",
"records": "3-3,3-4,3-5,3-6,3-7,3-8"
},
{
"start": "2020-01-14 06:23:17 +0000",
"end": "2020-01-14 06:24:23 +0000",
"records": "2-3,5-3,2-4,5-4,2-5,5-5,2-6,5-6,2-7,5-7,2-8,5-8,2-9,5-9,2-10,5-10,2-11,5-11,2-12,5-12,2-13,5-13"
},
{
"start": "2020-01-14 06:23:17 +0000",
"end": "2020-01-14 06:24:23 +0000",
"records": "1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7,1-8,4-8,1-9,4-9,1-10,4-10,1-11,4-11,1-12,4-12,1-13,4-13"
},
{
"start": "2020-01-14 06:23:29 +0000",
"end": "2020-01-14 06:24:17 +0000",
"records": "3-9,3-10,3-11,3-12,3-13,3-14,3-15,3-16,3-17,3-18,3-19,3-20,3-21,3-22,3-23,3-24"
},
{
"start": "2020-01-14 06:24:17 +0000",
"end": "2020-01-14 06:24:35 +0000",
"records": "3-25,3-26,3-27,3-28,3-29,3-30"
},
{
"start": "2020-01-14 06:24:23 +0000",
"end": "2020-01-14 06:26:05 +0000",
"records": "2-14,5-14,2-15,5-15,2-16,5-16,2-17,5-17,2-18,5-18,2-19,5-19,2-20,5-20,2-21,5-21,2-22,5-22,2-23,5-23,2-24,5-24,2-25,5-25,2-26,5-26,2-27,5-27,2-28,5-28,2-29,5-29,2-30,5-30"
},
{
"start": "2020-01-14 06:24:23 +0000",
"end": "2020-01-14 06:26:05 +0000",
"records": "1-14,4-14,1-15,4-15,1-16,4-16,1-17,4-17,1-18,4-18,1-19,4-19,1-20,4-20,1-21,4-21,1-22,4-22,1-23,4-23,1-24,4-24,1-25,4-25,1-26,4-26,1-27,4-27,1-28,4-28,1-29,4-29,1-30,4-30"
}
]
呼び出しのタイミングのイメージです。
エラーハンドリング
並列化した場合も、パーティションキー毎に担当されるBatcherは固定されていることがわかりました。では、あるレコードでエラーが発生した場合に、同じBatcherに割り振られた他のパーティションキーのレコードの処理はどうなるのでしょうか。
意図的にパーティションキー 1
のデータ 5
の処理でエラー終了するように関数を変更して実行しました。
[
{
"id": "dd0433d1-d3ea-45ad-b90f-38571fae3682@2020-01-15 06:30:21 +0000",
"start": "2020-01-15 06:30:21 +0000",
"end": "2020-01-15 06:30:27 +0000",
"records": "2-1,5-1"
},
{
"id": "c50dd83a-6f7e-4bb0-99dd-373f3514d41a@2020-01-15 06:30:21 +0000",
"start": "2020-01-15 06:30:21 +0000",
"end": "2020-01-15 06:30:24 +0000",
"records": "3-1"
},
{
"id": "22bbbc6c-393a-4cb4-9270-a32fab209970@2020-01-15 06:30:21 +0000",
"start": "2020-01-15 06:30:21 +0000",
"end": "2020-01-15 06:30:27 +0000",
"records": "1-1,4-1"
},
(略)
{
"aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
"start": "2020-01-15 06:30:27 +0000",
"end": "2020-01-15 06:31:03 +0000",
"records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
},
(略)
{
"aws_request_id": "a966ff27-8569-4fbf-8594-adcf7947683f",
"start": "2020-01-15 06:31:00 +0000",
"end": "2020-01-15 06:31:51 +0000",
"records": "3-14,3-15,3-16,3-17,3-18,3-19,3-20,3-21,3-22,3-23,3-24,3-25,3-26,3-27,3-28,3-29,3-30"
},
{
"aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
"start": "2020-01-15 06:31:03 +0000",
"end": "2020-01-15 06:31:39 +0000",
"records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
},
{
"aws_request_id": "c43dbc93-16b1-4666-b565-2df99f7f68eb",
"start": "2020-01-15 06:31:03 +0000",
"end": "2020-01-15 06:33:21 +0000",
"records": "2-8,5-8,2-9,5-9,2-10,5-10,2-11,5-11,2-12,5-12,2-13,5-13,2-14,5-14,2-15,5-15,2-16,5-16,2-17,5-17,2-18,5-18,2-19,5-19,2-20,5-20,2-21,5-21,2-22,5-22,2-23,5-23,2-24,5-24,2-25,5-25,2-26,5-26,2-27,5-27,2-28,5-28,2-29,5-29,2-30,5-30"
},
{
"aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
"start": "2020-01-15 06:31:40 +0000",
"end": "2020-01-15 06:32:16 +0000",
"records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
},
{
"aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
"start": "2020-01-15 06:32:17 +0000",
"end": "2020-01-15 06:32:53 +0000",
"records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
},
{
"aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
"start": "2020-01-15 06:32:55 +0000",
"end": "2020-01-15 06:33:31 +0000",
"records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
},
{
"aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
"start": "2020-01-15 06:33:34 +0000",
"end": "2020-01-15 06:34:10 +0000",
"records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
},
{
"aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
"start": "2020-01-15 06:34:17 +0000",
"end": "2020-01-15 06:34:53 +0000",
"records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
}
]
結果、1-5
レコードを含むLambdaの呼び出し 1ec49225-cb26-444f-b2a4-a7c7bc2eb71f
はエラーとなり、他のBatcherの呼び出しが完了した後もリトライを繰り返しています。
そして、パーティションキー1, 4のレコードはこのバッチ移行処理が進まなくなり、IteratorAgeはどんどん増加しています。これは一番避けるべき事態です。
対策として、Lambdaのイベントソースマッピングのストリームソースのオプションには、エラーのリカバリのためのサポートがあるので、これを利用すべきです。
BisectBatchOnFunctionError
今回のエラーは意図的に発生させているため回復の見込みがないですが、タイムアウトの場合、処理対象のデータを減らすことは有効です。
BisectBatchOnFunctionError
を有効にすると、エラーになったバッチを2分割してLambdaの呼び出しが行われるので、制限時間内に処理を完了できる可能性が高められます。
MaximumRecordAgeInSeconds, MaximumRetryAttempts と DestinationConfig
エラーによるリトライは MaximumRecordAgeInSeconds
または MaximumRetryAttempts
を満了するまで続きます。
1ec49225-cb26-444f-b2a4-a7c7bc2eb71f
の呼び出し時刻を見ると、徐々に前回の終了時刻からのラグが増えていることがわかります。
これはエクスポネンシャルバックオフが採用されているためで、連続するエラーのリトライ回数を緩和するためです。
これらの係数を小さくすることは、リトライアウトを早めますが、ストリームのスループット維持の妨げにならない値に調整する必要があります。
そして仮にリトライアウトが発生しても、 DestinationConfig
でSQSのデッドレターキューを指定することができるので、リトライでリカバリ出来ないレコードは、ストリーム処理の枠外でエラーを確認し、場合によってはリトライすることができます。
以上です。
これまでKinesisClientLibraryを使っていましたが、自前で頑張る必要があった部分が、Lambdaでほぼ機能としてサポートされているので、Kinesisの処理はLambda一択になりそうです。
Author And Source
この問題について(ParallelizationFactorによる並列化でKinesisのレコードはLambdaにどう渡されるのか), 我々は、より多くの情報をここで見つけました https://qiita.com/sabmeua/items/7fec5b8dd3415c5733cb著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .