Firehoseで出力したファイルをAthenaのパーティション形式に変換するLambda
2019/02/18 追記
Firehose で出力先の prefix が指定できるようになりました!
これでこの Lambda もいらなくなりますね!素晴らしい。
概要
FirehoseはデフォルトでYYYY/MM/DD/HH
というプレフィクスを付けてS3にファイルを格納する。
このままでもAthenaでパーティションとして利用することはできるが、ディレクトリが作成されるたびにADD PARTITION
しなければならない。
大量データをクエリする場合はググればEMRを使ったカラムナフォーマットへの変換記事がたくさんヒットするけど、ライトにAthenaを使いたい場合に有効。
新しいパーティションを勝手には読み込んでくれないのでクエリ実行前に以下のSQLを実行する必要があります。冪等なので毎回実行すればOKです。
MSCK REPAIR TABLE database_name.table_name;
Lambda Function
Athenaの自動パーティション認識を有効にするために、Hiveパーティションに対応した形式のディレクトリにコピーするLambda関数を作った。
さらにFirehoseのYYYY/MM/DD/HH
はUTCのため、JST(+9:00)のYYYY-MM-DD/HH
に変更する。
'use strict';
process.env.TZ = 'Asia/Tokyo'; // Timezoneを明示的にJSTに
const aws = require('aws-sdk');
const s3client = new aws.S3({ apiVersion: '2006-03-01' });
const path = require('path');
const toBucket = process.env.TO_BUCKET; // Lambda Functionの環境変数
const toPrefix = process.env.TO_PREFIX; // Lambda Functionの環境変数
function getS3EventKey(s3obj) {
return decodeURIComponent(s3obj.object.key.replace(/\+/g, ' '));
}
function getCopySource(s3obj) {
const srcBucket = s3obj.bucket.name;
const srcKey = getS3EventKey(s3obj);
return `${srcBucket}/${srcKey}`;
}
function getTodayPartition() {
const today = new Date();
const year = today.getFullYear();
let month = today.getMonth() + 1;
let day = today.getDate();
let hour = today.getHours();
if (month < 10) month = "0" + month;
if (day < 10) day = "0" + day;
if (hour < 10) hour = "0" + hour;
// ここがキモ。dtとhourというパーティションが自動認識される
return `dt=${year}-${month}-${day}/hour=${hour}`;
}
function getDestKey(s3obj) {
const srcKey = getS3EventKey(s3obj);
const filename = path.basename(srcKey);
const todayPartition = getTodayPartition();
return `${toPrefix}${todayPartition}/${filename}`;
}
exports.handler = (event, context, callback) => {
const s3obj = event.Records[0].s3;
const copySrc = getCopySource(s3obj);
// ディレクトリは無視
if (copySrc.endsWith("/")) {
callback(null, `Skip, because ${copySrc} is directory.`);
return;
}
const destKey = getDestKey(s3obj);
const params = { CopySource: copySrc, Bucket: toBucket, Key: destKey };
console.log(`s3://${copySrc} copy to s3://${toBucket}/${destKey}`);
s3client.copyObject(params, (err, data) => {
if (err) {
console.log(err, err.stack);
callback(err);
} else {
console.log(data);
callback(null, data);
}
});
};
Firehoseが出力するS3バケットのCreate*イベントで実行するようにこのLambda関数を作成する。
このコードは東京リージョンには対応していません。
AWSの設定など
Lambdaの実行ロールに以下のポリシーを設定。(Listはいらんかもしれんけど念のため)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Enable S3 Read Permissions",
"Effect": "Allow",
"Action": [
"s3:List*",
"s3:Get*"
],
"Resource": [
"arn:aws:s3:::Firehoseが出力するバケット名",
"arn:aws:s3:::Firehoseが出力するバケット名/*"
]
},
{
"Sid": "Enable S3 Write Permissions",
"Effect": "Allow",
"Action": [
"s3:List*",
"s3:Put*"
],
"Resource": [
"arn:aws:s3:::Athenaが参照するバケット名",
"arn:aws:s3:::Athenaが参照するバケット名/*"
]
}
]
}
余談
FirehoseがDynamic Prefix対応してくれればこんなんしなくて済むんだけど。
参考リンク
Author And Source
この問題について(Firehoseで出力したファイルをAthenaのパーティション形式に変換するLambda), 我々は、より多くの情報をここで見つけました https://qiita.com/sonodar/items/19dbd8fe6740d26d054a著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .