AWSデータラングラーを使用してAWS Cloudtrailデータを変換する


AWS CloudTrailサービスは、IAMユーザ、IAMロール、API、SDKと他のAWSサービスによってとられる行動を捕えます.デフォルトでは、AWS CloudTrailはAWSアカウントで有効になっています.あなたはあなたの選択のAmazon S 3バケットにJSON形式で配信される進行中のイベントを記録するために“トレイル”を作成することができます.
Cloudtrailダッシュボード
プロテクタートレイルトレイル
キャプチャするイベントを選択します
ログを読み込み、書き込み、読み取り専用、ログすべての現在および将来のS 3バケツのデータイベントのみを書き込むために設定することができます

また、ラムダ関数のデータイベントをログ出力するオプションもあります.すべての領域、すべての関数を選択するか、特定のラムダ- ARNまたは領域を指定できます.
このトレイルはS 3バケットにおいて小さく、ほとんどのKBサイズのジッパードJSONファイルを作成します.
S 3のバケツでトレイルログファイル
ファイルを選択し、「選択」タブを使用してファイルの内容を表示できます.
からファイルを選択
以下はS 3バケットへの「puObject」イベントの例です.
{
    "eventVersion": "1.07",
    "userIdentity": {
        "type": "AWSService",
        "invokedBy": "s3.amazonaws.com"
    },
    "eventTime": "2020-09-12T23:53:22Z",
    "eventSource": "s3.amazonaws.com",
    "eventName": "PutObject",
    "awsRegion": "us-east-1",
    "sourceIPAddress": "s3.amazonaws.com",
    "userAgent": "s3.amazonaws.com",
    "requestParameters": {
        "bucketName": "my-data-bucket",
        "Host": "s3.us-east-1.amazonaws.com",
        "key": "mydatabase/mytable/data-content.snappy.parquet"
    },
    "responseElements": null,
    "additionalEventData": {
        "SignatureVersion": "SigV4",
        "CipherSuite": "ECDHE-RSA-AES128-SHA",
        "bytesTransferredIn": 107886,
        "AuthenticationMethod": "AuthHeader",
        "x-amz-id-2": "Dg9gelyiPojDT00UJ+CI7MmmEyUhPRe1EAUtzQSs3kJAZ8JxMe+2IQ4f6wT2Kpd+Czih1Dc2SI8=",
        "bytesTransferredOut": 0
    },
    "requestID": "29C76F4BC75743BF",
    "eventID": "6973f9b1-1a7d-46d4-a48f-f2d91c80b2d3",
    "readOnly": false,
    "resources": [
        {
            "type": "AWS::S3::Object",
            "ARN": "arn:aws:s3:::my-data-bucket/mydatabase/mytable/data-content.snappy.parquet"
        },
        {
            "accountId": "xxxxxxxxxxxx",
            "type": "AWS::S3::Bucket",
            "ARN": "arn:aws:s3:::my-data-bucket"
        }
    ],
    "eventType": "AwsApiCall",
    "managementEvent": false,
    "recipientAccountId": "xxxxxxxxxxxx",
    "sharedEventID": "eb37214b-623b-43e6-876b-7088c7d0e0ee",
    "vpcEndpointId": "vpce-xxxxxxx",
    "eventCategory": "Data"
}

CloudTrail provides a useful feature under Event history to create Athena table over the trail's Amazon S3 bucket which you can use to query the data using standard SQL.

Now, depending on the duration and events captured, CloudTrail would create lots of small files in S3, which can impact execution time, when queried from Athena.

Moving ahead, I will show you how you can use AWS Data Wrangler and Pandas to perform the following:

  1. Query data from Athena into Pandas dataframe using AWS Data Wrangler.
  2. Transform eventtime string datatype to datetime datatype.
  3. Extract and add year, month, and day columns from eventtime to dataframe.
  4. Write dataframe to S3 in Parquet format with hive partition using AWS Data Wrangler.
  5. Along with writing the dataframe, how you can create the table in Glue catalog using AWS Data Wrangler.

For this example, I have setup a Sagemaker Notebook with Lifecycle configuration. Once you have the notebook open, you can use conda_python3 kernel to work using AWS Data Wrangler.

Import the required libraries

import awswrangler as wr
import pandas as pd
pd.set_option('display.width', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', None)
Antデータラングラーを使用してAthenaのSQLを実行するPython関数
def execute_sql(sql, database, ctas=False):
    return wr.athena.read_sql_query(sql, database, ctas_approach=ctas)
SQLクエリの詳細を取得するS 3のイベントに関連する
s3ObjectSql = """
SELECT 
    useridentity.sessioncontext.sessionissuer.username as username,
    useridentity.sessioncontext.sessionissuer.type as type,
    useridentity.principalid as principalid,
    useridentity.invokedby as invokedby,
    eventname as event_name,
    eventtime,
    eventsource as event_source,
    awsregion as aws_region,
    sourceipaddress,
    eventtype as event_type,
    readonly as read_only,
    requestparameters
FROM cloudtrail_logs_cloudtrail_logs_traillogs
WHERE eventname in ('ListObjects', 'PutObject', 'GetObject') and eventtime > '2020-08-23'
"""
SQLを実行し、結果としてパンダデータを取得する
data = execute_sql(sql=s3GObjectSql, database='default')

ユニークなユーザー名を見つける
data['username'].value_counts()
あなたが観察するならば、EventTime列は「ストリング」データ型であるので、どんな日付変換も実行するのは難しいでしょう.ここでは、DateTime DataTypeとDrop EventTimeを使用して新しい列を作成します.
EventTime列の文字列からDateTimeへの変換
data['event_time'] = pd.to_datetime(data['eventtime'], errors='coerce')

data.drop('eventtime', axis=1, inplace=True)
を抽出し、EventHunse時間列から年、月、日の列を追加します.この変更で、あなたはハイブパーティションとしてデータをs 3に書くことができます.
DataFrameに新しいフィールドを展開して追加する
data['year'] = data['event_time'].dt.year
data['month'] = data['event_time'].dt.month
data['day'] = data['event_time'].dt.day
今AWSデータラングラーS 3を使用します.あなたが年、月、日、およびparquet形式で分割されたS 3にデータを書くことができます.また、データベースとテーブルのパラメータを追加することができますAthena/Glueカタログにメタデータを書き込むには.データベースは成功するコマンドでなければならないことに注意してください.
wr.s3.to_parquet(
    df=data,
    path='s3://my-bucket/s3_access_analyzer/cloudtrail/',
    dataset=True,
    partition_cols=['year', 'month', 'day'],
    database='default',  # Athena/Glue database
    table='cloudtrail' # Athena/Glue table
)
結果を表示するには、Athenaを問い合わせることができます

質問は、スキャンされたデータの0 KBで完了するだけで1.74秒かかりました.今なぜ0 KB?さて、私はあなたのためにそれを考えて答えておきます.
結論として、AWSデータラングラーを使用すると、上に示したように、簡単にかつ効率的に抽出、変換、ロード(ETL)タスクを実行することができます.それはよく他のAWSサービスと統合され、積極的に新しい機能と拡張機能を更新されています.