キネシスデータファイヤーホースとラムダのためのCloudFormation例


キネシスデータファイア(私は配信ストリームとしても参照)とラムダを使用すると、ストリーミングデータを処理するための素晴らしい方法であり、両方のサービスが無意味であるので、彼らが使用されていない間、管理するか、または支払うためのサーバーがありません.私は複数のサービスからストリーミングされているとして、この組み合わせを数回マスクをマスクやスクラブを使用している.キネシスの大きな部分の一つは、他のAWSサービスがCloudWatchのように直接統合されているということです.直接の統合がない場合、データはPUTリクエストを使用して直接プッシュできます.
キネシスは、ラムダとの統合を介して配信ストリームに入った後、データを簡単に変換することができます.レコードは、ラムダを変換することができますし、レコードは最終的な宛先に達すると、来る.その最終的な目的地は、S 3、弾性検索、またはSplunkのようなものでありえました.他にも組み込みの統合があります.私はこれのために私のCloudformationテンプレートを構築していましたが、私はバケツを作成するのが簡単であるので、S 3に決めました、そして、S 3バケツで座っているデータでする他の大きなもののトンがあります.
私のベースレベルのテンプレートはAWS CloudFormation Reference repository 私はクイックリファレンスポイントとビルディングブロックとして作成した非常にいくつかの他のテンプレートと一緒に.
ラムダ関数から始まって、インフラストラクチャ側からこれを構築することに関する少しの難しい部分もありませんでした.IAMの役割については、私は単にarn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole . これは私が数分以内に起きて実行するために必要なすべてを持っていた.ラムダのアクセス許可は、まだ私に混乱している.校長events.amazonaws.com が必要です.lambda:InvokeFunction Kinesis以来最初に私に意味をなさなかった行動は、ラムダを引き起こすものです.少なくとも、それは私が舞台裏で起こったと思ったことです.
次は、消防士自身とそのIAMの役割.Kinesis has multiple destination configuration から選択するプロパティと各配信ストリームのみを取得します.オプションは、弾力検索、S 3、赤方偏移、スプライン、キネシスストリーム(キネシスデータファイヤーホースとは異なります)、または一般的なHTTPエンドポイントです.S 3が利用可能な2つの目的地設定プロパティを持っている点に注意してください.S3DestinationConfiguration and ExtendedS3DestinationConfiguration . 私が言うことができるものから、拡張目的地はラムダ処理のような追加構成を考慮に入れます、一方、通常の目的地構成はそれを簡単に進めるために単純です.私は使用することを決めたExtendedS3DestinationConfiguration なぜなら、ラムダの積分を使いたかったからです.ラムダの統合はProcessingConfiguration 資産ExtendedS3DestinationConfiguration , これは、次のスニペットのようなものを探して終了します.
...
ExtendedS3DestinationConfiguration:
  ...
  ProcessingConfiguration:
    Enabled: 'true'
    Processors:
      - Parameters:
          - ParameterName: LambdaArn
            ParameterValue: !GetAtt LambdaFunction.Arn
        Type: Lambda

私がこのテンプレートに取り組んでいた最も純粋な部分は、最も長い間、データファイヤーのためのIAM役割でした.私が私の問題を考え出したあと、私はAWSのドキュメントでページを見つけましたdifferent permissions required for various integrations , それは、私があらかじめそれについて知っていた助けたでしょう.私が立ち往生したのはファイアホースのラムダ許可です.当初は以下のようなものしかなかった.
- Effect: Allow
  Action:
    - 'lambda:InvokeFunction'
    - 'lambda:GetFunctionConfiguration'
  Resource:
    - !Sub "${LambdaFunction.Arn}"

これは、失敗した経路の下で私のS 3バケツに書き込むファイヤーホースに終わります.私がきれいな記録の代わりに得たものは、以下のメッセージの線に沿った何かでした.
{
  "attemptsMade": 4,
  "arrivalTimestamp": 1622242573374,
  "errorCode": "Lambda.InvokeAccessDenied",
  "errorMessage": "Access was denied. Ensure that the access policy allows access to the Lambda function.",
  "attemptEndingTimestamp": 1622242649990,
  "rawData": "eyJ0aWNrZXJfc3ltYm9sIjoiQU1aTiIsInNlY3RvciI6IlRFQ0hOT0xPR1kiLCJjaGFuZ2UiOi02LjU1LCJwcmljZSI6NzMzLjQxfQ==",
  "lambdaArn": "REDACTED"
}

あまりにも長い間これをじっと見つめて、私が何を間違っていたかについて疑問に思った後に、私は最終的にResource IAMロールのポリシードキュメント.最終的に私にとってのトリックは、前の声明の次の調整でした.
- Effect: Allow
  Action:
    - 'lambda:InvokeFunction'
    - 'lambda:GetFunctionConfiguration'
  Resource:
    - !Sub "${LambdaFunction.Arn}*" # NOTE: there is an * after the Lambda's ARN

あることを確認する* ラムダのarnの後.その後、私のレコードはすべてデータパイプラインを流れ始めました!
現在、この統合のオープンエンド部分は、ラムダ関数が実行されるコードです.以前の経験について書きました.そのコードは間違いなく私がこの時間を書いたもののより複雑なバージョンだった.このテンプレートのために、私はコードを単純にしたかったです.私はFireホースからBase 64記録をデコードして、内容を印刷して、記録されていないファイヤーホースに戻ってください.目標は、単にすべてが意図通りに働いていることを確認することでした.これはノード/JavaScriptで書いたコードです.
exports.handler = (event, context, callback) => {
  /* Process the list of records and transform them */
  const output = event.records.map((record) => {
    const plaintextData = Buffer.from(record.data, 'base64').toString('ascii');
    console.log(plaintextData);
    return {
      recordId: record.recordId,
      result: 'Ok',
      data: record.data,
    };
  });
  console.log(`Processing completed. Successful records ${output.length}.`);
  callback(null, { records: output });
};

そのコードはあまり役に立ちませんが、良い出発点です.ここからデータを変えることができます.値を追加することができます、値を赤くすることができますアラームは、コンテンツに基づいてトリガすることができます.ラムダを伴うと、我々は野生の西に何が行くので、それを楽しんでいる!