S3,SNS,SQS,Lambdaを利用したファンアウト構成


Cloudformaionを使用したSNS,SQSの構築は、
検索しても少なかったので書きました。
(需要自体なさそう)

Amazon SQS(Simple Queue Service)とは

メッセージキューイングサービスです。

送信側はキュー(データを先入れ先出しの構造で保持するもの)にメッセージを送り、
受信側はキューにポーリングしてメッセージを受け取ります。

標準とFIFO

SQSではタイプとして、標準とFIFO配信を選択できます。

標準タイプでは、メッセージの順番を必ず保証はできません(ベストエフォート)。

FIFOタイプではメッセージの順番を守ります。
SNS、S3などのの連携をサポートしていません。

Amazon SNS(Simple Notification Service)とは

登録先にメッセージをプッシュ配信するサービスです。
特定のメールアドレスやSQSなどにメッセージを送ることができます。

CloudWatchアラームでEC2のCPU使用率が閾値を超えた場合や、
S3にオブジェクトが置かれた場合の通知先で使用されるイメージ。

注意点

複数メッセージ

SQSはAZ単位で冗長化してメッセージを保存しているため、AZ障害が発生した場合などに、複数メッセージをクライアントが受け取ってしまうことがあるみたいです。
FIFO配信でも同じことが起こります。

つまり、SQSを使う場合は同じメッセージをクライアントが受信して、二回実行しても問題ないようにしなければなりません。

SNSを挟む理由

LambdaのトリガーとしてならばSQS Lambdaでも実装可能ですが、複数のサブスクライバー(送信先)にメッセージを受信させたい場合は、SNSが必要です。
複数のサブスクライバーにメッセージを送信する可能性がある場合は使用、そうでなければ使用しないで良いと思います。

メリット

・複数の受信者へ同時にメッセージを送信することができる。
・送信コンポーネントと受信コンポーネントを分離することができる。
・SNSはサブスクライバーが常時メッセージを受け取れるようにしなければならないが、SQSでは受け取り準備ができてなくてもいい。
・複数のサブスクライバーに送りつつ、Eメールも送りたいとか追加の要望が来た場合でも対応可能。

デメリット

・SNSの追加料金が発生する。
・SNSの通信時の処理が送信時に最大50msほどかかる。

暗号化

暗号化する場合はデフォルトのキーでは不可。
新たにAWS KMSでキーを作成する必要があり、KMSのポリシーでSNSを許可してあげないと通信できない。

今回作成したCloudformaionでは、以下のような処理になっている。
- S3 からイベント内容を SNSトピックへ通知
- SNS で通知データを CMK を使用して暗号化
- SNS で通知データを CMK を使用して平文に戻し、SQS キューに送付
- SQS でメッセージを CMK を使用して暗号化
- Lambda から SQS のメッセージを取得(取得時、SQS でメッセージを CMK を使用して平文に戻す)

SQSの可視性タイムアウト、Lambdaのバッチサイズと同時実行数

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
上記ドキュメントにも記載ある通り、可視性タイムアウトの秒数をLambdaの最大実行時間よりも長くすることが普通とのこと。

つまり、Lambdaの起動時間を如何に絞れるかによって、再度ポーリングするまでの時間も早くできる。

実際、Cloudformationでは下記のように「可視性タイムアウト」 ≧ Lambda タイムアウトにしないとデプロイでない。(しかし、手動でコンソール上から変更することは可能そう。最悪手動変更することで、例えばLambda最大起動時間:15分、ポーリング10秒とかも可能?)
sqsTrigger CREATE_FAILED Invalid request provided: Queue visibility timeout: 30 seconds is less than Function timeout: 900 seconds (Service: Lambda, Status Code: 400, Request ID: 5b96e5ec-c1e0-4aea-afdc-b8fa0ce77555, Extended Request ID: null)

リソース

Cloudformaionで作成。東京リージョンで使用。

AWSTemplateFormatVersion: "2010-09-09"
Transform: "AWS::Serverless-2016-10-31"
Resources:
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  createCmk:
    Type: AWS::KMS::Key
    Properties:
      Description: "Encrypt sensitive data for Billboard site."
      KeyPolicy:
        Version: "2012-10-17"
        Id: key-consolepolicy-3
        Statement:
          - Sid: "Allow administration of the key"
            Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action:
              - "kms:*"
            Resource: "*"
          - Sid: "Allow Amazon SNS of the key"
            Effect: Allow
            Principal:
              Service: "sns.amazonaws.com"
            Action:
              - "kms:Decrypt"
              - "kms:GenerateDataKey*"
            Resource: "*"
          - Sid: "Allow Amazon S3 of the key"
            Effect: Allow
            Principal:
              Service: "s3.amazonaws.com"
            Action:
              - "kms:Decrypt"
              - "kms:GenerateDataKey*"
            Resource: "*"
  createKeyAlias:
    DependsOn: createCmk
    Type: 'AWS::KMS::Alias'
    Properties:
      AliasName: alias/testcmk
      TargetKeyId: !Ref createCmk


  sqsQueue:
    DependsOn: createKeyAlias
    Type: AWS::SQS::Queue
    Properties:
      QueueName: "snssqstestqueue"
      KmsMasterKeyId: alias/testcmk
      VisibilityTimeout: 900 
  snsTopic:
    DependsOn: createKeyAlias
    Type: AWS::SNS::Topic
    Properties:
      TopicName: "snssqstesttopic"
      KmsMasterKeyId: alias/testcmk
  snsTopicPolicy:
    DependsOn: snsTopic
    Type: "AWS::SNS::TopicPolicy"
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: "SID"
            Effect: Allow
            Principal:
              Service: "s3.amazonaws.com"
            Action: "SNS:Publish"
            Resource: !Ref snsTopic
            Condition:
              ArnLike:
                aws:SourceArn: !Join [ "", [ "arn:aws:s3:::", "snssqstestbuckets3" ]]
      Topics:
        - !Ref snsTopic
  s3Bucket:
    DependsOn: snsTopicPolicy
    Type: "AWS::S3::Bucket"
    Properties:
      BucketName: "snssqstestbuckets3"
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True
      NotificationConfiguration:
        TopicConfigurations:
          - Event: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: txt
            Topic: !Ref snsTopic
  snsSubscription:
    DependsOn: snsTopic
    DependsOn: sqsQueue
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: sqs
      Endpoint: !GetAtt sqsQueue.Arn
      TopicArn: !Ref snsTopic
  sqsQueuePolycy:
    DependsOn: snsTopic
    DependsOn: sqsQueue
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: "SID"
            Effect: Allow
            Principal: "*"
            Action: "sqs:*"
            Resource: "*"
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref snsTopic
      Queues:
        - !Ref sqsQueue
  lambdaRole:
    DependsOn: sqsQueue
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/AmazonS3FullAccess"
        - "arn:aws:iam::aws:policy/ReadOnlyAccess"
        - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
        - "arn:aws:iam::aws:policy/AmazonSQSFullAccess"
        - arn:aws:iam::aws:policy/AWSKeyManagementServicePowerUser
      MaxSessionDuration: 3600
      Path: "/"
      RoleName: "sqssnstestrole"

  lambdaFunction:
    DependsOn: lambdaRole
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: "sqssnstestlambda"
      Handler: index.lambda_handler
      MemorySize: 1024
      Runtime: "python3.6"
      Code:
        ZipFile: >
          import json

          def lambda_handler(event, context):
            print(event)
      Role: !GetAtt lambdaRole.Arn
      Timeout: 900
      ReservedConcurrentExecutions: 1

  sqsTrigger:
    Type: AWS::Lambda::EventSourceMapping
    DependsOn: lambdaFunction
    Properties:
      BatchSize: "1"
      Enabled: true
      EventSourceArn: !GetAtt sqsQueue.Arn
      FunctionName:
        Fn::GetAtt: 
          - "lambdaFunction"
          - "Arn"