AthenaのMSCK REPAIR TABLEを試してみた


やりたいこと

  • 最初に、日本郵政が公開している郵便番号データをAthenaテーブルとしてhive互換パーティションを設定して登録する
  • 次に追加のパーティションにデータを出力して、MSCK REPAIR TABLEを実行して追加のパーティション情報をGlueに登録して追加のパーティションキーでデータが参照できることを確認する

郵便番号データをS3にアップロードする

  • 日本郵政が公開している郵便番号データをS3にアップする
    • s3バケット名は、PySparkコード上ではプレースホルダ{_s3bucket_input}として表記
    • 東京都はtokyo、千葉県はchibaというフォルダを作成し、その下にCSVを配置

郵便番号データをAthenaに登録する

  • 以下のPySparkコードで東京都のデータをParquet形式で出力し、Glueカタログにテーブルを登録する
    • EMRを起動してspark-submitで実行
    • 都道府県名カラムをパーティションキーとした
    • 出力先のバケット名は、プレースホルダ{_s3bucket_output}として表記
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('jis_code', StringType(), True, {'comment': "全国地方公共団体コード"}),
    StructField('postal5', StringType(), True, {'comment': "(旧)郵便番号(5桁)"}),
    StructField('postal7', StringType(), True, {'comment': "郵便番号(7桁)"}),
    StructField('pref_kana', StringType(), True, {'comment': "都道府県名カナ"}),
    StructField('city_kana', StringType(), True, {'comment': "市区町村名カナ"}),
    StructField('address_kana', StringType(), True, {'comment': "町域名カナ"}),
    StructField('pref', StringType(), True, {'comment': "都道府県名"}),
    StructField('city', StringType(), True, {'comment': "市区町村"}),
    StructField('address', StringType(), True, {'comment': "町域名"}),
    StructField('flag1', StringType(), True, {'comment': "一町域が二以上の郵便番号で表される場合の表示(「1」は該当、「0」は該当せず)"}),
    StructField('flag2', StringType(), True, {'comment': "小字毎に番地が起番されている町域の表示(「1」は該当、「0」は該当せず)"}),
    StructField('flag3', StringType(), True, {'comment': "丁目を有する町域の場合の表示 (「1」は該当、「0」は該当せず)"}),
    StructField('flag4', StringType(), True, {'comment': "一つの郵便番号で二以上の町域を表す場合の表示(「1」は該当、「0」は該当せず)"}),
    StructField('flag5', StringType(), True, {'comment': "更新の表示(「0」は変更なし、「1」は変更あり、「2」廃止(廃止データのみ使用))"}),
    StructField('flag6', StringType(), True, {'comment': "変更理由 (「0」は変更なし、「1」市政・区政・町政・分区・政令指定都市施行、「2」住居表示の実施、「3」区画整理、「4」郵便区調整等、「5」訂正、「6」廃止(廃止データのみ使用))"})
])

def main():
    spark = SparkSession.builder \
        .config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
        .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
        .enableHiveSupport() \
        .getOrCreate()
    # s3から東京都のcsvデータを入力
    df = spark.read \
        .format('csv') \
        .option('header','false') \
        .option('encoding','Windows-31J') \
        .schema(schema) \
        .load('{_s3bucket_input}/tokyo/')
    # s3にparquet形式のデータを出力してGlueカタログにテーブルメタデータを登録
    spark.catalog.setCurrentDatabase('test')
    df.write.option('path','{_s3bucket_output}/zipcode') \
        .option('compression','snappy') \
        .mode('overwrite') \
        .partitionBy('pref') \
        .format('parquet') \
        .saveAsTable('zipcode')

if __name__ == '__main__':
    main()
  • s3には東京都のパーティションデータが出力されている

    • {s3_bucket_output}/zipcode/pref=東京都/
  • Glueカタログに以下のスキーマ情報が登録される

  • パーティション情報は東京都のみ

追加のhive互換パーティションを出力する

  • 今度は千葉県のcsvデータを入力してhive互換パーティションデータを出力する
    • main関数を以下に変更して実行
    • 'spark.sql.sources.partitionOverwriteMode'を指定することで、東京都のパーティションデータは保持され、追加の千葉県のデータがパーティションデータとして追加される
    • {s3_bucket_output}/zipcode/pref=千葉県/
def main():
    spark = SparkSession.builder \
        .config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
        .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
        .enableHiveSupport() \
        .getOrCreate()
    df = spark.read \
        .format('csv') \
        .option('header','false') \
        .option('encoding','Windows-31J') \
        .schema(schema) \
        .load('{_s3bucket_input}/chiba/')
    df.write.parquet('{_s3bucket_output}/zipcode', mode='overwrite', partitionBy='pref', compression='snappy')
  • この時点では、partition情報がGlueカタログに登録されていないので、千葉県のデータを参照することはできない

追加のパーティション情報をGlueに登録する

  • MSCK REPAIR TABLEを実行して追加のパーティション情報をGlueに登録する
    • main関数を以下に変更して実行
def main():
    spark = SparkSession.builder \
        .config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sql('MSCK REPAIR TABLE test.zipcode')
  • 追加のパーティション(千葉県)がGlueに登録される

  • Athenaで追加のパーティション情報を参照できるようになる

さいごに

  • このようなETLジョブの要件として、1つ以上のパーティションを含む増分で登録するという要件が多いかと思います。今回の例ではこのようなケースで対応できる実装例を示してみました。