Pythonを使用したダイナモにおける正確なカウンタの実装

24204 ワード

あなたがブログを実行していると想像して、ビューの数などの記事のいくつかの統計情報を表示したい.DynamoDBはそのようなシステムのデータベースとして機能することができます.私はどのように、これらのカウンタの精度を向上させる上でいくつかのヒントを共有する方法を示します.私はここからあなたが得ることができるDynamoDB in 15 minutes ポスト.
DynamoDBはすべての種類のカウンタを作成するために使用することができます.オンザフライでそれらを計算することなく集計を得ることはしばしばモチベーションです.コンピューティング集約は高価で、スケールしない操作であるので、機能のためにネイティブの支持が全くありませんsum() or avg() DynamoDBではリレーショナルデータベースから知っているかもしれません.代わりに、テーブルに書き込むとき、またはDynamOdbストリームとラムダ関数を使用して非同期的に書き込みを行うときに集約を維持します.
私たちのブログを実行していると仮定し、どのように多くの回の各ブログ記事についての統計情報を作成したい.我々は非常に有意義ではありませんページの印象を数えることを避けるためにロードされた後、私たちのウェブサーバには、いくつかの秒後にブラウザからビューのイベントを送信します.私たちのバックエンドが受け取るビューイベントは、このように見えます.
{
    "url": "myblog.com/article1",
    "time": "2022-03-28T13:17:23+00:00",
    "clientId": "adidOIkenODSksi92LHd6"
}
今、我々はビューを収集するテーブル内の統計項目を作成することができます.私たちのテーブルBlogCardデータは、単一のテーブルのデザインでこのように見えるかもしれません.
京大理
sk
見解URL#myblog.com/article1 STATISTICS32URL#myblog.com/post23 STATISTICS64
ここでは、ビューカウンターを維持する関数の非常にナイーブで欠陥のある実装です.
def very_naive_view_counter(view_event: dict, table_name: str):
    table = boto3.resource("dynamodb").Table(table_name)

    blog_url = view_event["url"]

    # Increment the view counter by 1
    table.update_item(
        Key={
            "PK": f"URL#{blog_url}",
            "SK": "STATISTICS"
        },
        UpdateExpression="SET #views = #views + :increment",
        ExpressionAttributeNames={
            "#views": "views"
        },
        ExpressionAttributeValues={
            ":increment": 1
        }   
    )
一見したところ、問題はありませんが、ブログが初めて閲覧されると問題になりました.私たちだけが使用できますSET 属性が既に存在するならば、このようにしてください.それは最初の見解ではないので、何か他のことを考えなければならない.つのアプローチはADD UPDATE式では、カウンタをインクリメントし、属性が存在しない場合はゼロで開始します.The documentation discourages 使用ADD いくつかの未知の理由のためにSET 代わりに.幸いにも、我々は状況を改善するためにできるいくつかのトリックがあります.内蔵if_not_exists function 属性がまだ存在しない場合、初期値を設定できます.
def less_naive_view_counter(view_event: dict, table_name: str):
    table = boto3.resource("dynamodb").Table(table_name)

    blog_url = view_event["url"]

    # Increment the view counter by 1
    table.update_item(
        Key={
            "PK": f"URL#{blog_url}",
            "SK": "STATISTICS"
        },
        UpdateExpression="SET #views = if_not_exists(#views, :init) + :inc",
        ExpressionAttributeNames={
            "#views": "views"
        },
        ExpressionAttributeValues={
            ":inc": 1,
            ":init": 0
        }
    )
この実装は初期ビューを扱い、オリジナルのものに対して優れた実装です.各ビューイベントが1回だけ処理されることを保証できる限り、ソリューションは許容できます.我々が同じイベントを二回処理するならば、我々が我々のビューイベントの重複を全く持たないので、我々のカウンタは現実と同期しません.我々は、各ビューのイベントを正確に一度処理されると仮定することはできますか?それはあなたの建築次第です.ほとんどの標準的なサーバレス実装では、それはそうではありません.なぜ見ましょう.

上記のビューの統計システムの例を見ることができます.クライアントが送信するビューイベントを受け入れるAPIゲートウェイがあります.APIゲートウェイはイベントをSQS Queueに届けて、ステータスコードHTTP 201(受け入れられる)を返すので、クライアントはそのことをし続けることができて、バックエンドを待つ必要はありません.バックエンドのラムダ関数は、キューからのイベントを処理し、カウンタをDynamoDB内に保持します.ここで問題となるのがsqsとλの統合である.私たちは、イベントが少なくとも一回処理されるという保証を得るだけです.これはビューイベントが2回以上処理されることを意味します.この特定のアーキテクチャでは、最初のIn、First Out(FIFO)キューを選ぶこともできますが、システムのスループットを制限し、ボトルネックになる可能性があります.
代わりに、我々は我々が正常に処理したビューイベントを格納することによって、テーブル内の重複問題を解決することができます.これは私たちの元のテーブルのデザインを少し変更します.ここでは、ビューのイベント情報も含まれて改訂版です.
京大理
sk
見解URL#myblog.com/article1 STATISTICS3URL#myblog.com/article1 T#2022-03-28T13:17:23+00:00#CID#adidOIkenODSksi92LHd6 URL#myblog.com/article1 T#2022-03-28T13:17:38+00:00#CID#kdajIkenODSksiasde36 URL#myblog.com/article1 T#2022-03-28T14:36:23+00:00#CID#adsdfgIkenODSkggd6 URL#myblog.com/post23 STATISTICS1URL#myblog.com/post23 T#2022-03-28T14:36:23+00:00#CID#adsdfgIkenODSkggd6あなたのイベントをユニークにすることは、ここで重要なデザイン考慮です.イベント全体でハッシュを計算することができ、スキーマでソートキーとして使用することができます.私の場合、もっと暗黙のうちにそのようなことをすることにしました.値の組み合わせurl , time , and clientId 一意にビューイベントを識別します.すべてはアイテムの主キーに存在しています.私はここでハッシュ計算を選ぶことができませんでした、このようにテーブルのデータをレイアウトするので、私は特定の時間枠の中ですべての視点イベントを問い合わせることができます.
私がここで使用することができたもう一つのアプローチはclientId ソートキーとして.これは、同じclientidでビューを1回だけカウントされます.私は特にそれをしなかったので、誰かが異なる日付で2、3回ブログを読むことに決めるならば、私は別々の見解としてそれらを数えたいです.タイムスタンプの日付部分だけを選ぶことができましたclientId 私の統計の1日あたりのクライアントに1つのビューの数を制限する-多くのオプション.あなたにとってどれが最善かはユースケースによる.
ここで、TimestampとClientIDのビューイベントがテーブルにまだ存在しない場合、我々はビューカウンタをインクリメントするだけであることを確認する必要があります.私たちは、条件イベントのために条件付きのputを通してそれをすることができて、条件付きのputが働いたならば、カウンタを更新するだけであることができました、しかし、それはレース条件の対象となります.The transactions API ここでははるかに良い解決策です.トランザクションは、すべてまたは何の方法で変更を実行することができますACID ). これは、グループ内のすべての変更が正常に適用されたことを意味します.次の実装では、イベントの条件付きPUTリクエストをカウンタ増分と組み合わせるためにこれを使用します.これは、既にビューイベントを処理している場合、カウンタを更新しません.
def accurate_view_counter(view_event: dict, table_name: str):

    # transactions are only supported using the client API
    client = boto3.client("dynamodb")

    partition_key = f"URL#{view_event['url']}"
    sort_key_stats = "STATISTICS"
    sort_key_event = f"T#{view_event['time']}#CID#{view_event['clientId']}"

    try:
        client.transact_write_items(
            TransactItems=[
                {
                    "Put": {
                        "TableName": table_name,
                        "Item": {
                            "PK": {"S": partition_key},
                            "SK": {"S": sort_key_event}
                        },
                        "ConditionExpression": "attribute_not_exists(PK) and attribute_not_exists(SK)"
                    }
                },
                {
                    "Update": {
                        "TableName": table_name,
                        "Key": {
                            "PK": {"S": partition_key},
                            "SK": {"S": sort_key_stats}
                        },
                        "UpdateExpression": "SET #views = if_not_exists(#views, :init) + :inc",
                        "ExpressionAttributeNames": {
                            "#views": "views"
                        },
                        "ExpressionAttributeValues": {
                            ":init": {"N": "0"},
                            ":inc": {"N": "1"}
                        }
                    }
                }
            ]
        )

    except ClientError as err:
        if err.response["Error"]["Code"] == 'TransactionCanceledException':
            # Already processed
            print("View event was already processed")
        else:
            raise err
このアプローチは1つだけビューのイベントを1回ビューイベントを更新します.しかし、コストがかかります.我々は、ストレージのコストを発生させ、我々のナイーブ実装よりも多くの読み取りと書き込み容量単位を消費するトランザクションAPIを使用して、テーブル内のビューイベントを格納しています.
トランザクションAPIのコストについてはあまりできません.これは、正確なカウンタを持っているだけのコストです.あなたが精度のそのレベルを必要としないならば、以前の実現のうちの1つは十分であるかもしれません.幸いにも、我々のコストのストレージコンポーネントを最適化することができます.一般的に重複イベントは、比較的短い期間、例えば数時間以内に到着します.その後、再び特定のイベントを見ることは不可能です.つまり、私たちはいつか、例えば1週間後にテーブルから我々のビュー・イベントを期限切れにして、全体的なストレージコストを減らすことができることを意味します.
以下は、time-to-live いくつかの時間後に表示イベントを期限切れにするためにDynamoDBに組み込まれている機能.彼らは正確にこの時点ではなく、その時点のその約24時間以内に期限切れにしないことに注意してください.
def accurate_view_counter_with_ttl(view_event: dict, table_name: str):

    # transactions are only supported using the client API
    client = boto3.client("dynamodb")

    expire_after_seconds = 60 * 60 * 24 * 7 # a week
    current_time_as_epoch = int(time.time())
    expiry_time = current_time_as_epoch + expire_after_seconds


    partition_key = f"URL#{view_event['url']}"
    sort_key_stats = "STATISTICS"
    sort_key_event = f"T#{view_event['time']}#CID#{view_event['clientId']}"

    try:
        client.transact_write_items(
            TransactItems=[
                {
                    "Put": {
                        "TableName": table_name,
                        "Item": {
                            "PK": {"S": partition_key},
                            "SK": {"S": sort_key_event},
                            "ttl": {"N": str(expiry_time)}
                        },
                        "ConditionExpression": "attribute_not_exists(PK) and attribute_not_exists(SK)"
                    }
                },
                {
                    "Update": {
                        "TableName": table_name,
                        "Key": {
                            "PK": {"S": partition_key},
                            "SK": {"S": sort_key_stats}
                        },
                        "UpdateExpression": "SET #views = if_not_exists(#views, :init) + :inc",
                        "ExpressionAttributeNames": {
                            "#views": "views"
                        },
                        "ExpressionAttributeValues": {
                            ":init": {"N": "0"},
                            ":inc": {"N": "1"}
                        }
                    }
                }
            ]
        )

    except ClientError as err:
        if err.response["Error"]["Code"] == 'TransactionCanceledException':
            # Already processed
            print("View event was already processed")
        else:
            raise err

概要


このポストでは、我々はDynamoDBのカウンタをインプリメントして、トランザクションAPIについて学んで、イベントのユニークさに関するデザイン考慮に深いダイビングをしました.あなたは、私がここで言及した完全なコードを見つけることができますthis Github repository .
うまくいけば、あなたはこのブログから何かを得ました.
-モリス