Amazon Athenaでクエリを実行するまで


cloudwatchへログを出力するlambdaを作る

exports.handler = async (event, context) => {
    console.log(JSON.stringify(event));
    const response = {
        statusCode: 200,
        body: JSON.stringify(event)
    };
    return response;
};

event内容

{
  "enqueteID": "enquete1",
  "questionID1": [
    "C"
  ],
  "questionID2": "B",
  "questionID3": "C",
  "questionID4": "E",
  "questionID5": "C",
  "questionID6": "E",
  "questionID7": "D",
  "questionID8": "A",
  "questionID9": "C",
  "questionID10": "B"
}

ログを格納するs3を用意する

バケットに必要なポリシーをつける

{
 "Version": "2012-10-17",
 "Statement": [
   {
     "Effect": "Allow",
     "Principal": {
       "Service": "logs.ap-northeast-1.amazonaws.com"
     },
     "Action": "s3:GetBucketAcl",
     "Resource": "arn:aws:s3:::{s3のバケット名}"
   },
   {
     "Effect": "Allow",
     "Principal": {
       "Service": "logs.ap-northeast-1.amazonaws.com"
     },
     "Action": "s3:PutObject",
     "Resource": "arn:aws:s3:::{s3のバケット名}/*",
     "Condition": {
       "StringEquals": {
         "s3:x-amz-acl": "bucket-owner-full-control"
       }
     }
   }
 ]
}

kinesis firehose作る

GUI上で作るなら
1. name決めて
2. ログを整形するか、フォーマットを変換するか
3. ログの出力先(s3のバケット)

作った後にやること
aws firehose describe-delivery-stream --delivery-stream-name test-hada
作成したfirehoseの状態を確認する

cloudwatch logsがkinesis data firehoseにデータを置く権限を付与するためのiamロールの作成

aws iam create-role \
    --role-name test-hada-kinesis-fire-hose-role \
    --assume-role-policy-document file://TrustPolicyForCWL.json

使ったポリシー(TrustPolicyForCWL.json)

{
    "Statement": {
        "Effect": "Allow",
        "Principal": {
            "Service": "logs.ap-northeast-1.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
    }
}

作成したroleにポリシー付与

aws iam put-role-policy \
    --role-name test-hada-kinesis-fire-hose-role \
    --policy-name test-hada-permissions-policy-for-CWL \
    --policy-document file://PermissionsForCWL.json

付与したポリシー(PermissionsForCWL.json)

{
    "Statement":[
        {
            "Effect":"Allow",
            "Action":["firehose:*"],
            // 作成したfirehoseのarn
            "Resource":["arn:aws:firehose:ap-northeast-1:{awsのアカウントID}:*"]
        },
        {
            "Effect":"Allow",
            "Action":["iam:PassRole"],
            // 作成したロールのarn
            "Resource":["arn:aws:iam::{awsのアカウントID}:role/test-hada-kinesis-fire-hose-role"]
        }
    ]
}

transformを使ってログを整形する

単純にlambdaでnode.jsからconsole.logするとこんな感じのログが出る

2019-11-29T07:27:36.068Z    b6898132-70ce-4271-a476-f89248c91c88    INFO
{
    "enqueteID": "enquete1",
    "questionID1": [
        "D",
        "E"
    ],
    "questionID2": "A",
    "questionID3": "B",
    "questionID4": "D",
    "questionID5": "C",
    "questionID6": "B",
    "questionID7": "E",
    "questionID8": "B",
    "questionID9": "A",
    "questionID10": "D"
}

2019-11-29T07:27:36.068Z b6898132-70ce-4271-a476-f89248c91c88 INFO
athenaを使うためには↑の行がいらないので正規表現使って整形
kinesis firehoseのtransformを使うとs3にログをアップロードする前にいい感じに整形するためのコードをlambdaで書ける
cloudwatchlogからkinesisにくるまでにログ内容がbase64でエンコードされてるっぽいので
一回デコードして、整形したのちにもう一度エンコードしてる

const zlib = require('zlib');

exports.handler = (event, context, callback) => {
  const output = event.records.map((record) => {
    const buf = zlib.gunzipSync(Buffer.from(record.data, 'base64'));
    const cwlogs = buf.toString('utf-8');
    const cwlogsparsed = JSON.parse(cwlogs);
    let ret = '';

    for (let i = 0; i < cwlogsparsed.logEvents.length; i += 1) {
      ret += `${cwlogsparsed.logEvents[i].message}\n`;
    }

    // NOTE: { の前の文字列を削除 その後無駄な改行を削除してる
    const replacedString = ret.replace(/^.*?{/, '{');
    const deletedLine = replacedString.replace(/\r\n|\n|\r/g, '');

    return {
      recordId: record.recordId,
      result: 'Ok',
      data: Buffer.from(`${deletedLine}\n`, 'utf8').toString('base64'),
    };
  });
  callback(null, { records: output });
};

partitionの設定ができるようにs3へ出力する

kinesis firehoseでAmazon S3 destinationのPrefixを下記にする
logs/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/

〇〇=の形でディレクトリ切らないとathenaでパーティションが切れないので要注意

サブスクリプションフィルタを作成する

lambdaのlogからINFOレベルのログのみをkinesis firehoseへ渡すための設定

aws logs put-subscription-filter \
    --log-group-name "/aws/lambda/{lambdaのfunction name}" \
    --filter-name "test-hada-filter" \
    --filter-pattern "INFO" \
    # firehoseのarn
    --destination-arn "arn:aws:firehose:ap-northeast-1:{awsのアカウントID}:deliverystream/test-hada" \
    # cloudwatch logsがkinesis data firehoseにデータを置く権限を持ったロールのarn
    --role-arn "arn:aws:iam::{awsのアカウントID}:role/test-hada-kinesis-fire-hose-role"

athena作成

tableの作成
カラム名はjsonのプロパティ名と同一にすること

CREATE EXTERNAL TABLE IF NOT EXISTS test.test_survey (
  `enqueteID` string,
  `questionID1` array<string>,
  `questionID2` string
) PARTITIONED BY (
  year int,
  month int,
  day int
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://{s3のバケット名}/logs/'
TBLPROPERTIES ('has_encrypted_data'='false');

パーティションの設定

test_survey MSCK REPAIR TABLE test_survey;

出力結果の例

enqueteid questionid1 questionid2 year month day
enquete1 [D, E] A 2019 11 29
enquete1 [C] B 2019 11 29

aws-cli経由でathenaのクエリ実行

# クエリの実行 タスクIDが返ってくる
aws athena start-query-execution \
--query-string "SELECT * FROM test.test_survey;" \
--result-configuration OutputLocation=s3://{バケット名}/

# タスクIDを使ってタスクの実行状況を確認
aws athena get-query-execution --query-execution-id {タスクID}

# タスクIDを使って結果の取得
aws athena get-query-results --query-execution-id {タスクID}