【AWS】Lambda+SQSでサイロ + プールモデルを実現するためのディスパッチャ構成検討


きっかけ

  • 弊社が過去に開発し、現在も引き続き運用、追加開発をしているクライアントのSaaSがあり、 "特定のユーザが大量にリクエストしたときに他ユーザのリクエスト実行まで時間がかかるので、解消したい"とのリクエストをもらったから。

現状分析

  • リクエスト受信部分はREST APIで構成、他ユーザのリクエストに処理時間は依存しない
  • 実処理部分はリクエスト受信部分と非同期で処理を行っており、実処理が順次実行のため、この部分がボトルネック
  • 以下サイトで分類されている分離パターンでは現状は、1. サイロモデル

  • 特定処理、特定ユーザだけ分離できればいいので、3.サイロ + プールモデルを目指したい

検証で目指す構成

  • リクエストを受けるSQS→各リクエストを割り当てるDispatcher用Lambda→特定ユーザ用SQS,汎用ユーザ用SQSとそれぞれ実処理Lambdaをつなげる(本記事では実処理Lambdaは省略)

構築Step

  1. リクエスト受信SQSとDispatcher用Lambdaをつなぐ
  2. Dispatcher用LambdaからCompanyA用SQSへメッセージを格納する
  3. Dispatcher用Lambdaで受信メッセージに応じて格納する

1.リクエスト受信SQSとDispatcher用Lambdaをつなぐ

  • リクエスト受信SQSを作成(fifoキュー、重複削除のみ設定)

  • Dispatcher用Lambdaを作成(Node.js 14.xで構成)

  • Lambdaのソースコードを以下に修正、デプロイ

index.js
exports.handler = async function(event, context) {
  event.Records.forEach(record => {
    const { body } = record;
    console.log(body);
  });
  return {};
};
  • Lambda実行ロールに"AmazonSQSFullAccess"を付与

  • Lambda設定からリクエスト受信用SQSをトリガーとして設定

  • 動作確認 SQSでメッセージを送信→Lambdaでログを確認

2. Dispatcher用LambdaからCompanyA用SQSへメッセージを格納する

  • CompanyA用SQSを作成(fifoキュー、重複削除のみ設定)

  • Lambdaのソースコードを以下に修正、デプロイ

index.js
const aws = require('aws-sdk');
aws.config.region = 'ap-northeast-1';
const sqs = new aws.SQS({apiVersion: '2012-11-05'});

const QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXX/companyA.fifo';

exports.handler = async function(event, context) {

        const recs = event.Records;

        for (let rec of recs){
            const body = rec.body;
            console.log(body);

            const params = {
                MessageBody: body,
                QueueUrl: QUEUE_URL,
                MessageGroupId:rec.attributes.MessageGroupId,
                DelaySeconds: 0
            };

            console.log(params);

            const result = await sqs.sendMessage(params).promise();
            console.log(result)
        }
};
  • 動作確認 リクエスト受信用SQSへメッセージを送信→Lambdaのログを確認→CompanyA用SQSにメッセージが格納されていることを確認

3. Dispatcher用Lambdaで受信メッセージに応じて格納する

  • CompanyOTHER用SQSを作成

  • Lambdaのソースコードを以下に修正、デプロイ

index.js
const aws = require('aws-sdk');
aws.config.region = 'ap-northeast-1';
const sqs = new aws.SQS({apiVersion: '2012-11-05'});

const companyA_QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXX/companyA.fifo';
const companyOTHERS_QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXX/companyOTHER.fifo';
let queue_url = '';

exports.handler = async function(event, context) {

        const recs = event.Records;

        for (let rec of recs){
            const body = rec.body;
            console.log(body);

            const body_parse = JSON.parse(body);

            if(body_parse.company_code === '0001'){
                queue_url = companyA_QUEUE_URL;
            } else {
                queue_url = companyOTHERS_QUEUE_URL;
            }

            const params = {
                MessageBody: body,
                QueueUrl: queue_url,
                MessageGroupId:rec.attributes.MessageGroupId,
                DelaySeconds: 0
            };

            console.log(params);

            const result = await sqs.sendMessage(params).promise();
            console.log(result)
        }
};
  • 動作確認 リクエスト受信用SQSへメッセージを送信(company_codeで送信先制御)→Lambdaのログを確認→CompanyA用SQSにメッセージが格納されていることを確認

参考サイト