Cloudtrailのログを、awsマネージドではないsparkのmetastoreに登録する方法


はじめに

awsマネージドなSpark/hive環境であれば、 com.amazon.emr.hive.serde.CloudTrailSerde あたりを利用してmetastoreにテーブルを登録することができます。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/cloudtrail-logs.html
しかし、 com.amazon.emr.hive.serde.CloudTrailSerde はawsマネージドではないSpark/hiveでは利用できません。

本記事では、databricksなどの awsマネージドではないSpark環境 において、cloudtrailのログをmetastoreに登録する方法を説明します。

注意

  • region/year/month/dayのpartitionが利用可能なようにcreate tableしています。
    • partitionは手動で追加する必要があります。手順参照。
  • hive上では、本手順では動きません。
    • とはいえ、ちょっとした変更で動くと思う

手順

tableの定義

以下のqueryで CREATE TABLE をします。 {} 内は適宜書き換えてください。
{table_path} は、一般的に s3a://{CloudTrail_bucket_name}/AWSLogs/{Account_ID}/CloudTrail/ の形式になります。

列名/objectのkey名は大文字小文字を適切に指定しないと読み込まないので注意( 本日の落とし穴 )。

sparksql
CREATE TABLE {db}.{table}
(
  Records ARRAY<
    STRUCT<
      additionalEventData:MAP<STRING,STRING>, -- valueがmapでもintでもstringとして読んでくれるので、後で使う側に適当に処理してもらう
      awsRegion:STRING,
      errorCode:STRING,
      errorMessage:STRING,
      eventID:STRING,
      eventName:STRING,
      eventSource:STRING,
      eventTime:STRING,
      eventType:STRING,
      eventVersion:STRING,
      readOnly:STRING,
      recipientAccountId:STRING,
      requestID:STRING,
      requestParameters:MAP<STRING,STRING>,
      resources:ARRAY<MAP<STRING,STRING>>,
      responseElements:MAP<STRING,STRING>,
      serviceEventDetails:STRING,
      sharedEventID:STRING,
      sourceIPAddress:STRING,
      userAgent:STRING,
      userIdentity:MAP<STRING,STRING>,
      vpcEndpointId:STRING
    >
  >,
  region STRING,
  year STRING,
  month STRING,
  day STRING
)
USING json
PARTITIONED BY (region, year, month, day)
LOCATION '{table_path}'
;

viewの定義

logを直接覗くとわかるのですが、Cloudtrailのlogは以下のようなjsonになっています:

{"Records":[
  {"eventVersion":"1.05",...},
  {"eventVersion":"1.05",...},
]}

これは、Records内にすべての行と列が内包されており、扱いが面倒です。

そこで、この問題を解決するviewを予め定義しましょう。
以下のqueryで CREATE VIEW をします。

sparksql
CREATE OR REPLACE VIEW {db}.{view}
AS
  SELECT 
    Record.*,
    region,
    year,
    month,
    day
  FROM
    (
      SELECT
        explode(Records) AS Record,
        region,
        year,
        month,
        day
      FROM
        {db}.{table}
    )
;

partitionの追加

ディレクトリ(prefix)が {partition_key}={partiton_val} の形式ではないため、 MSCK REPAIR TABLE を利用できません。
そのため、ALTER TABLE ADD PARTITION を使用します。
{partition_path} は、一般的に s3a://{CloudTrail_bucket_name}/AWSLogs/{Account_ID}/CloudTrail/{region}/{year}/{month}/{day}/ の形式になります。

ALTER TABLE {db}.{table} ADD IF NOT EXISTS
  PARTITION (region='{region}', year='{year}', month='{month}', day='{day}')
  LOCATION '{partition_path}';