AWS Step Functionsで大きいpayloadを渡す
lambdaからlambdaに渡して加工して渡す必要が出てきて「step functions」を使った。
タイトルとはかけ離れているが、記録を兼ねて。
まずは動画で基礎を学習
兎にも角にも基本的な用語や基本的な設定方法がわからないと話にならないので手っ取り早く動画を見る。
この動画でほぼ7割〜8割程度のやりたいことが出来るようになる。
https://www.youtube.com/watch?v=PGyasNJ1QTQ&t=3083s
しかし、この動画の中でペイロードが大きくなった際の対処方法が軽く説明されていて具体的な手法が分からなかったので調べてやってみた。
何をしようとしているのか
「2つの外部APIにリクエストを投げてそのレスポンスを受け取って、それぞれのレスポンスを合体させて加工する」 ということをやりたかった。
{
"Comment": "Search Items",
"StartAt": "lookupItems",
"States": {
"lookupItems" :{
"Type": "Parallel",
"Next": "Response-Item",
"Branches": [
{
"StartAt": "searchItems",
"States": {
"searchRakutenItems": {
"Type": "Task",
"Parameters": {"keywords.$": "$.keywords"},
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXXXX:function:search_api-1_sf",
"End": true
}
}
},
{
"StartAt": "searchAmazonItems",
"States": {
"searchAmazonItems": {
"Type": "Task",
"Parameters": {"keywords.$": "$.keywords"},
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXXX:function:search_api-2_sf",
"End": true
}
}
}
]
},
"Response-Item": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXX:function:return-item-search",
"End": true
}
}
}
Parallelの部分をマニュアルを適当に読み作った結果・・・
最初に次のステップに渡すために "End": true
の部分を "Next": "Response-Item"
としていて「定義」の画面でエラーが出てしまい悩んだ。
ここに書いてあるではないか。
各ブランチは自己完結型である必要があります。Parallel 状態の 1 つのブランチの状態にはそのブランチ外のフィールドをターゲットにする Next フィールドがあってはなりません。また、そのブランチ外の他の状態からこのブランチに移行することもできません。
でここは "End": true
に修正した。で次のステップにはどう行くのか?という疑問が、、、調べると Branches
の外に書けることが判明。
"States": {
"lookupItems" :{
"Type": "Parallel",
"Next": "Response-Item",
定義を作ってみた
大きいペイロードを渡すことが出来ない
ところが!!step functionでlambdaを使用して外部APIからレスポンスを取得して渡した際に以下のようなエラーが出てしまった。
size exceeding the maximum number of characters service limit
このページを見ると
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/avoid-exec-failures.html
状態間でデータの大きいペイロードを渡す実行を終了できます。状態間で渡すデータが 32 KB を超える場合、Amazon Simple Storage Service (Amazon S3) を使用してデータを保存し、raw データの代わりに Amazon リソースネーム (ARN) を渡します。または、実行時に小さいペイロードを渡すように実装を調整します。
と記載がある。
「どうやってS3の情報を渡すんだ??」という肝心な部分が書いてない。結論としては
「function computeでS3に格納して、returnでバケット名とファイル名を渡す。次のlambdaでS3から読み込む。」というのが結論。当初step functionsの設定でできるかと思っていた自分。
lambdaの修正
これまで単純にlambdaでレスポンスを全て返すように設定していたがstep funtion移行に伴い以下のように変更した。
レスポンスごとにファイルを作成するが、万が一リクエストの時間が重なった場合に、ファイルを上書きしてリクエスト内容と違うレスポンスにならないようにタイムスタンプ(unixTime)を付与してファイル名を一意にした。今回は「秒単位」で分かれていれば十分だった。
ちなみにこの設定だとS3が膨れ上がるしのでS3の「ライフサイクル」で1日経過したらファイルを消すように設定した。次のステップのlambdaで読み込んだら、消すようにしてもいいかも。
最後にバケット名とファイル名をresponseとして返すようにした。
#S3へのファイル出力
ut = int(time.time()) #UnixTimeを算出
bucket_name = "search-items"
file_name = "files" + str(ut) + ".json"
s3 = boto3.resource('s3')
obj = s3.Object(bucket_name,json_key)
response_json = amazon_response
obj.put(Body = json.dumps(response_json))
# s3にファイルを設置後keyを返す
return '{\"bucket_name\":\"' +bucket_name +'\" , \"filename\":\"' +file_name +'\"}'
実行した結果
「出力」にきちんとファイル名とバッケット名が記録されていた。
バケット名とファイル名を受け取る
図でいうと最後の[Response-Item]で出力されたデータを受け取る。この受け取ったデータは何も加工する必要がなければそのままlambdaのeventで受け取れる。
試しにResponse-Itemの中身を以下のコードだけ書いてみる
def lambda_handler(event, context):
# TODO implement
print(event)
するとCloudwatchlogには以下のようなログが出て、きちんと先程のoutputの内容が受け取れたことが確認できる。
どちらがどのファイル名かを判定するにはファイル名で判定できるかバケットを分けるかなど工夫すれば良い。
['
{
"bucket_name": "search-items",
"filename": "XXXXXXItem1597457186.json"
}
', '
{
"bucket_name": "search-items",
"filename": "YYYYYItem1597457201.json"
}
']
ファイルがS3で出来上がるまでWaitをかけた。
これは、後半のlambdaでtime.sleep
かけるか、step function
内で
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-wait-state.html
こちらでかけるのも良い。
Author And Source
この問題について(AWS Step Functionsで大きいpayloadを渡す), 我々は、より多くの情報をここで見つけました https://qiita.com/Macaron_Suke/items/faba4616a5378b5d571d著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .