Dynamodbストリームへの深いダイブとラムダ積分

11322 ワード

あなたはおそらく以前このアーキテクチャを見たことがあります.DynamoDBテーブルには、テーブル内のデータが変更されるたびにトリガされるデータとラムダ関数が保持されます.それはほとんどServerlessなアーキテクチャで遍在しています、そして、日常のユースケースの1つはテーブルで集合アイテムを維持することです.今日、我々はより深くこの極度に些細なアーキテクチャに飛び込みます、そして、どのようにDynamoBストリームが動くかについて調査してください、そして、ラムダサービスが何を統合においてするか、そして、あなたはセットアップを微調整するために利用できるノブを持ちます.
ストレージパーティションは、DynamoDBテーブルを構成し、それぞれが10 GBのデータを保持し、一定の量のリードライトスループットを提供することができます.テーブルの各項目には、少なくともパーティションキーがあります.このパーティションキーは、項目の格納場所が格納されているハッシュ関数を使用します.プライマリキーは、各項目は、テーブルの設定に応じて2つの品種で来ることを識別します.これは、パーティションキーまたはパーティションキーとソートキーからなる複合主キーに相当する単純な主キーである.後者は同じパーティションキーを共有する項目をソートするために使用されます.

DynamoDBストリームは、データをテーブルでキャプチャする方法です.つまり、アイテムへの変更は、従来のリレーショナルデータベースからのトランザクションログとは異なり、ストリームに書き込まれます.アプリケーションは、データの変更に応答する変更のこのストリームを消費することができます.アプリケーションは、ストリーム内の任意の連続して変更されたレコードを消費し、どこかで読み取りを開始します.これは、テーブルが単一のストレージパーティションで構成されている限り、とても簡単です.実際には、それはめったに例ではない.
DynamoDBは十分な容量が利用できるまでパーティションの数を増やすことによって、ストレージまたはスループット要件のレベルに比例します.それは複雑さを増す.何百ものテーブルやこれらのストレージパーティションの何千ものテーブルを持っているのは珍しいことではない.これはストリームコンポーネントがあまりにもスケールアウトすることができなければならないことを意味します.それは、shardingの使用によってそれをします.ストリームのシャードは、テーブルのストレージパーティションに似ています.各ストレージパーティションには少なくとも1つのシャードがあります.シャードは24時間の特定のストレージパーティションの変更ストリームを格納します.
我々がDynamoDBストリームから読むことについて話すときはいつでも、我々は流れを構成するシャードからの読書について話しています.これらの破片の数は静的ではない.これは、テーブル内で何が起こるかによって時間をかけて増減できます.新しいストレージパーティションは、シャードの数を増やします.多くの読み書きは、ストレージパーティションに書き込むことができます.シャドーも自然に記入し、後継者を得ることができます.

この透明を作るために、シャードの系統を追跡します.それぞれのシャードは親のひげを持つことができます.ストリームからレコードを処理するときは、子プロセスがイベントの順序を保持する前に親プロセスを処理する必要があります.The DescribeStream API 現在のシャードのリストを取得するために使用することができます.以下に例を示します.
$ aws dynamodbstreams describe-stream --stream-arn <stream-arn>
{
    "StreamDescription": {
        "StreamArn": "arn:aws:dynamodb:eu-central-1:123123123123:table/pk-only/stream/2022-03-19T13:37:17.440",
        "StreamLabel": "2022-03-19T13:37:17.440",
        "StreamStatus": "ENABLED",
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "CreationRequestDateTime": "2022-03-19T14:37:17.433000+01:00",
        "TableName": "pk-only",
        "KeySchema": [
            {
                "AttributeName": "PK",
                "KeyType": "HASH"
            }
        ],
        "Shards": [
            {
                "ShardId": "shardId-00000001647710671110-c100053c",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "1624739500000000027768819660",
                    "EndingSequenceNumber": "1624739500000000027768819660"
                },
                "ParentShardId": "shardId-00000001647697037779-1c062810"
            },
            {
                "ShardId": "shardId-00000001647796385437-c1637cb4",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "1629451500000000049791965989"
                },
                "ParentShardId": "shardId-00000001647780605726-c21455bb"
            }
        ]
    }
}
シャードからレコードを要求するためにシャードイテレータを必要とし、GetShardIterator API . このshardイテレータは、変更の流れのポインタとして働きます.このポインタがSHARDイテレータ型で最初に指す場所を定義できます.設定TRIM_HORIZON それは、シャードで利用できる最も古い記録から始めます.選択LATEST 直近のものを返し、AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER あなたが正確にどこからの読み取りを開始するかを指定することができますSequenceNumber パラメータこれらのイテレータがおよそ15分後に期限切れになるので、その時間枠の中でレコードを要求する必要があることに注意してください.

