APIGatewayのログのタイムスタンプをLambdaで加工してElasticsearch Serviceに送る
はじめに
APIGatewayのログをElasticsearch Serviceに送ってKibanaで可視化しようとしたときの手順。
ただ、kibanaで対応しているタイムスタンプの形式がAPIGatewayが出力するタイムスタンプの型と一致していなかった。
そのため、Lambdaでタイムスタンプの形式を変換してKibanaでタイムスタンプとして扱えるようにした。
構成
APIGatewayのアクセスログは、Kinesisにしか出力できないため、Kinesisを利用している。
手順
1.Lambdaの作成
2.Kinesis Data Firehoseの作成
3.APIGatewayでアクセスログを出力する設定追加
※Elasticsearch Serviceのドメインは作成済の前提
1.Lambdaの作成
Lamndaの環境変数にログレベルとして「LOG_LEVEL」を設定する必要がある。
下記は、ログレベルがデバッグの場合
タイムスタンプのキーは、APIGatewayで設定した「requestTime」。
import json
import base64
import logging
from os import environ
from datetime import datetime as dt
from datetime import timedelta, timezone as tz
# 環境変数ログレベルのキー
LOG_LEVEL = 'LOG_LEVEL'
# タイムスタンプのキー
REQUEST_TIME_KEY = 'requestTime'
# ログオブジェクト作成
def get_logger(workername):
# ログの出力名を設定
logger = logging.getLogger(workername)
# ログレベルの設定
logger.setLevel(int(environ[LOG_LEVEL]))
return logger
# ロガー
logger = get_logger(__name__)
# タイムスタンプの形式をKibana用に変換
def conv_timestamp(timestamp):
# date型に変換
date = dt.strptime(timestamp, '%d/%b/%Y:%H:%M:%S +0000')
# 9時間進める(力技なのでイマイチ・・・)
date = date + timedelta(hours=+9)
# String型に変換して返却
return dt.strftime(date, '%Y-%m-%dT%H:%M:%S+0900')
def lambda_handler(event, context):
logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼event▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
logger.debug(event)
output_list = []
for record in event['records']:
data = record['data']
# デコード
data_decode = base64.b64decode(data)
logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換前)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
logger.debug(data_decode)
logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換前)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')
# byte型をString型に変換
data_str = data_decode.decode("ascii")
# Json文字列を辞書型に変換
data = json.loads(data_str)
# タイムスタンプの形式をkibana用に変換
data[REQUEST_TIME_KEY] = conv_timestamp(data[REQUEST_TIME_KEY])
# 辞書型をjson文字列→base64エンコード→デコード
record['data'] = base64.b64encode(json.dumps(data).encode('ascii')).decode("ascii")
# resultを追加
record['result'] = 'Ok'
output_list.append(record)
data_decode = base64.b64decode(record['data'])
logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換後)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
logger.debug(data_decode)
logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換後)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')
logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼返却するLIST▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
logger.debug(output_list)
logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲返却するLIST▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')
return {'records': output_list}
Kinesisにデータを戻すために、AWS管理ポリシーの「AWSLambdaKinesisExecutionRole」を追加
2.Kinesis Data Firehoseの作成
「Choose a Souce」は「Direct PUT or other sources」を選択する。
「record transformation」は「Enabled」を選択し、作成した加工用のLambdaを選択する。
「Amazon Elasticsearch Service destination」のドメインで作成済のドメインを選択する。
「Index」は任意の名前。
「Index rotation」、日ごとにする場合は、「Every day」を選択する。
また、Elasticsearch Serviceにログを送るためとLambdaを実行するために、下記の権限をポリシーで追加する。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"es:ESHttpPost",
"es:ESHttpPut"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"lambda:GetFunctionConfiguration"
"lambda:InvokeFunction"
],
"Resource": [
"*"
]
}
]
}
以上の設定をして作成する。
3.APIGatewayでアクセスログを出力する設定追加
アクセスログを出力するAPIのステージ > ログトレースタブ
Access Log Destination ARN:1.で作成したKinesisのARN
ログの形式:JSONボタンを押下して設定
これで構成の図の通りに完成です。
最後に
Kinesisのサービスを今回初めて触りました。
なんか難しいのかと思っていましたが、データを溜め込んでくれる単純なサービスなんだと理解できました。
ただ東京リージョンでも全て英語なので、初学者のハードルを少し上げていると思いました。
Author And Source
この問題について(APIGatewayのログのタイムスタンプをLambdaで加工してElasticsearch Serviceに送る), 我々は、より多くの情報をここで見つけました https://qiita.com/yamamoto_y/items/3cbb32a579fac89e2f6e著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .