AWS SimpleWorkflowでLambdaを使って、Activityの運用から解放されたい!


はじめに

  • AWS SWFご存知でしょうか。大体の人が名前は知ってるけど触ったことがないという反応が返ってきます。(SWF使いどころたくさんあるんだけど、やっぱり流行らないのかなー(涙))
  • SWFとFlowFramework(Java or Ruby)というライブラリを利用することで簡単に(?)ワークフローを実現することができる凄いサービスなんです。
  • この記事はSWFのActivityにLambdaを使って幸せになろうぜという記事です。

  • Lambdaは人気があって記事もたくさんあるのですが、SWFは始めのとっつきにくさからか、周りに使ってる人が見つけられず、ネット上でも日本語の情報源が非常に少ないのです。なので少し普及に貢献してみようと思います。(普及に貢献したい人、一緒に勉強会でもしましょう)

SWFの基礎的な話

  • SWFの概要を理解するにはまずはこちらの資料をご覧ください。
  • この資料を見てわかるようにAWS SWFでは大きく4人の登場人物が出てきます。

  • Client(Executor) ・・・ workflowをキックする人です。オンプレミス側からSWFのEndpointに接続してワークフローを起動します。
  • SWF・・・SWFはクライアントからの実行の要求を受付けワークフローのキューの管理をしてくれます。
  • Desider・・・SWFのキュー(ドメイン・タスクリスト)を監視しながらワークフローを進める人です。ただタスクを管理するだけで実際の処理は行いません。SWFのエンドポイントからつながるのであればEC2でもよいですし、オンプレミスのサーバを使っても構いません。
  • Activity・・・Desiderから振られた処理を実行する人です。ステートレスに実行します。ActivityもSWFのエンドポイントからつながるのであればEC2でもよいですし、オンプレミスのサーバを使っても構いません。
  • Activityにはステートレスで、処理時間がかかり負荷がかかるものが実装されるため、実際に運用していくとなるとActivityをスケールアウトしていく運用になると思います。

さて本題

  • SWFを中心とした疎結合なアーキテクチャなので、Activityをスケールアウトしていくことで負荷に対応していくことができます。
  • 自動スケールアウトするにはActivityのインスタンスをAutoScalingGroupに登録させればよいでしょう。それすら面倒ならElasticBeansTalkにお任せしましょう。それすらも面倒だ、もっと安く運用したいんだったらLamdaなんじゃないかなと思うわけです。
  • Lambdaを使ったアーキテクチャは以下のようになります。
  • Activityの部分がLambdaになっただけですが、Activityの運用から解き放たれると思うとこんなに嬉しいことはありません。
  • 早速実装方法を見ていきます。(LambdaFunctionの登録は通常Lambdaを利用するときと変わらないので割愛します。)

シンプルに呼び出す

DesiderからLambdaを呼び出す方法

  • SWFに付属しているサンプルではLambdaFunctionへ呼びっぱなしですが、ここではLambdaを実行した結果を待つ形にしてみました。
    @Override
    public void hello(String name) throws Exception {

        task = new TryCatchFinally() {

            @Override
            protected void doTry() throws Throwable {
                ConfigHelper configHelper = ConfigHelper.createConfig();
                DecisionContextProvider decisionProvider =
                        new DecisionContextProviderImpl();

                DecisionContext decisionContext = decisionProvider.getDecisionContext();
                LambdaFunctionClient lambdaClient =
                        decisionContext.getLambdaFunctionClient();

                // lambdaの引数はjson
                Promise<String> val = lambdaClient.scheduleLambdaFunction(
                        configHelper.getSwfLambdaFunction(), "\"" + name + "\"", 30);
                // lambda functionの呼び出しが終わるまで待つ
                processResult(val);

            }

            @Override
            protected void doCatch(Throwable e) throws Throwable {
                logger.error("catch exception", e);
                // 例外処理
                throw e;
            }

            @Override
            protected void doFinally() throws Throwable {
                // noop
            }
        };
    }

    @Asynchronous
    private void processResult(Promise<String> lambdaClientResult) {
        System.out.println("ready[" + LocalTime.now() + "] lambda return value["
                + lambdaClientResult.get() + "]");
    }
}
  • scheduleLambdaFunctionの第一引数はLambdaFunction名、第二引数はLambdaFunctionへの引数(jsonのみ)、第三引数はtimeoutの時間になります。
  • LambdaFunctionを実行した結果がPromiseで取れますので、これをAsynchronousメソッドに渡すことでLambdaFunctionの実行を待って結果を出力することができます。

