APIGatewayのログのタイムスタンプをLambdaで加工してElasticsearch Serviceに送る


はじめに

APIGatewayのログをElasticsearch Serviceに送ってKibanaで可視化しようとしたときの手順。
ただ、kibanaで対応しているタイムスタンプの形式がAPIGatewayが出力するタイムスタンプの型と一致していなかった。
そのため、Lambdaでタイムスタンプの形式を変換してKibanaでタイムスタンプとして扱えるようにした。

構成

APIGatewayのアクセスログは、Kinesisにしか出力できないため、Kinesisを利用している。

手順

1.Lambdaの作成
2.Kinesis Data Firehoseの作成
3.APIGatewayでアクセスログを出力する設定追加
※Elasticsearch Serviceのドメインは作成済の前提

1.Lambdaの作成

Lamndaの環境変数にログレベルとして「LOG_LEVEL」を設定する必要がある。
下記は、ログレベルがデバッグの場合

タイムスタンプのキーは、APIGatewayで設定した「requestTime」。

lambda_function.py
import json
import base64
import logging
from os import environ
from datetime import datetime as dt
from datetime import timedelta, timezone as tz

# 環境変数ログレベルのキー
LOG_LEVEL = 'LOG_LEVEL'
# タイムスタンプのキー
REQUEST_TIME_KEY = 'requestTime'

# ログオブジェクト作成
def get_logger(workername):
    # ログの出力名を設定
    logger = logging.getLogger(workername)
    # ログレベルの設定
    logger.setLevel(int(environ[LOG_LEVEL]))

    return logger

# ロガー
logger = get_logger(__name__)

# タイムスタンプの形式をKibana用に変換
def conv_timestamp(timestamp):
    # date型に変換
    date = dt.strptime(timestamp, '%d/%b/%Y:%H:%M:%S +0000')
    # 9時間進める(力技なのでイマイチ・・・)
    date = date + timedelta(hours=+9)
    # String型に変換して返却
    return dt.strftime(date, '%Y-%m-%dT%H:%M:%S+0900')

def lambda_handler(event, context):
    logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼event▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
    logger.debug(event)
    output_list = []
    for record in event['records']:
        data = record['data']
        # デコード
        data_decode = base64.b64decode(data)
        logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換前)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
        logger.debug(data_decode)
        logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換前)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

        # byte型をString型に変換
        data_str = data_decode.decode("ascii")
        # Json文字列を辞書型に変換
        data = json.loads(data_str)

        # タイムスタンプの形式をkibana用に変換
        data[REQUEST_TIME_KEY] = conv_timestamp(data[REQUEST_TIME_KEY])

        # 辞書型をjson文字列→base64エンコード→デコード
        record['data'] = base64.b64encode(json.dumps(data).encode('ascii')).decode("ascii")
        # resultを追加
        record['result'] = 'Ok'
        output_list.append(record)

        data_decode = base64.b64decode(record['data'])
        logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換後)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
        logger.debug(data_decode)
        logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換後)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

    logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼返却するLIST▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
    logger.debug(output_list)
    logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲返却するLIST▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

    return {'records': output_list}

Kinesisにデータを戻すために、AWS管理ポリシーの「AWSLambdaKinesisExecutionRole」を追加

2.Kinesis Data Firehoseの作成

「Choose a Souce」は「Direct PUT or other sources」を選択する。

「record transformation」は「Enabled」を選択し、作成した加工用のLambdaを選択する。

「Amazon Elasticsearch Service destination」のドメインで作成済のドメインを選択する。
「Index」は任意の名前。
「Index rotation」、日ごとにする場合は、「Every day」を選択する。

また、Elasticsearch Serviceにログを送るためとLambdaを実行するために、下記の権限をポリシーで追加する。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpPut"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:GetFunctionConfiguration"
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

以上の設定をして作成する。

3.APIGatewayでアクセスログを出力する設定追加

アクセスログを出力するAPIのステージ > ログトレースタブ
Access Log Destination ARN:1.で作成したKinesisのARN
ログの形式:JSONボタンを押下して設定

これで構成の図の通りに完成です。

最後に

Kinesisのサービスを今回初めて触りました。
なんか難しいのかと思っていましたが、データを溜め込んでくれる単純なサービスなんだと理解できました。
ただ東京リージョンでも全て英語なので、初学者のハードルを少し上げていると思いました。