AWS Lambda のタイムアウト発生時に別の Lambda 関数を起動し、元の Lambda 関数の処理内容を取得する


はじめに

こんにちは。はじめまして。
最近、AWS で本番稼働する時のことをかんがえるようになりました。
Lambda の負荷テストをしていると、タイムアウトが発生して「コマッタ…」となった時のことを書いてみます。

Lambda タイムアウト 15分問題

Lambda ってタイムアウト時間設定しなくちゃいけなくて、最大 15分しか設定できないんですよね。
(参考)
https://aws.amazon.com/jp/blogs/news/aws-lambda-timeout-15min/

で、15分以上処理しちゃうと、CloudWatch にタイムアウトのログを出すだけ… と。

まあ、Lambda に重たい処理をさせるな!ということでしょうけど、
もう作ってしまったし… なんとかできないものか…
せめて、ユーザに処理できなかったことを伝えられないものか…

対策

ちょっと調べてみると、色々と対策はある模様。

ただ、上記の方法だと、パラメータで渡される文字列によって処理が分岐するなど、
関数内でもアウトプットが複数パターンあると、どれで落ちたのかがわからず、
「どうしたらいいの?」ってなってしまった。

以下、ダラダラと経緯を書いていますが、結論から言うと、

  • タイムアウトが発生する Lambda 関数
    • タイムアウトが発生する処理にログを仕込みます。
    • ログにはその値を特定するキーワードと Lambda 処理を識別する RequestId を付けます。
  • タイムアウト後の Lambda 関数

以上。

環境

  • Python 3.8

とりあえず、タイムアウトを再現してみる

タイムアウトを 1秒とし、10秒待機する処理を書いてみました。

sampleTimeout
import time

def lambda_handler(event, context):

    # ここに出力される key を取得することを今回の目標とします。
    print("key:%s" % event.get('key', 'default'))
    # 10秒待機
    time.sleep(10)

    return {}

タイムアウト検出してみる

これは「タイムアウトでエラーとなってしまったLambdaのリカバリを行うLambdaを作成してみました」にも記述がありますが、
CloudWatch LogsTask timed out afterというログが出力されたら Lambda 関数を発火するよう設定します。

動作確認

sampleTimeout.py をとりあえず実行してみます。

テストデータは下記の通り。

{
  "key": "test1"
}

ログはこんな感じに出力されてます。

detectTimeout.py のログも確認してみましょう。

ちゃんと実行されてますね。

はて、どうするか ...
ロググループやログストリームは別処理で同じ所に出力されるときもあるし ....
その処理で特定できる何かキーワードはないモノか.....
最悪自分で発行するかな ......

ん?
ログに RequestId ってのがあるな。
ちょっと調べてみると、Lambda 関数コール時に生成されるユニークなキーっぽいですね。
lambda_handler の引数 context に含まれている模様。(Lambda 初心者まるだし)
参考

よし!やってみよう!

タイムアウトのログから RequestId を抜き取る

detectTimeout を書き換えます。

detectTimeout
import json
import base64
import gzip
import re

def lambda_handler(event, context):

    # ログデータは base64 にエンコードされた gzip データで取得できる。
    # それを解凍し、json オブジェクトに置き換える。
    payload = base64.b64decode(event['awslogs']['data'])
    log = json.loads(gzip.decompress(payload).decode('utf-8'))

    # requestID を取得 (UUID の正規表現にマッチした文字列を取得)
    reg = r'[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}'
    match = re.search(reg, log['logEvents'][0]['message'])
    if match:
        requestID = match.group(0)
    else:
        print('requestID が正常に取得できませんでした。')
        return

    print('request id : ' + requestID)

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

ログを確認すると、取れてますね!

タイムアウトする処理でタイムアウト後処理に渡したい値のログに RequestID を含める

sampleTimeout
import time

def lambda_handler(event, context):

    print("key:%s:%s" % (event.get('key', 'default'), context.aws_request_id))
    time.sleep(10)

    return {}
log
Function Logs:
START RequestId: 287d2b7a-9d61-40cd-9c4c-d6d6d6d2ba0d Version: $LATEST
key:test1:287d2b7a-9d61-40cd-9c4c-d6d6d6d2ba0d
END RequestId: 287d2b7a-9d61-40cd-9c4c-d6d6d6d2ba0d
REPORT RequestId: 287d2b7a-9d61-40cd-9c4c-d6d6d6d2ba0d  Duration: 1000.88 ms    Billed Duration: 1000 ms    Memory Size: 128 MB Max Memory Used: 56 MB  Init Duration: 133.96 ms    
2019-11-28T08:53:53.633Z 287d2b7a-9d61-40cd-9c4c-d6d6d6d2ba0d Task timed out after 1.00 seconds

requestID と key: を含むログを取得し、key の値を取得する

最後まで行きます!

detectTimeout
import json
import base64
import gzip
import re
import boto3

def lambda_handler(event, context):

    # ログデータは base64 にエンコードされた gzip データで取得できる。
    # それを解凍し、json オブジェクトに置き換える。
    payload = base64.b64decode(event['awslogs']['data'])
    log = json.loads(gzip.decompress(payload).decode('utf-8'))

    # requestID を取得 (UUID の正規表現にマッチした文字列を取得)
    reg = r'[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}'
    match = re.search(reg, log['logEvents'][0]['message'])
    if match:
        requestID = match.group(0)
    else:
        print('requestID が正常に取得できませんでした。')
        return

    print('request id : ' + requestID)

    # requestID と key: でフィルタし、対象メッセージを取得する。
    client = boto3.client('logs')

    logMsg = client.filter_log_events(
        logGroupName=log['logGroup'],
        logStreamNames=[log['logStream']],
        filterPattern='"' + requestID + '" "key:"'
    )
    msgs = logMsg.get('events', [{'message' : ''}])[0]['message'].split(':')
    if not len(msgs) == 3 : 
        print('key が正常に取得できませんでした。')
        return

    value = msgs[1]
    print('key is [%s] ' % value)

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

ログを確認。

log
[ERROR] ClientError: An error occurred (AccessDeniedException) when calling the FilterLogEvents operation: User: arn:aws:sts::309122365079:assumed-role/detectTimeout-role-w1rjh1a2/detectTimeout is not authorized to perform: logs:FilterLogEvents on resource: arn:aws:logs:ap-northeast-1:309122365079:log-group:/aws/lambda/sampleTimeout:log-stream:
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 29, in lambda_handler
    logMsg = client.filter_log_events(
  File "/var/runtime/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)

ログを見る権限が足りない。 orz.

detectTimeout の Lambda に設定しているロールを編集

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:FilterLogEvents",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:ap-northeast-1:<AWSアカウントID>:*"
        }
    ]
}

こんどこそ…

でけた!
ただ、タイムアウトのリトライ数がデフォルトで2に設定されているので、次の処理が走ってしまってますね。
リトライ数を0にすることを忘れずに!(手順は省略)

最後に

これが本当に正しい手法か分かりませんが、タイムアウトが発生した処理のデータが取得できるようになりました。

やったことは、

  • タイムアウトが発生する Lambda 関数
    • タイムアウトが発生する処理にログを仕込みます。
    • ログにはその値を特定するキーワードと Lambda 処理を識別する RequestId を付けます。
  • タイムアウト後の Lambda 関数
    • トリガに CloudWatch Logs を設定
    • タイムアウトのログから RequestId を取得します。
    • タイムアウトのログと同じログストリームから RequestId と値を特定するキーワードでフィルタします。

です。
もっといい方法あったら教えてください。

以上、長々と失礼しました。