JavaScript開発者のためのさび:AWSラムダによるSQSバッチエラー処理

6259 ワード

私は、このポストで部分的なバッチ反応を使っているAWS SQS比較を強化します.
The partial batch response は、2021年11月に発表されました、そして、それは部分的な失敗を取り扱うために我々、開発者を助けています.
ミニシリーズの一部としてRust for JavaScript developers 他の部分をチェックアウトできます.
  • Part 1 私はAWS SQSの基本的な比較を示しました.
  • Part 2 AWS appconfigの基本的な比較を示しました.
  • Part 3 CSVファイルを処理するための基本的な比較を示しました.
  • なぜ


    ここ数年、私は言語を何度も切り替えました.NET、JavaScript、Rust、および1つの言語で取得した知識は、新しいものに転送可能です.したがって、我々は精神的にマップを迅速にそれを選択する必要があります.
    私はこの動画にはまだ動画レスポンスがありません.

    基本


    錆は多くの組み込み機能を備えています.例えば:
    js

    国立天文台
    貨物
    国立天文台
    カーゴパンツ
    NPMインストール
    カーゴインストール
    NPMのビルド
    カーゴビルド
    パッケージ.JSON
    カーゴ.トムール
    パッケージロック.JSON
    カーゴ.ロック
    Webpack
    カーゴビルド
    リント
    カーゴクラッピー
    前途
    カーゴFMT
    doc世代
    貨物ドック
    Jestのようなテストライブラリ
    貨物試験
    新しいSAMベースのServerlessアプリを生成する
    sam init --location gh:aws-samples/cookiecutter-aws-sam-rust
    

    アマゾンSQS


    前の挑戦the partial batch response が発表され、エラーを扱っていた.処理の間、SQS記録のうちの1つが失敗すると仮定して、メッセージの全バッチは失敗したと考えられます.我々の多くは成功と失敗メッセージを追跡して、この制限を回避するためにプログラム的に処理されたものを削除しました.
    コードは非常に似ているのでone , AWS SAMテンプレートから始めて、必要な変更を報告します
    AWSTemplateFormatVersion: 2010-09-09
    Transform: 'AWS::Serverless-2016-10-31'
    Description: Writer Lambda.
    
    ##########################################################################
    #  Global values that are applied to all resources                       #
    ##########################################################################
    Globals:
      Function:
        MemorySize: 1024
        Architectures: ["arm64"]
        Handler: bootstrap
        Runtime: provided.al2
        Timeout: 29
        Layers:
          - !Sub arn:aws:lambda:${AWS::Region}:580247275435:layer:LambdaInsightsExtension-Arm64:1
        Environment:
          Variables:
            RUST_BACKTRACE: 1
            RUST_LOG: info
    
    Resources:
    ##########################################################################
    #   Lambda Function                                                      #
    ##########################################################################
      LambdaFunction:
        Type: AWS::Serverless::Function
        Properties:
          CodeUri: ../build/handler
          Policies:
            - AWSLambdaBasicExecutionRole
          Events:
            MySQSEvent:
              Type: SQS
              Properties:
                Queue: <ARN of the SQS queue>
                BatchSize: 10
                FunctionResponseTypes:
                  - ReportBatchItemFailures
    
    コードは次のようになります.
    pub async fn execute(client: &aws_sdk_dynamodb::Client, event: LambdaEvent<SqsEvent>,) -> Result<Value, Error> {
        let failed_message: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let mut tasks = Vec::with_capacity(event.payload.records.len());
        let shared_client = Arc::from(client.clone());
    
        for record in event.payload.records.into_iter() {
            let shared_client = shared_client.clone();
            let message_id = record.message_id.unwrap();
            let failed_message = failed_message.clone();
    
            tasks.push(tokio::spawn(async move {
                if let Some(body) = &record.body {
                    let request = serde_json::from_str::<MyStruct>(&body);
                    if let Ok(request) = request {
                        DoSomething::new()
                          .execute(&shared_client, &request)
                          .map_or_else(|e| {
                            failed_message.lock().unwrap().push(message_id.clone());
                          }, |_| ());
                    } 
                }
            }));
        }
    
        join_all(tasks).await;
    
        let response = BatchItemFailures {
            batch_item_failures: failed_message.lock().unwrap().clone()
                .into_iter()
                .map(|message_id| {
                    return ItemIdentifier {
                        item_identifier: message_id,
                    };
                })
                .collect(),
        };
    
        Ok(serde_json::to_value(response).unwrap())
    }
    
    これは、タイプスクリプトのバージョンとはあまり異なりません.
    export const handler = async (event: SQSEvent): Promise<BatchItemFailures> => {
      const failedMessageIds: string[] = [];
      await Promise.all(
        event.Records.map(async (record: SQSRecord) => {
          try {
            //DoSomething(record)
          } catch (error) {
            failedMessageIds.push(record.messageId);
          }
        }),
      );
    
      const batchItemFailures: BatchItemFailures = {
        batchItemFailures: failedMessageIds.map(id => {
          return {
            itemIdentifier: id
          }
        })
      };
    
      return batchItemFailures;
    };
    
    錆は厳格で、ループ内でtokio ::spawnを使用しているので、失敗したメッセージのベクトルは動きません.
     Arc<Mutex<Vec<String>>>
    
    私は、SQUIDの識別子をjoin_all どこで、私は失敗のために濾過することができましたか.

    結論


    Serverlessアプリケーションの錆構文はおそらく他の言語よりも不器用です.それでも、私は毎日錆を使用していません.余分なmain ()、unwrap ()やmutex/arcには気がつかない.それはちょうど言語の一部です、そして、私は現在慣れました.
    私の目標は、別のプログラミング言語を使用して、あなたのスキルセットを再利用し、より良い結果を達成するために必要なことを証明することです(要件は、仕事のための最良のツールを選択).他のプログラミング言語で書かれたのと同じコードを持っていると、すぐに錆になるのを助けてくれます.