RDSに蓄積しているアクセスログを今すぐDataLakeに移行する
DataLakeとは
https://aws.amazon.com/jp/big-data/datalakes-and-analytics/what-is-a-data-lake/
AWS公式の記述を引用すると
データレイクは、規模にかかわらず、すべての構造化データと非構造化データを保存できる一元化されたリポジトリです。データをそのままの形で保存できるため、データを構造化しておく必要がありません。また、ダッシュボードや可視化、ビッグデータ処理、リアルタイム分析、機械学習など、さまざまなタイプの分析を実行し、的確な意思決定に役立てることができます。
当記事ではDataLakeのデータストアをS3とし、既存のRDSに蓄積されている既存ログ(レガシー)をGlueを使ってS3に移行する手順を紹介します。
なお、S3に移行したログの可視化や活用方法は当記事の対象範囲外です。
(ただしAthenaを使ってログを抽出できることは確認します)
また、今後蓄積されるログはRDSではなくS3に蓄積し続けることになりますが、当記事では既存ログの移行方法を紹介するまでに留めます。
(やるとすればKinesisFirehoseを使うことになるでしょう)
構成図
RDSはVPC内のプライベートサブネットに配置されています。
クローリング
GlueからVPC内のRDSへ接続しクローリングを行ないデータカタログを作ります。ジョブ
S3へ物理的にログを移行しますクエリ発行
S3へ移行されたログを抽出してみます
GlueからRDSへ接続する
Glue設定
レガシーログが格納されているRDSを選択し、画面通りに設定を進めて完了します。
セキュリティグループ作成
RDSをVPCに配置している場合はセキュリティグループが必要です。
以下を参考に、Glueの自己参照型セキュリティグループを作成します。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/setup-vpc-for-glue-access.html
全てのトラフィックを、 自身のセキュリティグループからのみ呼ばれるように 設定します。
このセキュリティグループを、RDSのセキュリティグループに全てのTCPを開放して設定します。(以下はRDSにアタッチされているセキュリティグループの例)
接続テスト
ここまで進めればGlueの接続
より接続テスト
をしてみましょう。
ここで失敗すれば設定等に不備がある可能性があります。見直しましょう。
クローリング①
データベース作成
クローリング実行
データストアはJDBC
を選択し、先ほど作成した接続を選択します。
実行はオンデマンドで実行
とし、最後に手動実行します。
実行後、左メニューのテーブル
にテーブルが追加されていることを確認しましょう。
S3バケット作成
Glueのテーブル
にクローリングの結果が格納されていますが、物理的なログの格納場所はS3となります。
この次の手順でジョブを作成しログを流し込みますが、そのためのS3バケットを事前に作成します。
適当なバケット名を指定して作成しておきましょう。
Glueジョブ作成
ジョブにはスクリプトを作成する必要があります。
今回はAWS Glue が生成する提案されたスクリプト
を選択して進めます。
なお、IAMロールを作成する必要があるので存在しない場合は新規で作成します。
アタッチするポリシーはAWSGlueServiceRole
とAmazonS3FullAccess
にします。
その他オプションについては自由ですが、モニタリングオプションは有効化しておくのがおすすめです。
エラーになった場合に原因が特定されやすくなります。
そして設定を進めていくとスクリプトの編集画面に進みます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
〜(省略)〜
## @type: DataSink
## @args: [database = "xxxxx", table_name = "xxx_logs", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
# datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "xxxxx", table_name = "xxx_logs", transformation_ctx = "datasink5")
datasink5 = glueContext.write_dynamic_frame.from_options(frame = resolvechoice4, connection_type = "s3", connection_options = {"path": "s3://xxx/yyy/zzz"}, format = "parquet", transformation_ctx = "datasink5")
job.commit()
最終行付近にあるglueContext.write_dynamic_frame.from_catalog
の部分を編集します。
今回はS3に配置するため、以下のようにします。
glueContext.write_dynamic_frame.from_options(frame = resolvechoice4, connection_type = "s3", connection_options = {"path": "s3://xxx/yyy/zzz"}, format = "parquet", transformation_ctx = "datasink5")
s3://xxx
がバケット、yyy/zzz
はキー(プレフィックス)になりますが、S3ではキーで区切ることでデータストアのパーティションとして定義することができます。パーティションについての解説は当記事の対象範囲外とします。
クローリング②
ジョブの実行が完了すると、S3に.parquet
形式のログデータが格納されていると思います。
このデータをデータカタログとして登録するためにクローラの作成を行ないます。
前回の手順ではデータストアをRDSとしましたが、今回はS3です。
クローラを実行し、左メニューのテーブル
にS3をデータストアとしたテーブルが追加されていることを確認しましょう。
なお、クローラを実行しないとデータカタログが更新されませんので
商用で活用する場合は定期的に実行するようにスケジュールを設定することをおすすめします。
Athenaによるクエリ実行
Athenaによるログの抽出は簡単です。SQLとほぼ同じ形式で書けます。
クローリングしたデータが抽出できることを確認しましょう。
Author And Source
この問題について(RDSに蓄積しているアクセスログを今すぐDataLakeに移行する), 我々は、より多くの情報をここで見つけました https://qiita.com/flatnyat/items/f0894f93921aadd3d15c著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .