RDSに蓄積しているアクセスログを今すぐDataLakeに移行する


DataLakeとは

https://aws.amazon.com/jp/big-data/datalakes-and-analytics/what-is-a-data-lake/
AWS公式の記述を引用すると

データレイクは、規模にかかわらず、すべての構造化データと非構造化データを保存できる一元化されたリポジトリです。データをそのままの形で保存できるため、データを構造化しておく必要がありません。また、ダッシュボードや可視化、ビッグデータ処理、リアルタイム分析、機械学習など、さまざまなタイプの分析を実行し、的確な意思決定に役立てることができます。

 当記事ではDataLakeのデータストアをS3とし、既存のRDSに蓄積されている既存ログ(レガシー)をGlueを使ってS3に移行する手順を紹介します。
 なお、S3に移行したログの可視化や活用方法は当記事の対象範囲外です。
 (ただしAthenaを使ってログを抽出できることは確認します)

 また、今後蓄積されるログはRDSではなくS3に蓄積し続けることになりますが、当記事では既存ログの移行方法を紹介するまでに留めます。
 (やるとすればKinesisFirehoseを使うことになるでしょう)

構成図

RDSはVPC内のプライベートサブネットに配置されています。

  • クローリング
    GlueからVPC内のRDSへ接続しクローリングを行ないデータカタログを作ります。

  • ジョブ
    S3へ物理的にログを移行します

  • クエリ発行
    S3へ移行されたログを抽出してみます

GlueからRDSへ接続する

Glue設定

まずはGlueの接続より接続の追加を押下します。

レガシーログが格納されているRDSを選択し、画面通りに設定を進めて完了します。

セキュリティグループ作成

RDSをVPCに配置している場合はセキュリティグループが必要です。
以下を参考に、Glueの自己参照型セキュリティグループを作成します。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/setup-vpc-for-glue-access.html

全てのトラフィックを、 自身のセキュリティグループからのみ呼ばれるように 設定します。

このセキュリティグループを、RDSのセキュリティグループに全てのTCPを開放して設定します。(以下はRDSにアタッチされているセキュリティグループの例)

接続テスト

ここまで進めればGlueの接続より接続テストをしてみましょう。
ここで失敗すれば設定等に不備がある可能性があります。見直しましょう。

クローリング①

データベース作成

Glueからデータベースの作成を行ないます。

クローリング実行

続いてクローラを作成し、実行します。

データストアはJDBCを選択し、先ほど作成した接続を選択します。
実行はオンデマンドで実行とし、最後に手動実行します。

実行後、左メニューのテーブルにテーブルが追加されていることを確認しましょう。

S3バケット作成

Glueのテーブルにクローリングの結果が格納されていますが、物理的なログの格納場所はS3となります。
この次の手順でジョブを作成しログを流し込みますが、そのためのS3バケットを事前に作成します。
適当なバケット名を指定して作成しておきましょう。

Glueジョブ作成

続いてS3へ物理的にログを移行するジョブを作成します。

ジョブにはスクリプトを作成する必要があります。
今回はAWS Glue が生成する提案されたスクリプトを選択して進めます。

なお、IAMロールを作成する必要があるので存在しない場合は新規で作成します。
アタッチするポリシーはAWSGlueServiceRoleAmazonS3FullAccessにします。

その他オプションについては自由ですが、モニタリングオプションは有効化しておくのがおすすめです。
エラーになった場合に原因が特定されやすくなります。

そして設定を進めていくとスクリプトの編集画面に進みます。


import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

〜(省略)〜

## @type: DataSink
## @args: [database = "xxxxx", table_name = "xxx_logs", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]

# datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "xxxxx", table_name = "xxx_logs", transformation_ctx = "datasink5")
datasink5 = glueContext.write_dynamic_frame.from_options(frame = resolvechoice4, connection_type = "s3", connection_options = {"path": "s3://xxx/yyy/zzz"}, format = "parquet", transformation_ctx = "datasink5")
job.commit()

最終行付近にあるglueContext.write_dynamic_frame.from_catalogの部分を編集します。
今回はS3に配置するため、以下のようにします。

glueContext.write_dynamic_frame.from_options(frame = resolvechoice4, connection_type = "s3", connection_options = {"path": "s3://xxx/yyy/zzz"}, format = "parquet", transformation_ctx = "datasink5")

s3://xxxがバケット、yyy/zzzはキー(プレフィックス)になりますが、S3ではキーで区切ることでデータストアのパーティションとして定義することができます。パーティションについての解説は当記事の対象範囲外とします。

クローリング②

ジョブの実行が完了すると、S3に.parquet形式のログデータが格納されていると思います。
このデータをデータカタログとして登録するためにクローラの作成を行ないます。

前回の手順ではデータストアをRDSとしましたが、今回はS3です。

クローラを実行し、左メニューのテーブルにS3をデータストアとしたテーブルが追加されていることを確認しましょう。

なお、クローラを実行しないとデータカタログが更新されませんので
商用で活用する場合は定期的に実行するようにスケジュールを設定することをおすすめします。

Athenaによるクエリ実行

Athenaによるログの抽出は簡単です。SQLとほぼ同じ形式で書けます。
クローリングしたデータが抽出できることを確認しましょう。

※下記はAWSがあらかじめ用意したサンプルで例を提示しています。