DynamoDBのデータをGlue, Athena, Lambda, QuickSight を使って分析の定期実行


はじめに

アプリ開発では、使用状況を可視化したいという要求がよくあり、
事前に運用を想定して分析しやすいようなデータベースへのデータ格納にすることができていればよいのですが、最初はスピード優先やミニマムに開発してといったプロジェクトの場合、
JSONデータをスキーマレスで格納してしまうといったことがあるかと思います。

その場合、通常はDBからデータを取得して、
ログイン回数やUU数、アプリの各種イベントの状況、ゲームの達成状況など、
分析用に加工処理して出力する部分のプログラミング開発が必要かと思います。

Glueを使えば、DBからのデータ抽出部分のプログラミングが不要になり、
Athenaを使えば、分析用の加工をSQLで行え、
QuickSightを使えば、表示用のWebフロント側をプログラミングする必要がない、
ということで、やってみました。


DynamoDBのデータを分析できるようにするには

AWS Glueを使用してDynamoDBテーブルからデータを抽出し、Amazon AthenaでSQLクエリを使用して分析します。

  1. AWS Glue クローラを設定
    • DynamoDBからスキーマを検出し、AWS Glue Data Catalogにメタデータを設定
  2. AWS Glue ETLジョブを設定
    • DynamoDBテーブルからデータを抽出し、S3に格納
    • AWS Glue Data Catalogによって提供される、定義済みのスキーマに結果を格納
  3. Athenaの設定
    • Amazon S3で抽出されたデータの外部スキーマストアとしてAWS Glue Data Catalogを使用
    • SQLを使用してAmazon S3を直接クエリするために、Athenaを利用することで、データ分析を実行

Athenaで分析するには、上記だけでよいのですが、開発者以外の関係者がKPIを見たいといった場合、
4. QuickSightの設定
5. 分析用データを定期的に更新
といったことが必要になると思います。

参考記事
https://aws.amazon.com/jp/blogs/news/simplify-amazon-dynamodb-data-extraction-and-analysis-by-using-aws-glue-and-amazon-athena/


AWS Glueでデータ抽出

AWS Glueクローラを作成

DynamoDBからデータを抽出し、データカタログを作成します。

  • [AWS Glueコンソール]を開きます。
  • ナビゲーションパネルで[クローラ]を選択し、[クローラの追加]を選択します。
  • クローラ名(dynamodb-crawler)を入力し、[次へ]を選択します。
  • [データストアの選択] リストで、[DynamoDB] を選択します。
  • [テーブル名]ボックスの横にある小さなフォルダアイコンを選択し、DynamoDBが作成したテーブルを選択します。[選択] をクリックしてメニューを閉じ、[次へ] を選択します。
    • [別のデータストアを追加]ページで、デフォルト設定を[いいえ]のままにして、[次へ]を選択します。
    • Table name UserActivityLog
    • 最後に、[別のデータストアを追加]ページで、デフォルト設定を[いいえ]のままにして、[次へ]を選択します。
  • [IAMロールを選択]ページで、[新規のIAMロールを選択]をクリックし、該当IAMロールを追加し、完了したら、[次へ] を選択します。
  • [このクローラのスケジュールを作成する]ページで、既定の設定を[オンデマンドで実行]とし、[次へ]をクリックします。(定期的に実行する場合は、スケジュールの実行をcron設定します。)
  • クローラの出力を設定する必要があります。データベース作成で、glue_outputにし、[次へ] を選択します。(注意: アンダースコアを使うこと。)

これで、クローラがDynamoDBテーブルに対して実行され、
データカタログにテーブルのためのエントリーが作成されたので、
DynamoDBテーブルをソースとしてETLジョブを作成して実行することができます。

ETLジョブ作成と実行

以下の手順では、DynamoDBからデータを抽出し、
Amazon S3バケットに格納するAWS Glue ETLジョブを作成します。

ジョブの追加

  • ジョブ名: dynamodb-etl-job-useractivitylog
    • プロパティはほぼデフォルト
    • ジョブのブックマークを有効化
  • sourceをDyanamoDBのテーブル
  • targetでデータテーブルを作成
    • データストアで、S3
    • useractivitylog
      • s3://glue-etl-output/useractivitylog
    • 形式で、JSON

Crawlerで抽出したデータカタログを使用する場合は、
ターゲットで、「データカタログのテーブルを使用し、データターゲットを更新する」を選びます。

ジョブの実行を行うと、指定したS3のパスにファイルができます。


Athenaで分析する

Glueの設定で、DynamoDBからデータが抽出され、Amazon S3に格納されるはずです。
SQLを使用してAthenaでそのデータを分析することができます。

  • Athenaコンソールに移動します。[データベース]ドロップダウンリストで、glue_outputを選択します。 テーブルを作成する
CREATE EXTERNAL TABLE `useractivitylog`(
  `datetime` string,
  `role` string,
  `activity` string,
  `value` bigint,
  `userid` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://glue-etl-output/useractivitylog'
TBLPROPERTIES (
  'has_encrypted_data'='false')

参考: https://aws.amazon.com/jp/blogs/news/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/

SELECT文の例

WITH useractivity_daygroup AS (
 SELECT
  DISTINCT userid,
  activity,
  value,
  date_trunc('day', date_parse(datetime, '%Y-%m-%dT%H:%i:%s')) AS daygroup
  FROM "glue_output"."useractivitylog"
  WHERE activity = 'login'
)
SELECT
  count(userid) AS UU,
  daygroup
FROM
  useractivity_daygroup
GROUP BY
  daygroup
;

クエリ結果をS3に保存

  • AthenaでSettingをクリックし、Query result locationにS3のバケット名「aws-athena-query-results--」を指定
  • 先程のSELECTのクエリを保存しておきましょう。保存し実行した出力結果が、バケットに保存されます。

上記までで、日別のログイン回数をプログラムレスで分析できるようになりました。

JSONデータを分析する場合は、ネストした構造の場合はデシリアライズしたりする必要があるので、
こちらが参考になると思います。
Hive JSON SerDe
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/json.html#hivejson


では、Athenaで分析した結果を可視化してみたいと思います。

QuickSight

QuickSightドキュメント
https://docs.aws.amazon.com/ja_jp/quicksight/latest/user/welcome.html

QuickSightのdata sourceに、Athenaのクエリ保存したS3バケットを指定し、
適当に名前をつけて、上記バケットを指定した、manifestファイルをアップロードします。

参考: manifestファイル
https://docs.aws.amazon.com/ja_jp/quicksight/latest/user/supported-manifest-file-format.html

例:

{
    "fileLocations": [
        {
            "URIPrefixes": [
                "s3://aws-athena-query-results-<id>-<region>/userActivityLogin/"
            ]
        }
    ],
    "globalUploadSettings": {
        "format": "CSV",
        "delimiter": ",",
        "textqualifier": "'",
        "containsHeader": "true"
    }
}

データを読み込めれば、視覚化でフィールドリストを指定すると、こんな感じのグラフになります。
グラフのメニューからCSVエクスポートも可能です。


Athenaのクエリを定期実行

さて、これで、分析、視覚化というのはできましたが、
Athenaの分析を手動で行わないといけないことが問題です。

AWS CLIでクエリ実行してみる

クエリの自動実行はLambdaで行えばよいのですが、
その前に、クエリ実行時の挙動を詳しくみてみようと思います。

AWS CLIを使用して、Athenaのquery定期実行の仕様を考える

まず、AWS CLIのconfigを設定します。

$ export AWS_DEFAULT_PROFILE=my_aws_profile

AWS Consoleで保存したクエリのIDを出力する

$ aws athena list-named-queries

aws athena list-named-queries
{
    "NamedQueryIds": [
        "123456786-414d-44a9-82cc-8e4f9781c8a0"
    ]
}

QueryIDを指定してクエリの詳細を見る

$ aws athena get-named-query --named-query-id "123456786-414d-44a9-82cc-8e4f9781c8a0"
{
    "NamedQuery": {
      (略)
    }
}

ここで指定されているQueryを指定して、クエリ実行してみる。
注意:データベース名に_(アンダースコア)以外の-(ハイフン)が入っているとエラーになる。

$ aws athena start-query-execution  \
  --query-string "(略)" \
  --result-configuration OutputLocation=s3://aws-athena-query-results-<id>-<region>/login_uu_day-prod/
 {
     "QueryExecutionId": "67b1abba-3acc-4148-b541-420dfa46dd58"
 }

結果確認

$ aws athena get-query-results  --query-execution-id "67b1abba-3acc-4148-b541-420dfa46dd58"

上記を実行するたびに、S3にデータが増えていきます。
QuickSightで見た時に、どうなるか?ですが、
データが追加されてしまい、データが重複されて読み込まれてしまうので、
古いデータは削除する必要があります。

ですので、Lambdaでは、以下の処理が必要となります。

  • 該当バケットのパス配下のファイル削除
  • athenaのクエリ実行
  • クエリ結果の確認

Athenaクエリを定期実行するLambda

Lambdaを実行するIAM Roleに、便宜上下記のアクセス権限を与えます。
- CloudWatchLogsFullAccess
- S3FullAccess
- AmazonAthenaFullAccess
- AWSGlueServiceRole

  • Lambdaの関数名: athena-loguin_uu_day
  • 実行環境: Python3.8
  • Lambdaの実行時間を3分
  • トリガーに、CloudWatch Eventsを指定
    • 名前: athena-lambda-trigger
    • スケジュール式: rate(1 day)

Lambdaから関数エクスポートしたファイルを編集してCloudFormation化する
各ファイルをパッケージ化し、テンプレートとしてs3にアップロード

例: AthenaLoginUuDay.py

import os
import time
import boto3

RETRY_COUNT = 300
DATABASE = 'glue-output' 
TABLE = 'useractivity'
PATH = 'login_uu_day/'
S3_BUCKET = 'aws-athena-query-results-<id>-<region>'
S3_OUTPUT = 's3://' + S3_BUCKET + '/' + PATH

QUERY = ("""
略(上記のクエリ)
""")

def lambda_handler(event, context):
    query = QUERY
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)
    for i in range(1, 1 + RETRY_COUNT):
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']
        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break
        if query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)
        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')
    QUERY_FILE_FROM = PATH + query_execution_id + ".csv"
    QUERY_FILE_TO = PATH + "out.csv"

    # rename csv file
    s3 = boto3.client('s3')
    s3.copy_object(Bucket=S3_BUCKET, Key=QUERY_FILE_TO, CopySource={'Bucket': S3_BUCKET, 'Key': QUERY_FILE_FROM})

    # created s3 object
    s3_objects_key = []
    s3_object_key_csv = QUERY_FILE_FROM
    s3_objects_key.append({'Key': s3_object_key_csv})
    s3_object_key_metadata = QUERY_FILE_FROM + '.metadata'
    s3_objects_key.append({'Key': s3_object_key_metadata})
    # delete s3 object
    for i in range(1, 1 + RETRY_COUNT):
        response = s3.delete_objects(
            Bucket=S3_BUCKET,
            Delete={
                'Objects': s3_objects_key
            }
        )
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print("delete %s complete" % s3_objects_key)
            break
        else:
            print(response['ResponseMetadata']['HTTPStatusCode'])
            time.sleep(i)
    else:
        raise Exception('object %s delete failed' % s3_objects_key)

後は、複数のクエリがある場合は、CloudFormationを使って、上記テンプレにしてクエリ分のFunctionを作成しておけばOK。

$ aws cloudformation package --template-file template.yaml --s3-bucket bucketname --output-template-file output-template.yaml

Successfully packaged artifacts and wrote output template to file output-template.yaml.
Execute the following command to deploy the packaged template
aws cloudformation deploy --template-file /<path-to-file>/output-template.yaml --stack-name <YOUR STACK NAME>

QuickSightで更新スケジュール設定

上記で設定した定期実行のLambadaにより、Athenaクエリ結果のデータが更新されるので、
QuickSightでもデータセットの更新スケジュールを設定します。

データの管理 > データセットを選択 > 更新スケジュール > 毎日

以上