AWS Athenaで結果を強引にJSONで受け取る方法


追記

微妙に最新の情報を見逃していてAthenaはCTAS(CREATE TABLE AS)をサポートできるようになってたらしいです。
https://dev.classmethod.jp/cloud/aws/amazon-athena-support-ctas/

ということで以下のワークアラウンドはほぼ不要です。

tl;dr

  • Athenaの実行結果をJSONで受け取りたかった
  • SQLですべてのカラムを一つのJSONにキャストすればJSONで受け取れる
  • 本格的にETLするならGlueジョブを使いましょう

はじめに

AWS AthenaはマネージドPrestoとも言えるサービスでクラスタを管理しなくても手軽にS3のデータにクエリを投げれる良いサービスです。
一方でAthenaはINSERT文に対応していなかったり、クエリの出力形式がCSVかつ無圧縮しか選択できなかったりと、なんらかETLをするには不向きなサービスです(2018/10/15時点)。本格的なETLにはGlueを使うのがいいんですが、整形されたデータをチョロチョロっとサマって他のプログラムの入力にしたいみたいケースでは手軽にAthenaで済ませたくなります。そういった際に出力をプログラムで扱いやすいJSON形式でできるとなお嬉しいです。

ということで以下のようなクエリを書いてみました。

クエリ

こんな感じで出力結果をMAP経由でJSONにキャストすると結果を一つのJSON形式にできます。(サブクエリの中は適当なサンプルデータです。)

SELECT
  CAST( MAP(
    ARRAY[
      'name',
      'age',
      'skils',
      'account'
    ],
    ARRAY[
      CAST(name as JSON),
      CAST(age as JSON),
      CAST(skils as JSON),
      CAST(account as JSON)
    ]
  ) AS JSON) AS json_data
FROM (
  SELECT
    'kanga' as name,
    100 as age,
    ARRAY['sql','aws'] as skils,
    MAP(
      ARRAY['qiita', 'twitter'],
      ARRAY['kanga', 'kanga333']
    ) as account
);

こんな感じに結果が受け取れます。

json.csv
"json_data"
"{""account"":{""qiita"":""kanga"",""twitter"":""kanga333""},""age"":100,""name"":""kanga"",""skils"":[""sql"",""aws""]}"

厳密には出力はJSON形式では無く、1カラムのCSVの文字列の中にJSONが入ってる感じになります。

利用する際は文字列からデコードする必要があります。

直接csvをロードする場合はこんな感じです。

load.py

import csv
import json

with open("json.csv") as f:
    csv_records = csv.reader(f)
    # Skip header
    next(csv_records, None)
    for csv_record in csv_records:
        json_record = json.loads(csv_record[0])
        # Output: kanga
        print(json_record['name'])
        # Output: {'qiita': 'kanga', 'twitter': 'kanga333'}
        print(json_record['account'])

AWS APIのget_query_resultsから利用する際も同じ感じです。

get_query_results.py
import json
import boto3

athena = boto3.client('athena')
response = athena.get_query_results(
    QueryExecutionId='15299086-c4e6-425a-9b61-d360a875225d'
)
rows = response['ResultSet']['Rows']

for row in rows[1:]:
    data = row['Data'][0]['VarCharValue']
    json_record = json.loads(data)
    # Output: kanga
    print(json_record['name'])
    # Output: kanga
    print(json_record['account'])

おわりに

以上、Athenaの結果をJSONで受け取る方法でした。とりあえず現状、データをサマって抽出した上で何らかプログラムの入力にする、とかそういったケースで使えなくもないかな?と思います。
一方で、この方法で出力されるファイルは無圧縮のJSONになるので、大量のレコードを出力したい場合には使わない方が良いでしょう。そういったケースでは素直にGlueを使うかEMRでクラスタを立てましょう。

Athenaが正式にJSON形式での結果出力をサポートする日を待ってます、あるいはINSERT文対応。あとは結果を圧縮して格納するオプションも出ると嬉しいなぁ。。。