🏇 AWS CDK 101🍭 我々のSQSベースのラムダトリガを代入するスタタキシンとステップ関数


🔰 AWS CDKに初心者は、このシリーズで私の前の記事を1つずつ見てください.
場合は、私の前の記事を逃した場合は、以下のリンクでそれを見つけるか.
🔁 前のポスト🔗 Dev Post
🔁 前投稿を転載🔗
🔁 前投稿を転載🔗 medium com @aravindvcyber
この記事では、状態機械を呼び出す新しいルールに直接ラムダをトリガーするキューへのメッセージを対象とする前のイベントルールをリファクタにしましょう.

このアプローチで得られる利益🚣‍♀️


このアプローチに関連する複数の利点が次のとおりです.
  • AWSステップ関数を使用すると、すぐにアプリケーションを構築し、更新することができますので
  • 我々は、直接の呼び出しをSQSによって状態機械を通して間接的な呼び出しに分離することができます.
  • ステートマシンを使用して、我々はまた、我々のワークフローのステップの明確な塊として識別できるいくつかのステップ関数を追加することによって創造的に多くのオーケストレーションを行う能力を楽しむ一方、変換と条件のチェックをたくさん行うことができます.
  • したがって、最終的には、ラムダ内からスタミクティンの定義へのビジネスフローロジックのほとんどを除去し、それによってラムダ/プロセッサをより一般化し、同様に様々な他のタスクの間で共有することができます.
  • また、Statemachineは多くのメトリクス、ログ、および失敗の実際のポイントへの視覚参照を提供します.そして、それは我々がたどり着くのを難しいかもしれなくて、伝統的なモノリシックラムダの中に見つけるかもしれません.
  • 状態機械のための新しい構成🚧


    新しいファイルを作ることから始めましょうconstructs/sfn-simple.ts私たちは、共通のモジュールをstepfunction and stepfunctions_tasks 下記の通り.
    import * as lambda from "aws-cdk-lib/aws-lambda";
    import * as sfn from "aws-cdk-lib/aws-stepfunctions";
    import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
    import { Construct } from "constructs";
    
    我々はこの構成のために他のマイナーな輸入を必要とします、しかし、それらはすでに我々の他の記事で議論されます、そして、あなたは暗黙のうちにそれらを理解することができなければなりません.
    新しい小道具のインターフェイスを追加しましょうsfnProps この構成が実装されているスタックから必要な入力情報に.
    
    export interface sfnProps {
      triggerFunction: lambda.Function;
      timeout: Duration;
    }
    
    上記のコードブロックではtriggerFunction はバックエンドラムダであり、ここではstatemachine定義を使用した場合に後で書きます.timeoutはstatemachine呼び出しの最大合計時間を制限する.

    構築骨格🃏


    次のようにモデル構築テンプレートを作成します.`ts

    export class simpleSfnConstruct extends Construct {

    public readonly sfnMachine: sfn.StateMachine

    constructor(scope: Construct, id: string, props: sfnProps) {
    super(scope, id);
    }
    }

    `

    You could also find the read-only object sfnMachine , スタック関数からコンストラクタ関数定義の中で作成されたstatemachineを参照するために使用します.ts
    const { triggerFunction,timeout } = props;

    Usual destructuring of our props inside the constructor.

    ラムダペイロード🔑

    `ts

    const sfnTaskPayload = sfn.TaskInput.fromObject({
    "MyTaskToken": sfn.JsonPath.taskToken,
    "Record": {
    "messageId.$": "$.id",
    "createdAt.$": "$.time",
    "event.$": "States.StringToJson($.detail.message)"
    }
    });
    `

    Here sfnTaskPayload ラムダを起動し、その完了を待つために使用される我々のSteFunctionの中のパラメタとして通過する我々のペイロードを定義します.
    ここで注意する重要なことは以下のプロパティになります.

  • MyTaskToken は、sfn.JsonPath.taskToken 文脈データから、そして、ステップ関数は休止して、待ちます.後でラムダから結果を得るSendTaskSuccess and SendTaskFailure これは、ラムダをポーリングせずに出力を生成するのを助けます.何度も何度も、ステータスをチェックして、これを使用して状態遷移を保存しますMyTaskToken 参考に.
  • レコードセクションでは、プロセッサコードベースのイベントメッセージデータからステップ関数そのものまでのメッセージレベルデータ抽出のある部分を前の記事に比べてオフロードしたことがわかりました.これはJSON path syntax . 実際に計算を実行する前に、この種の変換とデータ抽出は、細かい粒度の粒度で私たちを助け、私たちのワークフローのロジックを制御し、statemachineデザイナーやメンテナにとって有用です.
  • ts
    const recordMsg = new tasks.LambdaInvoke(this, "Record Message", {
    lambdaFunction: triggerFunction,
    timeout: Duration.minutes(1),
    comment: "Record message in dynamo",
    integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
    inputPath: "$",
    payload: sfnTaskPayload,
    resultSelector: {
    "Payload.$": "$",
    "StatusCode.$": "$.statusCode"
    },
    resultPath: "$.recordResult",
    });

    最終的なステイタスステップ📲

    This is only a formal success and failure point of reference, which we create in our statemachine to better visualize what has happened during the workflow execution.

    `ts
    const jobFailed = new sfn.Fail(this, "Job Failed", {
    comment: "Job Failed"
    });
    const jobSucceed = new sfn.Succeed(this, "Job Succeed", {
    comment: "Job Succeed"
    });

    `

    ワークフローを分岐するための選択ステップ✈️

    The choice step is used as a visual and functional reference to help make a decision based on the output from the lambda invocation job status from the previous step. Here we do the decision-making by choosing the next step using the data from the previous step's output.

    ts
    const checkStatus = new sfn.Choice(this, "Check Status?",{
    inputPath: "$.recordResult"
    })
    .when(sfn.Condition.numberEquals("$.StatusCode", 500), jobFailed)
    .when(sfn.Condition.numberEquals("$.StatusCode", 200), jobSucceed)
    .otherwise(jobFailed);

    statemachine定義へのステップ関数連鎖🔗

    You can see from the below code that statefunction is nothing but a chain of stepfunctions, which we defined earlier. And every stepfunction is connected to the next one as a simple chain, though it could even contain some branching steps like the choice function.

    ts
    const sfnDef = recordMsg.next(checkStatus);

    StateMachineロググループ🌼

    A new log group is created to contain the logs received from the statemachine execution as shown below. This will be used in the statemachine implementation part.

    `ts

    const sfnLog = new LogGroup(this, "sfnLog", {
    logGroupName: "sfnLogGroup",
    removalPolicy: RemovalPolicy.DESTROY,
    retention: RetentionDays.ONE_WEEK
    })

    `

    ステマチンチン仕様🐩

    With that now, we can define the statemachine properties as shown below. Here it includes the sfnDef and sfnLog を作成します.`ts
    const stateMachine = new sfn.StateMachine(this, "msgStateMachine", {
    definition: sfnDef,
    timeout: timeout,
    logs: {
    destination: sfnLog,
    includeExecutionData: true,
    level: LogLevel.ALL
    }
    });

    `

    ラムダをstatemachineに呼び出す🌹

    Since lambda is a resource, we have to explicitly grant privilege to the statemachine execution role utilizing adding a new IAM policy statement sfnLambdaInvokePolicy 下記.これはRecord Message 上記のワークフローのステップ.ts
    const sfnLambdaInvokePolicy = new Policy(this, 'sfnLambdaInvokePolicy');
    sfnLambdaInvokePolicy.addStatements(
    new PolicyStatement({
    actions:[
    "lambda:InvokeFunction"
    ],
    effect: Effect.ALLOW,
    resources: [
    $この関数は以下のようになります.],
    sid: "sfnLambdaInvokePolicy"
    })
    )
    stateMachine.role.attachInlinePolicy(sfnLambdaInvokePolicy)

    ステータスを送信するためのラムダ実行機能の付与🍁

    Since we are not going to poll the lambda to find the status, again and again, we expect the lambda to callback ジョブ完成の結果に関するstatemachine私たちは既にtoken ラムダへのペイロードの一部であり、それから、ステータスを与えているstatemachineにメッセージを戻すsuccess or failure シナリオに基づきます.それまで、Statemachineはその現在のステップで休止されます.
    これを達成するために、プロセッサラムダ実行機能に割り当てられているIAMポリシーステートメントの形式で特権を見つけてください.ts
    const lambdaSfnStatusUpdatePolicy = new Policy(this, 'lambdaSfnStatusUpdatePolicy');
    lambdaSfnStatusUpdatePolicy.addStatements(
    new PolicyStatement({
    actions:[
    "states:SendTaskSuccess",
    "states:SendTaskFailure",
    ],
    effect: Effect.ALLOW,
    resources: ['*'],
    sid: "lambdaSfnStatusUpdatePolicy"
    })
    )
    triggerFunction.role?.attachInlinePolicy(lambdaSfnStatusUpdatePolicy)

    sfnmachine readonlyプロパティの設定🐲

    This is required to access the statement object from the stack where it is implemented and use it for further integration.

    `ts
    stateMachine.applyRemovalPolicy(RemovalPolicy.DESTROY);

    this.sfnMachine = stateMachine
    `

    ダイナモにメッセージを記録するラムダ🌷

    Let us create a new file under lambda/message-recorder.ts . このラムダでは、指定されたテーブルに保存するDynamoDBを実行するだけです.正常な出力メッセージで、成功または失敗シナリオのために状態callbackを送る論理の他に.`ts
    import { PutItemInput } from "aws-sdk/clients/dynamodb";

    import { DynamoDB,StepFunctions } from "aws-sdk";

    const sfn = new StepFunctions({ apiVersion: "2016-11-23" });

    exports.processor = async function (event: any) {
    const dynamo = new DynamoDB();
    let result: any | undefined = undefined;
    const msg = event.Record;
    const crt_time: number = new Date(msg.createdAt).getTime();
    const putData: PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
    messageId: { S: msg.messageId },
    createdAt: { N: ${crt_time} },
    イベント: { S : JSON . stringify ( msg . events )}
    //
    返り値
    //
    試してみる
    結果=ダイナモを待ちます.putitem ( putdata )promise ()も参照ください.
    キャッチする
    const sendfailure:ステップ機能.sendtaskFailureInput = { { 1 } { 0 }を出力する
    エラー: JSON .stringify ( errr ),
    原因:JSON.stringify ()を使用する
    Statuscode : 500
    ヘッダ:{ content content ":"text/json "
    PutStatus::
    MessageID : MSGMessageID
    ProcessResult :
    //
    ))、
    TaskToken :イベント.MyTaskToken
    //

    await sfn.sendTaskFailure(sendFailure, function (err: any, data: any) {
      if (err) console.log(err, err.stack); 
      else console.log(data); 
    });
    return sendFailure;
    

    const send成功:関数.sendtaskExposure = {\\fPを使用する.
    出力: JSONstringify ()を使用する
    Statuscode : 200
    ヘッダ:{ content content ":"text/json "
    PutStatus::
    MessageID : MSGMessageID
    結果:結果
    //
    ))、
    TaskToken :イベント.MyTaskToken
    //
    待つ
    . sendTaskSuccess ( send成功、関数( ERR : Any , data : any ){ }
    ( ERR )コンソールならば.ログ( ERR , ERR ,スタック);
    他のコンソール.ログデータ
    ))>
    パラメータ
    結果を返す
    //
    `

    スタック内のラムダの定義🍃

    Here we will be using the above code asset to define the lambda resource inside our CDK stack.

    `ts

    const messageRecorder = new lambda.Function(this, "MessageRecorderHandler", {
    runtime: lambda.Runtime.NODEJS_14_X,
    code: lambda.Code.fromAsset("lambda"),
    handler: "message-recorder.processor",
    logRetention: logs.RetentionDays.ONE_MONTH,
    environment: {
    MESSAGES_TABLE_NAME: envParams.messages.tableName || "",
    },
    });

    messageRecorder.applyRemovalPolicy(RemovalPolicy.DESTROY);
    `

    スタック内の新しいSFN構築の実装🌴

    Importing the construct library created earlier.

    ts
    import {simpleSfnConstruct} from "../constructs/sfn-simple"

    Passing the required params and getting an instance object reference by initialing the construct.

    ts
    const sfnMachine = new simpleSfnConstruct(this, 'sfnMachine', {
    timeout: Duration.seconds(30),
    triggerFunction: messageRecorder
    })

    イベントターゲット📢

    ts
    const sfnRole = new Role(this, 'Role', {
    assumedBy: new ServicePrincipal('events.amazonaws.com'),
    });
    const sfnCommonEventTarget = new eventTargets.SfnStateMachine(sfnMachine.sfnMachine,{
    deadLetterQueue: commonEventProcessorQueueDLQ.queue,
    retryAttempts: 3,
    input: RuleTargetInput.fromEventPath("$"),
    role: sfnRole
    })

    イベントターゲットの新しいイベント規則🔩

    In this event rule, we use the new bus commonbus そして、我々は同じを使用eventPattern イベントを上記のstatemachineに転送する.ts
    const sfnEventRule = new Rule(this,
    SFNCommoneProvitionRule, {
    eventBus: commonBus,
    eventPattern: { source: [
    COM .デビット.コモンイベント] },
    targets: [sfnCommonEventTarget],
    ruleName: "sfnCommonEventProcessorRule",
    enabled: true
    });
    sfnEventRule.applyRemovalPolicy(RemovalPolicy.DESTROY);

    郵便配達人のテスト🎿

    Here I will be performing a test, by sending a message to the API endpoint as shown below once I have deployed the solution to my AWS environment.

    Querying with the messageId DynamoDB内で、テーブルは次のとおりですsql
    SELECT * FROM "MessagesTable" where messageId = 'cdd51245-987a-b3c7-eecf-6d6d63046073'

    We can find the message in the dynamodb now.

    AWSコンソールからの検査🎳

    You could now check the workflow progress and execution logs from the AWS console.

    イベントメッセージ🏀

    Thus we have defined a new statemachine and reconfigured our existing event bus role to the new rule which delivers messages to the statemachine that we have built-in this article.

    We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

    ⏭ We have our next article in serverless, do check out

    🎉 Thanks for supporting! 🙏

    Would be great if you like to ☕ Buy Me a Coffee, to help boost my efforts.

    🔁 Original post at 🔗 Dev Post

    🔁 Reposted at 🔗

    🔁 Reposted at 🔗 medium com @aravindvcyber