システムの定期的ではない使い捨ての予約処理をCloudWatchEventで実装した話


概要

様々なシステムの機能の中で特定処理を予約する場合は頻繁にあります。その予約処理が定期的な時間で処理する場合はcronをよく利用してました。

しかし、定期的ではない使い捨ての予約処理の場合、皆さんはどのように実装してますか?例えば、ウェブサイトの管理者ページでお知らせを作成して予約時間を設定すると、その時間に公開関連処理(データ変更、メール送信など)のような場合になるんですかね。システムの環境と技術によって方式は色々あると思いますが、恐らくcronは適切じゃないと思います。

問題の話

昔に構築したシステムで自分はそのような使い捨ての予約処理もcronを利用してました。予約時間があるデータを確認して、データがあったらそれに対する処理をするシェルスクリプトを作って、なるべく短い間隔の時間でそのシェルスクリプトが処理するようにcronを設定しておきました。

しばらくはこの構造が特に問題なく想定した通りに処理してくれました。システムで処理してるデータの量と対象が多くなるとシェルスクリプトが重くなると起きる懸念でFileLock処理を入れるなどの対応はたまたまありましたが、いくらでも乗り越えるものでした。

しかし、時間が過ぎながらいくつか問題が発生されて結局、非効率的だと判断して予約処理の構成をリニューアルすることになりました。システムの都合上、短い時間の間隔でチェックするシェルが増え、それによってcronから生まれるプロセスが増え、結局メモリに負担がかかったのが何よりの理由でした。メモリに負担を下げる方法は色々ありました。単純にcronに予約したシェルの数を大幅に減らしたり、cron予約とそれに対するシェルの処理をAWS CloudWatch EventsとAWS Lambdaに任せてServerlessにするなど、方法は色々ありました。
しかし、そもそも、定期的ではないタスクをするために定期的にチェックすることは効率的ではないと思いました。

対応した話

結局、以下のようにAWS CloudWatch Eventsを活用して簡単に使い捨ての予約処理を実装しました。

フローを簡約に申し上げますと、
システムから予約処理が発生された時、CloudWatchEvents SDKで必要なデータと共に年、月、日、時、分を指定してイベントを生成します。その後、イベントのターゲットをLambdaで指定してそこで完結しても良い、必ずサーバの中で処理する必要がありましたら、イベントに入れたデータをそのままSNS Topicメッセージにしてサーバのエンドポイントに投げればOKです。

ちなみに、Cronとは違くてCloudWatch Eventsは年まで指定することができ、必ず一回だけ実行されます。
また、すでにあるイベントと同じ名前でイベントを追加すると上書きされるので、データのユニーク値などを名前に構成すると予約する日付の変更が発生された場合、以前のイベントを消したりなど冪等性を意識しなくても大丈夫はず!

PHP(CakePHP)で実装したコードは以下の通りです。
(必要な部分以外は省略しております!)

まず、ReserveTaskDispatcherのクラスを作ってタスクをディスパッチできるようにします。


<?php

namespace App\Statics;

use Aws\CloudWatchEvents\CloudWatchEventsClient;
use Aws\Lambda\LambdaClient;
use Aws\Exception\AwsException;
use Cake\I18n\FrozenTime;
use Cake\Log\Log;

class ReserveTaskDispatcher
{
    /**
     * putReservedTaskEvent
     *
     * @param \Cake\ORM\Entity $entity
     * @return bool
     */
    public static function dispatch($entity)
    {
        $modelAlias = $entity->get('registryAlias');
        $id = $entity->id;
        $datetime = $entity->reserveDatetime;

        $cloudWatchEventsClient = CloudWatchEventsClient::factory(self::getCloudWatchEventsConfig());
        $lambdaClient = LambdaClient::factory(self::getLambdaConfig());
        $targetArn = self::TARGET_RESERVED_NOTIFY_LAMBDA_ARN;

        // イベント生成の1. CloudWatch Eventsのルール生成
        $ruleParams = self::getRuleParams($modelAlias, $id, $datetime);
        try {
            $putRuleResult = $cloudWatchEventsClient->putRule($ruleParams);
        } catch (AwsException $e) {
            Log::error($e->getMessage());
            return false;
        }

        // イベント生成の2. 上で生成したイベントのルールからイベントのターゲットになるLambda関数にアクセスできるように権限追加する
        $ruleArn = $putRuleResult->get('RuleArn');
        $ruleName = $ruleParams['Name'];
        try {
            $lambdaClient->addPermission([
                'Action' => 'lambda:InvokeFunction',
                'FunctionName' => $targetArn,
                'Principal' => 'events.amazonaws.com',
                'SourceArn' => $ruleArn,
                'StatementId' => $ruleName,
            ]);
        } catch (AwsException $e) {
            Log::error($e->getMessage());
            return false;
        }

        // イベント生成の3. CloudWatch Eventsのターゲットを生成する
        $targetParams = self::getTargetParams($ruleName, $modelAlias, $id, $datetime, $targetArn);
        try {
            $cloudWatchEventsClient->putTargets($targetParams);
        } catch (AwsException $e) {
            Log::error($e->getMessage());
            return false;
        }

        return true;
    }

    /**
     * getRuleParams
     *
     * @param string $modelAlias
     * @param string|int $id
     * @param string $datetime
     * @return array
     */
    private static function getRuleParams($modelAlias, $id, $datetime)
    {
        $datetimeObj = new FrozenTime($datetime);
        $name = join([
            env("CAKE_ENV_MODE"),
            $modelAlias,
            $id,
            'ReservedTask',
        ], '-');

        return [
            'Name' => $name,
            'Description' => 'hoge hoge hoge',
            'ScheduleExpression' => "cron({$datetimeObj->setTimezone('UTC')->i18nFormat('m H d M ? yyyy')})", //UTC
            'State' => 'ENABLED',
        ];
    }

    /**
     * getTargetParams
     *
     * @param string $ruleName
     * @param string $modelAlias
     * @param string|int $id
     * @param string $datetime
     * @param string $targetArn
     * @return array
     */
    private static function getTargetParams($ruleName, $modelAlias, $id, $datetime, $targetArn)
    {
        $inputData = [
            'model_alias' => $modelAlias,
            'data' => [
                'id' => $id,
                'datetime' => $datetime
            ]
        ];
        return [
            'Rule' => $ruleName,
            'Targets' => [
                [
                    'Id' => $ruleName,
                    'Arn' => $targetArn,
                    'Input' => json_encode($inputData)
                ]
            ]
        ];
    }
}

その後、ビジネスロジックの中でタスクを予約するところにディスパッチしておくと終わり!

        if ($entity->isReserved) {
            ReserveTaskDispatcher::dispatch($entity);
        }

システムからバンバンイベントがされます。

以上です。