Amazon Athenaでクエリを実行するまで
cloudwatchへログを出力するlambdaを作る
exports.handler = async (event, context) => {
console.log(JSON.stringify(event));
const response = {
statusCode: 200,
body: JSON.stringify(event)
};
return response;
};
event内容
{
"enqueteID": "enquete1",
"questionID1": [
"C"
],
"questionID2": "B",
"questionID3": "C",
"questionID4": "E",
"questionID5": "C",
"questionID6": "E",
"questionID7": "D",
"questionID8": "A",
"questionID9": "C",
"questionID10": "B"
}
ログを格納するs3を用意する
バケットに必要なポリシーをつける
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "logs.ap-northeast-1.amazonaws.com"
},
"Action": "s3:GetBucketAcl",
"Resource": "arn:aws:s3:::{s3のバケット名}"
},
{
"Effect": "Allow",
"Principal": {
"Service": "logs.ap-northeast-1.amazonaws.com"
},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::{s3のバケット名}/*",
"Condition": {
"StringEquals": {
"s3:x-amz-acl": "bucket-owner-full-control"
}
}
}
]
}
kinesis firehose作る
GUI上で作るなら
1. name決めて
2. ログを整形するか、フォーマットを変換するか
3. ログの出力先(s3のバケット)
作った後にやること
aws firehose describe-delivery-stream --delivery-stream-name test-hada
作成したfirehoseの状態を確認する
cloudwatch logsがkinesis data firehoseにデータを置く権限を付与するためのiamロールの作成
aws iam create-role \
--role-name test-hada-kinesis-fire-hose-role \
--assume-role-policy-document file://TrustPolicyForCWL.json
使ったポリシー(TrustPolicyForCWL.json)
{
"Statement": {
"Effect": "Allow",
"Principal": {
"Service": "logs.ap-northeast-1.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
}
作成したroleにポリシー付与
aws iam put-role-policy \
--role-name test-hada-kinesis-fire-hose-role \
--policy-name test-hada-permissions-policy-for-CWL \
--policy-document file://PermissionsForCWL.json
付与したポリシー(PermissionsForCWL.json)
{
"Statement":[
{
"Effect":"Allow",
"Action":["firehose:*"],
// 作成したfirehoseのarn
"Resource":["arn:aws:firehose:ap-northeast-1:{awsのアカウントID}:*"]
},
{
"Effect":"Allow",
"Action":["iam:PassRole"],
// 作成したロールのarn
"Resource":["arn:aws:iam::{awsのアカウントID}:role/test-hada-kinesis-fire-hose-role"]
}
]
}
transformを使ってログを整形する
単純にlambdaでnode.jsからconsole.logするとこんな感じのログが出る
2019-11-29T07:27:36.068Z b6898132-70ce-4271-a476-f89248c91c88 INFO
{
"enqueteID": "enquete1",
"questionID1": [
"D",
"E"
],
"questionID2": "A",
"questionID3": "B",
"questionID4": "D",
"questionID5": "C",
"questionID6": "B",
"questionID7": "E",
"questionID8": "B",
"questionID9": "A",
"questionID10": "D"
}
2019-11-29T07:27:36.068Z b6898132-70ce-4271-a476-f89248c91c88 INFO
athenaを使うためには↑の行がいらないので正規表現使って整形
kinesis firehoseのtransformを使うとs3にログをアップロードする前にいい感じに整形するためのコードをlambdaで書ける
cloudwatchlogからkinesisにくるまでにログ内容がbase64でエンコードされてるっぽいので
一回デコードして、整形したのちにもう一度エンコードしてる
const zlib = require('zlib');
exports.handler = (event, context, callback) => {
const output = event.records.map((record) => {
const buf = zlib.gunzipSync(Buffer.from(record.data, 'base64'));
const cwlogs = buf.toString('utf-8');
const cwlogsparsed = JSON.parse(cwlogs);
let ret = '';
for (let i = 0; i < cwlogsparsed.logEvents.length; i += 1) {
ret += `${cwlogsparsed.logEvents[i].message}\n`;
}
// NOTE: { の前の文字列を削除 その後無駄な改行を削除してる
const replacedString = ret.replace(/^.*?{/, '{');
const deletedLine = replacedString.replace(/\r\n|\n|\r/g, '');
return {
recordId: record.recordId,
result: 'Ok',
data: Buffer.from(`${deletedLine}\n`, 'utf8').toString('base64'),
};
});
callback(null, { records: output });
};
partitionの設定ができるようにs3へ出力する
kinesis firehoseでAmazon S3 destinationのPrefixを下記にする
logs/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
〇〇=
の形でディレクトリ切らないとathenaでパーティションが切れないので要注意
サブスクリプションフィルタを作成する
lambdaのlogからINFOレベルのログのみをkinesis firehoseへ渡すための設定
aws logs put-subscription-filter \
--log-group-name "/aws/lambda/{lambdaのfunction name}" \
--filter-name "test-hada-filter" \
--filter-pattern "INFO" \
# firehoseのarn
--destination-arn "arn:aws:firehose:ap-northeast-1:{awsのアカウントID}:deliverystream/test-hada" \
# cloudwatch logsがkinesis data firehoseにデータを置く権限を持ったロールのarn
--role-arn "arn:aws:iam::{awsのアカウントID}:role/test-hada-kinesis-fire-hose-role"
athena作成
tableの作成
カラム名はjsonのプロパティ名と同一にすること
CREATE EXTERNAL TABLE IF NOT EXISTS test.test_survey (
`enqueteID` string,
`questionID1` array<string>,
`questionID2` string
) PARTITIONED BY (
year int,
month int,
day int
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://{s3のバケット名}/logs/'
TBLPROPERTIES ('has_encrypted_data'='false');
パーティションの設定
test_survey MSCK REPAIR TABLE test_survey;
出力結果の例
enqueteid | questionid1 | questionid2 | year | month | day |
---|---|---|---|---|---|
enquete1 | [D, E] | A | 2019 | 11 | 29 |
enquete1 | [C] | B | 2019 | 11 | 29 |
aws-cli経由でathenaのクエリ実行
# クエリの実行 タスクIDが返ってくる
aws athena start-query-execution \
--query-string "SELECT * FROM test.test_survey;" \
--result-configuration OutputLocation=s3://{バケット名}/
# タスクIDを使ってタスクの実行状況を確認
aws athena get-query-execution --query-execution-id {タスクID}
# タスクIDを使って結果の取得
aws athena get-query-results --query-execution-id {タスクID}
Author And Source
この問題について(Amazon Athenaでクエリを実行するまで), 我々は、より多くの情報をここで見つけました https://qiita.com/yotahada-nus3/items/c1b9771ee1b855595eed著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .