Firehoseで出力したファイルをAthenaのパーティション形式に変換するLambda


2019/02/18 追記

Firehose で出力先の prefix が指定できるようになりました!

これでこの Lambda もいらなくなりますね!素晴らしい。

概要

FirehoseはデフォルトでYYYY/MM/DD/HHというプレフィクスを付けてS3にファイルを格納する。
このままでもAthenaでパーティションとして利用することはできるが、ディレクトリが作成されるたびにADD PARTITIONしなければならない。

Amazon Athenaのパーティションを理解する #reinvent

大量データをクエリする場合はググればEMRを使ったカラムナフォーマットへの変換記事がたくさんヒットするけど、ライトにAthenaを使いたい場合に有効。

新しいパーティションを勝手には読み込んでくれないのでクエリ実行前に以下のSQLを実行する必要があります。冪等なので毎回実行すればOKです。

MSCK REPAIR TABLE database_name.table_name;

Lambda Function

Athenaの自動パーティション認識を有効にするために、Hiveパーティションに対応した形式のディレクトリにコピーするLambda関数を作った。
さらにFirehoseのYYYY/MM/DD/HHはUTCのため、JST(+9:00)のYYYY-MM-DD/HHに変更する。

'use strict';
process.env.TZ = 'Asia/Tokyo'; // Timezoneを明示的にJSTに

const aws = require('aws-sdk');
const s3client = new aws.S3({ apiVersion: '2006-03-01' });
const path = require('path');

const toBucket = process.env.TO_BUCKET; // Lambda Functionの環境変数
const toPrefix = process.env.TO_PREFIX; // Lambda Functionの環境変数

function getS3EventKey(s3obj) {
    return decodeURIComponent(s3obj.object.key.replace(/\+/g, ' '));
}

function getCopySource(s3obj) {
    const srcBucket = s3obj.bucket.name;
    const srcKey = getS3EventKey(s3obj);
    return `${srcBucket}/${srcKey}`;
}

function getTodayPartition() {
    const today = new Date();
    const year = today.getFullYear();
    let month = today.getMonth() + 1;
    let day = today.getDate();
    let hour = today.getHours();
    if (month < 10) month = "0" + month;
    if (day < 10) day = "0" + day;
    if (hour < 10) hour = "0" + hour;
    // ここがキモ。dtとhourというパーティションが自動認識される
    return `dt=${year}-${month}-${day}/hour=${hour}`;
}

function getDestKey(s3obj) {
    const srcKey = getS3EventKey(s3obj);
    const filename = path.basename(srcKey);
    const todayPartition = getTodayPartition();
    return `${toPrefix}${todayPartition}/${filename}`;
}

exports.handler = (event, context, callback) => {
    const s3obj = event.Records[0].s3;
    const copySrc = getCopySource(s3obj);
    // ディレクトリは無視
    if (copySrc.endsWith("/")) {
        callback(null, `Skip, because ${copySrc} is directory.`);
        return;
    }
    const destKey = getDestKey(s3obj);
    const params = { CopySource: copySrc, Bucket: toBucket, Key: destKey };
    console.log(`s3://${copySrc} copy to s3://${toBucket}/${destKey}`);
    s3client.copyObject(params, (err, data) => {
        if (err) {
            console.log(err, err.stack);
            callback(err);
        } else {
            console.log(data);
            callback(null, data);
        }
    });
};

Firehoseが出力するS3バケットのCreate*イベントで実行するようにこのLambda関数を作成する。

このコードは東京リージョンには対応していません。

AWSの設定など

Lambdaの実行ロールに以下のポリシーを設定。(Listはいらんかもしれんけど念のため)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Enable S3 Read Permissions",
      "Effect": "Allow",
      "Action": [
        "s3:List*",
        "s3:Get*"
      ],
      "Resource": [
        "arn:aws:s3:::Firehoseが出力するバケット名",
        "arn:aws:s3:::Firehoseが出力するバケット名/*"
      ]
    },
    {
      "Sid": "Enable S3 Write Permissions",
      "Effect": "Allow",
      "Action": [
        "s3:List*",
        "s3:Put*"
      ],
      "Resource": [
        "arn:aws:s3:::Athenaが参照するバケット名",
        "arn:aws:s3:::Athenaが参照するバケット名/*"
      ]
    }
  ]
}

余談

FirehoseがDynamic Prefix対応してくれればこんなんしなくて済むんだけど。

参考リンク