🍬 AWS CDK 101🔬 ラムダを直接呼び出す機能をバッファするキューの追加


🔰 AWS CDKに初心者は、このシリーズで私の前の記事を1つずつ見てください.
場合は、私の前の記事を逃した場合は、以下のリンクでそれを見つけるか.
🔁 前のポスト🔗 Dev Post
🔁 前投稿を転載🔗
🔁 前投稿を転載🔗 medium com @aravindvcyber
この記事では、バッチサイズ制限で間接的にラムダをトリガーする同様のキューを追加することで、ラムダ関数を直接呼び出す前の機能を再評価しましょう.これは、我々の前の記事のようなラムダ並行性でより良い最適化を達成するのを助けることができました.scalable-event-driven-processing-using-eventbridge-and-sqs

潮流☘️



提案流量🍀



このアプローチで得られる利益🚣‍♀️


このアプローチに関連する複数の利点が次のとおりです.
  • 我々は、標準のSQS待ち行列から間接的な/バッファされた呼び出しを引き起こすことによって、我々のラムダの直接の呼び出しをステップ関数によって切り離すことができます.
  • このアプローチにより、ラムダトリガをバッティングすることによりラムダの同時実行限界を最小にすることができます.
  • また、1つのことを覚えているなら、私たちのラムダはDynamoDB書き込み操作をすることになっています.このアプローチにより、テーブル内の一貫した書き込み単位を維持し、利用することができます.
  • これの他に、我々はより高い環境でこれを簡単にスケールすることができます.
  • また、デバッグや検査のためにデッドレターキューを使用することもできます.
  • 以前の構文で使用されているキューの洗練🔖


    以前の構造を編集しましょうconstruct/sfn-simple.ts デッドレターキューとワーキングキューの定義を次のように追加します.

    デッドレター🎾


    デッドレターキューは失敗したメッセージを保持する必要があるので、作業キューは常に処理されるレコードを持っています.
     const sfnCommonEventProcessorQueueDLQ: DeadLetterQueue = {
          queue: new Queue(this, "sfnCommonEventProcessorQueueDLQ", {
            retentionPeriod: Duration.days(14),
            removalPolicy: RemovalPolicy.DESTROY,
            queueName: "sfnCommonEventProcessorQueueDLQ",
          }),
          maxReceiveCount: 100,
    };
    

    標準待ち行列🏊


    標準キューは、以下に示すように、コストおよびビジネス要件に基づいてFIFOキューを使用することもできます.
    
    const sfnCommonEventProcessorQueue = new Queue(
          this,
          "sfnCommonEventProcessorQueue",
          {
            retentionPeriod: Duration.days(5),
            removalPolicy: RemovalPolicy.DESTROY,
            deliveryDelay: Duration.seconds(3),
            queueName: "sfnCommonEventProcessorQueue",
            visibilityTimeout: Duration.minutes(100),
            deadLetterQueue: sfnCommonEventProcessorQueueDLQ,
          }
    );
    

    新しい仕事をキューにメッセージをプッシュする🎯


    我々が新しいラムダ呼び出し仕事を定義した方法と同じように、我々はメッセージを我々の既存のラムダで非同期に後でポーリングされる待ち行列にメッセージをプッシュする新しい仕事を定義することができます.
    また、ペイロード、inputpath、resultpath、resultSelectorなどを完全に再利用していることもわかりました.
    const recordingQueue = new tasks.SqsSendMessage(
          this,
          "Record using Queue",
          {
            inputPath: "$",
            messageBody: sfnTaskPayload,
            queue: sfnCommonEventProcessorQueue,
            comment: "Record message into dynamodb using SQS queue buffered lambda",
            integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
            resultSelector: {
              "Payload.$": "$",
              "StatusCode.$": "$.statusCode",
            },
            resultPath: "$.recordResult",
          }
    );
    
    
    

    既存のラムダ用のイベントソースの追加⛺


    今度は新しいキューを既存のラムダに接続します.
    キューのバッチサイズを設定していることがわかりますn 同時実行している状態関数の数.また、FIFO キューがメッセージの順序が非常に重要であるとわかるなら、記録して、処理してください.
    triggerFunction.addEventSource(
          new SqsEventSource(sfnCommonEventProcessorQueue, {
            batchSize: 2,
            maxBatchingWindow: Duration.seconds(10)
          })
        );
    
    ヒアbatchSize 呼び出しごとに取得するレコードの最大数maxBatchingWindow レコードにラムダをレコードに戻す前に待機する最大時間を定義します.定義されていない場合、ラムダはbatchSize .

    コールバックパターン👓


    何かに気づくと、コールバックパターンを実装しました.ここでは、メッセージをSQSにプッシュした後、ステップ関数の実行を一時停止します.
    このアプローチでは、実際の作業が起こっている場所を理解することができ、結果が準備されるまで、何度も何度も更新のためのポーリングを維持していない.

    コールバックパターンイラスト♦️



    コールバックパターンのイラストが成功する🎼



    λ関数の変化💎


    我々がした現在の変更は一見して十分に見えます、しかし、彼らは失敗します、これはペイロードラムダ電流取得が古い呼び出しと異なるので.これは、SQSがラムダによってポーリングされる間、実際のメッセージはイベントレコードの配列を含むオブジェクトの中にあるからです.そして、イベントレコードの中で、我々が前の記事でペイロードとして通過したとき、Bodyプロパティは実際のメッセージを与えています.確かに、ラムダを少しずつ調整する必要があります.
    {
        "Records": [
            {
                "messageId": "ff11de7f-795a-4c85-8612-9d376e01df0d",
                "receiptHandle": "**********",
                "body": "{\"Record\":{\"createdAt\":\"2022-04-18T07:51:06Z\",\"messageId\":\"538a8436-9987-1f65-478d-815c77f00d0f\",\"event\":{\"message\":\"A secret message\"}},\"MyTaskToken\":\"**********\"}",
                "attributes": {
                    "ApproximateReceiveCount": "3",
                    "SentTimestamp": "1650268267274",
                    "SenderId": "AROAYLZFFS6HZ3ESZEEBH",
                    "ApproximateFirstReceiveTimestamp": "1650268270274"
                },
                "messageAttributes": {},
                "md5OfBody": "edddae585d76eb14439f5804dcef24a4",
                "eventSource": "aws:sqs",
                "eventSourceARN": "**********",
                "awsRegion": "ap-south-1"
            }
        ]
    }
    

    ラムダコードの書き換え📝


    そこで、以下に示すようなメッセージレコーダラムダ関数を上記ペイロードの変更に注意して書きましょう.
    これは私たちに追加のステップを与えると思いますが、このステップは、ポーリングプロセス中にメッセージのレコードをバッチでラムダへの書き込みを処理するのに役立ちます.
    import { PutItemInput } from "aws-sdk/clients/dynamodb";
    import { DynamoDB, StepFunctions } from "aws-sdk";
    const sfn = new StepFunctions({ apiVersion: "2016-11-23" });
    exports.processor = async function (event: any) {
      const dynamo = new DynamoDB();
      let result: any | undefined = undefined;
      await Promise.all(
        event.Records.map(async (Record: any) => {
    
          const msg = JSON.parse(Record.body).Record;
          const crt_time: number = new Date(msg.createdAt).getTime();
          const putData: PutItemInput = {
            TableName: process.env.MESSAGES_TABLE_NAME || "",
            Item: {
              messageId: { S: msg.messageId },
              createdAt: { N: `${crt_time}` },
              event: { S: JSON.stringify(msg.event) },
            },
            ReturnConsumedCapacity: "TOTAL",
          }; 
          try {
            result = await dynamo.putItem(putData).promise();
          } catch (err) {
            const sendFailure: StepFunctions.SendTaskFailureInput = {
              error: JSON.stringify(err),
              cause: JSON.stringify({
                statusCode: 500,
                headers: { "Content-Type": "text/json" },
                putStatus: {
                  messageId: msg.messageId,
                  ProcessorResult: err,
                },
              }),
              taskToken: JSON.parse(Record.body).MyTaskToken,
            };
            console.log(sendFailure);
            await sfn.sendTaskFailure(sendFailure, function (err: any, data: any) {
              if (err) console.log(err, err.stack); 
              else console.log(data); 
            });
            return sendFailure;
          }
          const sendSuccess: StepFunctions.SendTaskSuccessInput = {
            output: JSON.stringify({
              statusCode: 200,
              headers: { "Content-Type": "text/json" },
              putStatus: {
                messageId: msg.messageId,
                ProcessorResult: result,
              },
            }),
            taskToken: JSON.parse(Record.body).MyTaskToken,
          };
    
          console.log(sendSuccess);
    
          await sfn
            .sendTaskSuccess(sendSuccess, function (err: any, data: any) {
              if (err) console.log(err, err.stack); 
              else console.log(data); 
            })
            .promise();
    
          return sendSuccess;
        })
      );
    };
    
    

    ラムダ変更の概要🍋


    exports.processor = async function (event: any) {
      const dynamo = new DynamoDB();
      let result: any | undefined = undefined;
      await Promise.all(
        event.Records.map(async (Record: any) => {  
          const msg = JSON.parse(Record.body).Record;
          /////rest the same like previous article
        })
      );
    };
    

    tasktokenの変更🏆


    その他taskToken また、前の例から下記の値までラムダの内側に変更されました.
    以前
    taskToken: Record.MyTaskToken,
    
    アフター
    taskToken: JSON.parse(Record.body).MyTaskToken,
    

    関数型タイムアウト🐪


    step関数のタイムアウトは、定義されている関数のタイムアウト値よりも多くの時間を待っている場合はキャンセルし、実行を終了することに注意してください.
    b799d9d4-5c32-5f42-89bb-830315b023f7    INFO    TaskTimedOut: Task Timed Out: 'Provided task does not exist anymore'
        at Request.extractError (/var/runtime/node_modules/aws-sdk/lib/protocol/json.js:52:27)
        at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
        at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:78:10)
        at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:686:14)
        at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
        at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
        at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
        at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
        at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:688:12)
        at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:116:18) {
      code: 'TaskTimedOut',
    

    変更の展開🌰


    あなたが変更を展開する間、あなたは、私たちが最後の記事でラムダに明示的に与えたものと同様に、SFNにメッセージを送るために必要な特権を与えるためにIAM変化を受け入れるプロンプトを得るかもしれません.


    サンプル実行ログ🌼



    この記事の目的を達成して、メッセージをキューとしてメッセージとしてプッシュするためにStep関数を作成しました.これは後でラムダによってポーリングされ、さらにstatemachineにステータスを返すように処理されました.したがって、このアーキテクチャは、ラムダ関数の直接呼び出しを持つ前のものよりずっとスケーラブルです.
    我々は、我々のスタックにより多くの接続を加えて、それを新しい建設物をつくることによって、今後の記事でより利用可能にすることになっています.
    ⏭ 次の記事はServerlessにあります
    🎉 サポートに感謝!🙏
    あなたが好きならば☕ Buy Me a Coffee , 私の努力を高めるために.

    🔁 オリジナルポスト🔗 Dev Post
    🔁 で転載🔗
    🔁 で転載🔗 medium com @aravindvcyber