AWS Data Wrangler で ETL をやってみる(RDB のデータを加工してデータマートへ)


はじめに

皆さんはアプリケーションデータを加工して分析用データを提供するためのデータパイプラインをどう構築していますか?
本記事ではその選択肢の一つとして、今イチオシの AWS Data Wrangler を紹介します。

AWS Data Wrangler とは

公式 GitHub での記載は以下で、

Pandas on AWS
Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).

Pandas を軸としつつ、AWS のリソースに簡単にアクセスできるようにしたもの、と言えそうです。

利用の具体的な流れは、以下のようになります。

  1. Extract: データソースからデータを取得すると、 Pandas のデータフレームとして格納されており、
  2. Transform: Pandas の恩恵を受けて柔軟に加工することができ、
  3. Load: そのまま他のデータソースに対してアップロードを完了することができる

実際に使ってみると、AWS をデータ基盤として使うにあたって必要となっていたステップを短くコーディング可能にする、まさに痒いところに手が届くプロダクトとなっていました!

流れ

今回は RDB のデータを加工してデータマートを作成する以下のパイプラインの作り方を通して、awswrangler のおすすめポイントを説明していきます!

アプリケーションデータの日次バックアップ

MySQL のデータをバッチで S3 に移動させ、さらにクエリ対象にするコードは以下となります。

import awswrangler as wr

# MySQL → Pandas
con = wr.mysql.connect('MY_GLUE_CONNECTION')
df = wr.mysql.read_sql_query(
    sql="SELECT * FROM myapp.users",
    con=con
)
con.close()

# Partition 用のカラムを追加
df['dt'] = TODAY

# Pandas → S3(Glue Catalog への追加まで)
wr.s3.to_parquet(
    df=df,
    path='s3://S3_BUCKET/S3_PATH',
    dataset=True,
    database='TO_DATABASE',
    table='TABLE_NAME',
    partition_cols=['dt']
)

驚くほどシンプルに書けることがわかります。このコードで実現している内容は以下です。

  1. Extract
    1. MySQL に対して SQL を実行し、結果が返るのを待つ
    2. 返ってきたデータを Pandas の DataFrame の形で格納
  2. Transform
    1. Partition 用に日付カラムを追加
  3. Load
    1. データを parquet 形式に変更して、snappy 方式で圧縮
    2. データを S3 の指定したパス + /dt=TODAY/ に格納
    3. データのカラム情報を Glue Catalog の形式に変換
    4. Partition 情報を付与した形で Glue Catalog にテーブル情報を格納

日次バックアップデータからサマリーデータを作成

次に、上記で取得しておいたデータを加工して、データマートを作ってみましょう。

import awswrangler as wr

# S3 → Athena → Pandas
sql = f'''
SELECT dt,
       count(user_id) as user_num
FROM users
WHERE dt = {TODAY}
GROUP BY dt
ORDER BY dt
'''

df = wr.athena.reqd_sql_query(
    sql=sql,
    database='FROM_DATABASE',
)

# Pandas → S3(Glue Catalog への追加まで)
wr.s3.to_parquet(
    df=df,
    path='s3://S3_BUCKET/S3_PATH',
    dataset=True,
    database='TO_DATABASE',
    table='TABLE_NAME',
    mode='overwrite',
)

今回は Transform 処理を sql に収めたので、よりシンプルになりました。パーティションが存在せず毎日生成し直すデータとなるため、 overwrite オプションをつけて、データソースや Catalog 情報は上書きするように設定しています。

  1. Extract
    1. Athena に対して SQL を実行し、結果が返るのを待つ
    2. 返ってきたデータを Pandas の DataFrame の形で格納
  2. Transform
    1. SQL にて実施
  3. Load
    1. データを parquet 形式に変更して、snappy 方式で圧縮
    2. データを S3 の指定したパス に格納(上書き)
    3. データのカラム情報を Glue Catalog の形式に変換
    4. Glue Catalog にテーブル情報を格納(上書き)

SQL で完結できる ETL 処理であれば、実行自体はたった2行で実現可能であることがわかります。SQL で完結できない場合も、 Pandas の DataFrame に格納されているので、苦労なく加工できるのはイメージしやすいかと思います。

今後期待している機能

一方で、まだ機能が足りておらず、awswrangler を使えていないパイプライン処理はいくつかあり、その中の一つとしては以下があります

CloudWatchLogs のバックアップ

wr.cloudwatch.read_logs が API として提供されていますが、こちらは limit の上限が 10000 となっており、また chunksize や last_access_id を指定した取得は用意されておらず、膨大になりがちなログデータを取得するには現実的ではない仕様となっています。あくまで Insights を確認するためのものとして用意されているようです。今後生データを取得できる API も提供されることを期待しています。

おわりに

まだ機能が足りないと感じる部分もありますが、毎月マイナーアップデートが実施されて機能が追加されているプロダクトなので、AWS の熱量を感じています。
Apache Spark などのパイプラインツールを利用しても同様のことは実現可能ですが、こと AWS においては、awswrangler を利用した場合のコード行数の少なさは圧倒的です。ぜひ一度お試し下さい。

興味持っていただけたら、LGTM やコメントをいただければ幸いです!