$ aws dynamodbstreams get-shard-iterator --stream-arn <stream-arn> --shard-id <shard-id> --shard-iterator-type TRIM_HORIZON --no-cli-pager
{
    "ShardIterator": "arn:aws:dynamodb:eu-central-1:123123123123:table/pk-only/stream/2022-03-19T13:37:17.440|2|AAAAAAAAAAIuaQ...HjeCPZPrVe6Bk1Hsa6PrTQ=="
}
我々はシャードイテレータを持っているので、我々は最終的に我々のシャードからレコードを要求することができます.適切な名前GetRecords API それを正確に行うために存在する.我々はそれを我々のシャードイテレータを通過し、それはシャドウと新しいshardイテレータの項目を返します.私の例ではシャードには記録がありませんでしたが、今では変更記録を処理する時間です.
$ aws dynamodbstreams get-records --shard-iterator "arn:aws:dy..."
{
    "Records": [],
    "NextShardIterator": "arn:aws:dynamodb:eu-central-1:123123123123:table/pk-only/stream/2022-03-19T13:37:17.440|2|AAAAAAAAAAIjO5vOr8skH8j49FiK34qRqbSjjwjSa07EgMz1qxOsmDE3W5CrFVKz1UkQXdfkYIxTH2tjfvUkyP0hNQgDqsLoP...bUboSb1y1k6S2Y"
}
要約すると、カスタムアプリケーションのDynamodbストリームからデータを処理したいときに管理する必要があります.我々は、シャードとシャード族の数の変化に適応する必要があります.また、我々は24時間以内にすべての変更を読むことを確認する必要があります.そうでなければ、データを失うでしょう.それはたくさんの仕事だ.どのような保証を得るのですか?ドキュメントは非常に迅速に対応できるように変更データは、ほぼリアルタイムでshardsで利用できるようになります言及.また、ストリームは特定の項目への変更が順番にストリームに書き込まれることを保証します-しかし、私たちは、子シャードの前に親を最初に読むようにする必要があります.最後に、少なくとも、各レコードは、ストリーム内に一度だけ表示されます.この最後の部分は良いですが、私たちはすぐにそれがラムダでより複雑であることがわかります.
我々が合併症について話す前に、ラムダとダイナモストリームの間の統合がどのように働くかを見ましょう.統合を使用するには、前述のAPIアクションのパーミッションをラムダ関数の実行ロールに与える必要があります.ラムダサービス(コードではなく、サービス)は、自動的にストリームのシャードを一覧表示します.各シャードのためのプロセスは、シャードイテレータを使用して1秒あたり4回レコードのシャードをポーリングします.親プロセスが最初に処理されるように、このプロセスは血統の面倒を見ます.レコードが見つかった場合は、ラムダ関数を同期的に呼び出し、応答に応じて再度何かポーリングしたり、レコードのバッチを再試行します.

私たちは、私が言及したより、このプロセスをもう少しコントロールしています.ストリーム処理をどこでログに開始するかを設定することができますTRIM_HORIZON or LATEST, そして今、あなたはそれが何であるか知っている.また、バッティングを制御できます.ラムダに、我々の機能を最大限に呼び出すことができますn レコードの設定BatchSize . The MaximumBatchingWindowInSeconds までのイベントを集めるのに長い間ラムダに伝えますBatchSize それがコードを呼び出すまで.これは小さなバッチでコードを呼び出すのを避ける方法です.

もう一つはParallelizationFactor これにより、10桁までの実行コンテキストの数を増やすことができます.テーブルに非常に高いスループットがあれば便利です.ラムダは、レコードレベルがアイテムレベルで保存されることを保証します.ネイティブストリームとは対照的に、あなたは正確に一度配達に反対する少なくとも一度配達の保証を得る.これは、実行中にエラーが発生した場合、ラムダがバッチを再試行するためです.デフォルトでは、バッチ全体を再試行しますが、BisectBatchOnFunctionError がtrueに設定されています.これはエラーを引き起こすレコードを分離するのに役立ちます.使用MaximumRetryAttempts and MaximumRecordAgeInSeconds あなたが誤ったバッチのために実行される方法を再試行することができます.
どのバッチサイズと並列化要因を選ぶ必要がありますか?それは普通通りです.バッチサイズは、ラムダ呼び出しによって得ることができるどのように多くのレコードまでコントロールします.あなたのコードが一度に1つの変化記録を扱うだけに書かれるならば、それを1にセットしてください.それ以外の場合は、ラムダタイムアウト内の変更レコードを処理できる場合は、最大10まで増加できます.並列化要因に関して、あなたが話しているどちらのバックエンドシステムに悪影響もないならば、あなたはできるだけ高くそれをセットすることができます.ラムダは、アイテムごとの変更の順序が保存されることを保証します.高い並列化係数は、変化に対する応答のための平均時間を減らすことができる.
これらのパラメータを調整すると、Serverlessアプリケーションでの変更データ処理を最適化できます.これは我々が一日と呼ぶところです.私たちはどのようにDynamoDBストリームのフードの下で動作し、どのようにラムダサービスは、データの変更に応答するためにそれらを統合する方法をカバーしてきた.これまで読んでくれてありがとう.うまくいけば、私はそれを書いて楽しんで、このポストを読んで楽しんだ.あなたがDynamoDBを使用することについての詳細に興味があるならば、私の導入をチェックしてくださいDynamoDB in 15 Minutes または、私のポストはNOSQLデータベース設計のプロセスを概説しますmodeling a product catalog in DynamoDB . もっとニッチなトピックについては、ガイドがありますworking with lists in DynamoDB そして、ポストimplementing optimistic locking in DynamoDB with Python .
うまくいけば、あなたは私の仕事を読んで楽しんだ.私は、質問、フィードバック、および懸念を楽しみにしています.
参考文献
  • AWS Docs on DynamoDB Streams
  • AWS Blog explaining different integration patterns with Streams
  • Talk from re:invent 2021 explaining how streams fit into serverless architectures
  • AWS Docs on the DynamoDB-Lambda integration