リトライ処理

自前のActivityの場合

  • 通常のActivityであればリトライするには以下のようにアノテーションをActivityに付与すれば簡単に実現できるわけです。
  • ちなみに以下の設定は5秒,10秒,20秒とリトライしてくれます。
@ExponentialRetry(initialRetryIntervalSeconds = 5, maximumAttempts = 3)
public String hello(String str);

LambdaFunctionをActivityにした場合

  • LambdaFunction側にアノテーションが使えないのでDesider側でなんとかする必要がありそうです。
  • ここで登場するのがExponentialRetryPolicyとRetryDecoratorです。
  • ExponentialRetryPolicyでポリシーを決めて、RetryDecorator経由LambdaFunctionClientのインスタンスを入手することで実現できます。
                long initialRetryIntervalSeconds = 5;
                int maximumAttempts = 5;
                List<Class<? extends Throwable>> exceptionsToRetry = new ArrayList<>();
                exceptionsToRetry.add(RuntimeException.class);
                ExponentialRetryPolicy retryPolicy =
                        new ExponentialRetryPolicy(initialRetryIntervalSeconds)
                                .withMaximumAttempts(maximumAttempts)
                                .withExceptionsToRetry(exceptionsToRetry);
                Decorator retryDecorator = new RetryDecorator(retryPolicy);
                LambdaFunctionClient decoratedLambdaClient =
                        retryDecorator.decorate(LambdaFunctionClient.class, lambdaClient);
  • わかれば簡単ですけど気づけないですよねー

300秒の壁

  • AWS Lambda の制限
  • ちょっと前に300秒までLambdaの実行時間が延びましたが300秒を超える処理はどうやって実現したらよいでしょうか。(いや、そのまえに300秒超える処理どうなの?は一旦置いておきましょう)
  • そういう場合は、処理を投げた後に結果をポーリングさせるようなアーキテクチャにすると幸せになります。
  • Desider側ではDecisionContextからWorkflowClockというのが取れまして、WorkflowClockからcreateTimerして得られたPromiseをdesiderのメソッドの引数に上げてあげれば良いです。
  • 以下の例はRdsのSnapshot取得のAPIを叩いた後、snapshotがavairableになるのを待って処理を動かすサンプルです。
  • Functorの引数にPromiseを渡すことでそこまで処理を待たせることができるのと、pargeRdsSnapshotAfterPoolingにtimerを渡している部分がポイントですね。
    @Asynchronous
    private Promise<String> pargeRdsSnapshotAfterPooling(Promise<String> createRdsSnapshotResult,
            Promise<?>... waitForPromises) {

        // 中略

        LambdaFunctionClient lambdaClient = decisionContext.getLambdaFunctionClient();
        Promise<String> getRdsSnapshotStatusResult =
                lambdaClient.scheduleLambdaFunction("getRdsSnapshotStatus", parameter, 60);

        return new Functor<String>(getRdsSnapshotStatusResult) {
            @Override
            protected Promise<String> doExecute() throws Throwable {
                String rdsSnapshotStatus =
                        CmpWorkflowUtils.getActivityResult(getRdsSnapshotStatusResult.get());
                if ("available".equals(rdsSnapshotStatus)) {
                    return pargeRdsSnapshot();
                }
                int retryPeriod = 10;
                Promise<Void> timer = clock.createTimer(retryPeriod);
                return pargeRdsSnapshotAfterPooling(createRdsSnapshotResult, timer);
            }
        };
    }

まとめ

  • ということでSWFのLambdaを使うとActivityの運用から解放されて幸せになれるかもね。という記事でした。
  • サンプルは以下に置いてあります。上記の他、LambdaをCronで実行するサンプルも作って見ました。 https://github.com/uzresk/aws-swf-lambda-samples