AWS Step Functionsで大きいpayloadを渡す


lambdaからlambdaに渡して加工して渡す必要が出てきて「step functions」を使った。
タイトルとはかけ離れているが、記録を兼ねて。

まずは動画で基礎を学習

兎にも角にも基本的な用語や基本的な設定方法がわからないと話にならないので手っ取り早く動画を見る。
この動画でほぼ7割〜8割程度のやりたいことが出来るようになる。
https://www.youtube.com/watch?v=PGyasNJ1QTQ&t=3083s

しかし、この動画の中でペイロードが大きくなった際の対処方法が軽く説明されていて具体的な手法が分からなかったので調べてやってみた。

何をしようとしているのか

「2つの外部APIにリクエストを投げてそのレスポンスを受け取って、それぞれのレスポンスを合体させて加工する」 ということをやりたかった。

{
  "Comment": "Search Items",
  "StartAt": "lookupItems",
  "States": {
    "lookupItems" :{
      "Type": "Parallel",
      "Next": "Response-Item",
      "Branches": [
        {
          "StartAt": "searchItems",
          "States": {
            "searchRakutenItems": {
              "Type": "Task",
              "Parameters": {"keywords.$": "$.keywords"},
              "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXXXX:function:search_api-1_sf",
              "End": true
            }
          }
        },
        {
          "StartAt": "searchAmazonItems",
          "States": {
            "searchAmazonItems": {
              "Type": "Task",
              "Parameters": {"keywords.$": "$.keywords"},
              "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXXX:function:search_api-2_sf",
              "End": true
            }
          }
        }
      ]
    },
    "Response-Item": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXX:function:return-item-search",
      "End": true
    }
  }
}

Parallelの部分をマニュアルを適当に読み作った結果・・・

最初に次のステップに渡すために "End": true の部分を "Next": "Response-Item" としていて「定義」の画面でエラーが出てしまい悩んだ。

ここに書いてあるではないか。

各ブランチは自己完結型である必要があります。Parallel 状態の 1 つのブランチの状態にはそのブランチ外のフィールドをターゲットにする Next フィールドがあってはなりません。また、そのブランチ外の他の状態からこのブランチに移行することもできません。

でここは "End": true に修正した。で次のステップにはどう行くのか?という疑問が、、、調べると Branches の外に書けることが判明。

  "States": {
    "lookupItems" :{
      "Type": "Parallel",
      "Next": "Response-Item",

定義を作ってみた

これでこのように出来た

大きいペイロードを渡すことが出来ない

ところが!!step functionでlambdaを使用して外部APIからレスポンスを取得して渡した際に以下のようなエラーが出てしまった。

size exceeding the maximum number of characters service limit

このページを見ると
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/avoid-exec-failures.html

状態間でデータの大きいペイロードを渡す実行を終了できます。状態間で渡すデータが 32 KB を超える場合、Amazon Simple Storage Service (Amazon S3) を使用してデータを保存し、raw データの代わりに Amazon リソースネーム (ARN) を渡します。または、実行時に小さいペイロードを渡すように実装を調整します。

と記載がある。
「どうやってS3の情報を渡すんだ??」という肝心な部分が書いてない。結論としては
「function computeでS3に格納して、returnでバケット名とファイル名を渡す。次のlambdaでS3から読み込む。」というのが結論。当初step functionsの設定でできるかと思っていた自分。

lambdaの修正

これまで単純にlambdaでレスポンスを全て返すように設定していたがstep funtion移行に伴い以下のように変更した。
レスポンスごとにファイルを作成するが、万が一リクエストの時間が重なった場合に、ファイルを上書きしてリクエスト内容と違うレスポンスにならないようにタイムスタンプ(unixTime)を付与してファイル名を一意にした。今回は「秒単位」で分かれていれば十分だった。

ちなみにこの設定だとS3が膨れ上がるしのでS3の「ライフサイクル」で1日経過したらファイルを消すように設定した。次のステップのlambdaで読み込んだら、消すようにしてもいいかも。

最後にバケット名とファイル名をresponseとして返すようにした。

    #S3へのファイル出力
        ut = int(time.time()) #UnixTimeを算出
        bucket_name = "search-items"
        file_name = "files" + str(ut) + ".json"
        s3 = boto3.resource('s3')
        obj = s3.Object(bucket_name,json_key)

        response_json = amazon_response
        obj.put(Body = json.dumps(response_json))

        # s3にファイルを設置後keyを返す        
        return '{\"bucket_name\":\"' +bucket_name +'\" , \"filename\":\"' +file_name +'\"}'

実行した結果

「出力」にきちんとファイル名とバッケット名が記録されていた。

バケット名とファイル名を受け取る

図でいうと最後の[Response-Item]で出力されたデータを受け取る。この受け取ったデータは何も加工する必要がなければそのままlambdaのeventで受け取れる。
試しにResponse-Itemの中身を以下のコードだけ書いてみる

def lambda_handler(event, context):
    # TODO implement
    print(event)

するとCloudwatchlogには以下のようなログが出て、きちんと先程のoutputの内容が受け取れたことが確認できる。
どちらがどのファイル名かを判定するにはファイル名で判定できるかバケットを分けるかなど工夫すれば良い。

['
{
    "bucket_name": "search-items",
    "filename": "XXXXXXItem1597457186.json"
}
', '
{
    "bucket_name": "search-items",
    "filename": "YYYYYItem1597457201.json"
}
']

ファイルがS3で出来上がるまでWaitをかけた。

これは、後半のlambdaでtime.sleepかけるか、step function内で
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-wait-state.html
こちらでかけるのも良い。