AthenaのCTAS (CREATE TABLE AS SELECT) でETLをするTips


はじめに

この記事はAWS Advent Calendar 2018の3日目の記事です。

今年の10月にAthenaがCTAS(CREATE TABLE AS SELECT)をサポートしました。
CTASサポート以前のAthenaではクエリの結果を無圧縮のCSVでしか残せなかったのですが、CTASを使うと結果を列指向やJSONなどのフォーマットにしたうえ圧縮をかけて残せるようになりました。
これによりAthenaを使ったデータ加工の芽が出たのでいくつかのデータ加工バッチをAthenaでできるか検討しました。
この記事ではAthenaのCTASを使ったバッチを作る際のTipsについて紹介します。

前提

AthenaでETLしたい理由

AthenaはETL無しでS3のデータに直接分析をかけれるよ、というコンセプトを謳っており、現状ETLをするために設計されたサービスな訳ではないように感じています。しかし、それでもAthenaでETLしたい(出来たら嬉しい)理由は以下のような感じです。

  • 使うのがめっちゃ楽なフルマネージドサービス
  • Presto早い
  • Prestoの関数に便利な機能多い
  • スキャンしたデータ量に応じた課金なので小規模から始めやすいし、安く使える
  • S3のファイルを直接加工するための選択肢として手頃

CTASの使い方

結果を残したいクエリの先頭にCREATE TABLE table_name WITH ( property_name = expression ) ASをつけるだけです。
例えば以下のようなクエリを実行すればexampleデータベースにsample_dataテーブルが作成されます。
そしてその実データはs3://output-bucket/の配下にPARQUET形式で保存されます。

CREATE TABLE example.sample_data
WITH (
  format='PARQUET',
  external_location='s3://output-bucket/'
) AS
SELECT * FROM (
    VALUES
        (1, 'a'),
        (2, 'b'),
        (3, 'c')
) AS t (id, name)

CTASでできること/できないこと

CTASでできようになったこと、未だにできないことを以下に挙げます。

できるようになったこと

  • クエリの結果を新規テーブルとして保存
  • S3にフォーマットを指定して保存する
    • Parquet/ORC/JSON
  • 保存するデータの圧縮
  • 保存するデータのバケッティング
  • 保存するデータのパーティショニング

未だにできないこと

  • データの上書き
  • データの追記
    • CTASで書き込む先のS3のパスは空である必要があります
  • S3にデータだけ書き込む
    • 必ずテーブルを作る必要があります
  • Avroでデータを保存する

やはり上書きとか追記に対応して無いのがちょっとツラいですね。上書きできないのはPrestoもそうなので仕方ないですが。

CTASでETLするためのTips

ここからはCTASでETLするための具体的なTipsに移ります。

INSET OVERWRITEっぽいことを実現する

基本的に処理は冪等にしたいので、データの更新はなるべく追記ではなく上書きで実現したいです。
以下はCTASでINSERT OVERWRITEっぽいことを実現する方法です。

例えば日別のデータを保存したい場合は以下のような日付パーティションを付与した構造になると思います。

s3://sample-buket/day=yyyymmdd/
CREATE EXTERNAL TABLE sample_table(
  data STRING)
PARTITIONED BY ( 
  day STRING)
STORED AS PARQUET
LOCATION
  's3://sample-buket/'
tblproperties ("parquet.compress"="SNAPPY");

ここではパーティションに更にrevisionというカラムを加えます。

s3://sample-buket/day=yyyymmdd/revision=timestamp/
CREATE EXTERNAL TABLE sample_table (
  data STRING)
PARTITIONED BY ( 
  day STRING,
  revision STRING)
STORED AS PARQUET
LOCATION
  's3://sample-buket/'
tblproperties ("parquet.compress"="SNAPPY");

データの更新時は以下のようにバッチ実行時のtimestampをrevisionに指定してCTASによりデータを生成します。

CREATE TABLE temp_database.tmp_1543818683
WITH (
  format='PARQUET',
  external_location='s3://sample-buket/day=20181203/revision=1543818683/'
) AS
-- 何らかSQLによる加工を入れる
SELECT
  data
FROM
  input_table;

その後、revision=latestというailiasとして作るパーティションが常に最新のrevisionのパーティションを見るように変更します。

-- 初回はADD PARTITION
ALTER TABLE sample_table ADD IF NOT EXISTS
    PARTITION (day='20181203', revision='latest')
    LOCATION 's3://sample-buket/day=20181203/revision=1543818683/';
-- 変更の際はSET LOCATION
ALTER TABLE sample_table PARTITION (day='20181203', revision='latest')
    SET LOCATION 's3://sample-buket/day=20181203/revision=1543818683/';

CTASで作ったテーブルはいらないので消しときましょう。

DROP TABLE IF EXISTS temp_database.tmp_1543818683;

これによりrevision='latest'のパーティションを見ればデータが常に上書きされているようなテーブルを実現できます。
ただしこの方法ではMSCK REPAIR TABLEでパーティションをリカバリできないので注意が必要です。

出力ファイルの生成数を調整する

Athenaでデータをサマって結果を他のDB(Dynamoとか)に入れる場合は結果をJSONなど他のプログラムで扱いやすい形式にして出力するケースが多々あると思います。そういった場合に出力するファイル数を調節する方法です。

データを一つにまとめる

Bucketingを使えば特定のカラムの値をハッシュキーとして出力ファイルを分散することができます。
この際、出力するファイル数も指定することができるのでこの数を1にすることでファイルを一つにまとめることができます。

CREATE TABLE temp_database.tmp_1543818683
WITH (
  format='JSON',
  external_location='s3://output-bucket/revision=1543818683/',
  -- Bucketingのキーで使うカラム
  bucketed_by = ARRAY['id'],
  -- 出力するファイルの数
  bucket_count = 1
) AS
SELECT * FROM (
    VALUES
        (1, 'a'),
        (2, 'b'),
        (3, 'c')
) AS t (id, name)

結果

s3://output-bucket/revision=1543818683/20181203_070050_00280_qmtgu_bucket-00000.gz
解凍結果.json
{"id":1,"name":"a"}
{"id":2,"name":"b"}
{"id":3,"name":"c"}

データを特定のカラム別に分けて出力する

データを特定のカラムをもとに分けて出力したいのであればパーティショニングが使えます。
パーティショニングは特定のカラムをもとに出力パスを分けることが可能です。
この際パーティションに指定したカラムは元データから消えてしまうので、元データに残したい場合は重複してSELECTするようにしておきましょう。

CREATE TABLE temp_database.tmp_1543818683
WITH (
  format='JSON',
  external_location='s3://output-bucket/revision=1543818684/',
  -- Bucketingのキーで使うカラム
  bucketed_by = ARRAY['id'],
  -- 出力するファイルの数
  bucket_count = 1,
  -- partitioningに使うカラム
  partitioned_by = ARRAY['partition_id']
) AS
SELECT
  id,
  name ,
  -- パーティションに指定するカラムは末尾に書く
  id AS partition_id,
FROM (
    VALUES
        (1, 'a'),
        (2, 'b'),
        (3, 'c')
) AS t (id, name)

結果

s3://output-bucket/revision=1543818684/partition_id=1/20181203_070526_00306_ce5f8_bucket-00000.gz
s3://output-bucket/revision=1543818684/partition_id=2/20181203_070526_00306_ce5f8_bucket-00000.gz
s3://output-bucket/revision=1543818684/partition_id=3/20181203_070526_00306_ce5f8_bucket-00000.gz
partition_id=1のjson
{"id":1,"name":"a"}
partition_id=2のjson
{"id":2,"name":"b"}
partition_id=3のjson
{"id":3,"name":"c"}

QueryLimitに引っかからないように使う

Tipsというより注意点なんですが、現状Athenaの同時実行の制限は20クエリまでなのでLimitに引っかからないようにする必要があります。なので細かいクエリをたくさん投げるのには不向きです。Dailyの処理や少数のHourlyバッチで足りるくらいなら良いかもしれません。あるいはLimitに引っかかっても良いように処理は冪等かつリトライ可能にしておいて、自動でリトライするようにしておきましょう。
LambdaからAhenaのクエリを投げるようにしてStep Functionと組み合わせるとバックオフ込みのリトライが簡単に実現できます。

Athenaに限界を感じたら

とはいえ、やはりAthena単独でETLを回すのは現状では厳しいのでAWS内だと以下のサービスと併用する必要がありそうです。

  • Glue Jobを使う
  • EMRを使う
    • Prestoにクエリを投げることに重きを置くなら
    • クラスタの管理がある程度は発生してしまうので面倒

終わりに

以上、AthenaのCTASでETLする際のTipsでした。データの規模にもよりますが、現状だとAthenaのCTASにETLを寄せまくれる感じでは無いので、Insert対応が来たり、同時実行数の増加が来たりしてAthenaがフルマネージドなSQLでETLできるサービスになる日を待ってます。