CloudFrontのアクセスログをパーティション射影を使ってコスパ良くAthenaでクエリーする仕組みをCDKで作った


はじめに

おはようございます、加藤です。
CloudFrontのアクセスログはS3バケットに保存されるためログ分析を行い対場合はAthenaを使ってクエリーを行うのが一般的です。
しかし、Athenaはクエリー対象のデータ容量による従量課金であるためアクセスログという時間が経つほど増えるデータに対してだとクエリーにかかる料金が時間軸で増加します。
解決策としてデータのパーティション化が従来案内されていました。
2020年6月にはパーティション射影(Partition Projection)という新しい方法が提供開始されました。
両者の比較は下記のブログが詳しいです。

[新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | DevelopersIO

今回はCloudFrontのアクセスログに対して後者のパーティション射影を使ってクエリーを行う仕組みをCDKを使って構築したのでそれを公開します。リポジトリはこちらで公開しています。

CDKコンストラクトの作成

今回の目的を満たすCDKコンストラクトを作成します。コメントとしてコードの説明を記載しています。

lib/athena-table-for-cloudfront/index.ts
import {Stack} from 'aws-cdk-lib';
import {Rule} from 'aws-cdk-lib/aws-events';
import {LambdaFunction} from 'aws-cdk-lib/aws-events-targets';
import {CfnDatabase, CfnTable} from 'aws-cdk-lib/aws-glue';
import {NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs';
import {CfnBucket, IBucket} from 'aws-cdk-lib/aws-s3';
import {Construct} from 'constructs';
import {resolve} from 'path';

export interface AthenaTableForCloudFrontProps {
  /**
   * @default cloudfront
   */
  databaseName?: string;
  /**
   * @default access_logs
   */
  tableName?: string;
  /**
   * オリジナルのCloudFrontアクセスログが出力されるバケット
   */
  srcBucket: IBucket;
  /**
   * パーティション射影可能なオブジェクト名に変更後に出力されるバケット
   * srcBucketと同じバケットは無限ループが発生するため使用不可
   */
  destBucket: IBucket;
  /**
   * @default cloudfront/
   */
  destPrifix?: string;
}

export class AthenaTableForCloudFront extends Construct {
  public readonly cfnDatabase: CfnDatabase;
  public readonly cfnTable: CfnTable;
  public readonly handler: NodejsFunction;
  public readonly rule: Rule;

  constructor(
    scope: Construct,
    id: string,
    props: AthenaTableForCloudFrontProps
  ) {
    super(scope, id);

    /**
     * デフォルトパラメータを設定します。
     */
    const {
      databaseName = 'cloudfront',
      tableName = 'access_logs',
      srcBucket,
      destBucket,
      destPrifix = 'cloudfront/',
    } = props;

    /**
     * srcとdestバケットが同一の場合は無限ループになるためエラーを吐きます。
     * フィルター条件などを設定することで同一バケットを使うこともできますが、そこを頑張るよりもバケットをわけた方が楽かつ確実なのでここではやらないようにします。
     */
    if (srcBucket.bucketArn === destBucket.bucketArn) {
      throw new Error(
        'srcBucketとdestBucketに同じバケットを指定するとLambdaの無限実行が発生します。'
      );
    }

    /**
     * バケットにオブジェクトが配置された時のLambda発火にEventBridgeを使用します。バケットでEventBridgeを使える様にフラグを有効にします。
     */
    const cfnSrcBucket = srcBucket.node.defaultChild as CfnBucket;
    cfnSrcBucket.addPropertyOverride(
      'NotificationConfiguration.EventBridgeConfiguration.EventBridgeEnabled',
      true
    );

    /**
     * Glueデータベースを定義します。
     */
    const cfnDatabase = new CfnDatabase(this, 'Database', {
      catalogId: Stack.of(this).account,
      databaseInput: {name: databaseName},
    });
    this.cfnDatabase = cfnDatabase;

    /**
     * Glueテーブル(Athenaテーブル)を定義します。
     */
    const cfnTable = new CfnTable(this, 'Table', {
      catalogId: Stack.of(this).account,
      databaseName: cfnDatabase.ref,
      tableInput: {
        name: tableName,
        tableType: 'EXTERNAL_TABLE',
        // dateというキー名でクエリー範囲を絞り込めるように構築します。
        partitionKeys: [{name: 'date', type: 'string'}],
        parameters: {
          'skip.header.line.count': '2',
          'projection.enabled': true, // パーティション射影の有効化
          // projection.[key].type: keyはpartitionKeysのnameと一致させる
          'projection.date.type': 'date', // 型の指定
          'projection.date.range': '2021/01/01/00,NOW', // 範囲の指定
          'projection.date.format': 'yyyy/MM/dd/HH', // フォーマットの指定
          'projection.date.interval': 1,
          'projection.date.interval.unit': 'HOURS', // 最小単位が時間なのでHOURS
          'storage.location.template':
            `s3://${destBucket.bucketName}/${destPrifix}` + '${date}', // 末尾partitionKeysのnameと一致させる
        },
        /**
         * アクセスログのフォーマットに合わせてカラムを定義する。1つ目のカラム名がdateでpartitionKeysと重複しているのでlog_dateに変更している。
         * https://docs.aws.amazon.com/ja_jp/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#:~:text=%E3%82%B9%E3%83%9A%E3%83%BC%E3%82%B9-,%E6%A8%99%E6%BA%96%E3%83%AD%E3%82%B0%E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB%E3%83%95%E3%82%A3%E3%83%BC%E3%83%AB%E3%83%89,-%E3%83%87%E3%82%A3%E3%82%B9%E3%83%88%E3%83%AA%E3%83%93%E3%83%A5
         */
        storageDescriptor: {
          columns: [
            {name: 'log_date', type: 'date'},
            {name: 'time', type: 'string'},
            {name: 'x_edge_location', type: 'string'},
            {name: 'sc_bytes', type: 'bigint'},
            {name: 'c_ip', type: 'string'},
            {name: 'cs_method', type: 'string'},
            {name: 'cs_host', type: 'string'},
            {name: 'cs_uri_stem', type: 'string'},
            {name: 'sc_status', type: 'int'},
            {name: 'cs_referer', type: 'string'},
            {name: 'cs_user_agent', type: 'string'},
            {name: 'cs_uri_query', type: 'string'},
            {name: 'cs_cookie', type: 'string'},
            {name: 'x_edge_result_type', type: 'string'},
            {name: 'x_edge_request_id', type: 'string'},
            {name: 'x_host_header', type: 'string'},
            {name: 'cs_protocol', type: 'string'},
            {name: 'cs_bytes', type: 'bigint'},
            {name: 'time_taken', type: 'float'},
            {name: 'x_forwarded_for', type: 'string'},
            {name: 'ssl_protocol', type: 'string'},
            {name: 'ssl_cipher', type: 'string'},
            {name: 'x_edge_response_result_type', type: 'string'},
            {name: 'cs_protocol_version', type: 'string'},
            {name: 'fle_status', type: 'string'},
            {name: 'fle_encrypted_fields', type: 'string'},
            {name: 'c_port', type: 'int'},
            {name: 'time_to_first_byte', type: 'float'},
            {name: 'x_edge_detailed_result_type', type: 'string'},
            {name: 'sc_content_type', type: 'string'},
            {name: 'sc_content_len', type: 'bigint'},
            {name: 'sc_range_start', type: 'bigint'},
            {name: 'sc_range_end', type: 'bigint'},
          ],
          inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
          outputFormat:
            'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
          serdeInfo: {
            serializationLibrary:
              'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
            parameters: {
              'field.delim': '\t',
              'serialization.format': '\t',
            },
          },
          location: `s3://${destBucket.bucketName}/${destPrifix}`,
        },
      },
    });
    this.cfnTable = cfnTable;

    /**
     * オリジナルのアクセスログのオブジェクトキーがパーティション射影に対応していないので対応させる。
     */
    const handler = new NodejsFunction(this, 'MoveAccessLog', {
      entry: resolve(__dirname, 'handler/move-access-log.ts'),
      bundling: {
        sourceMap: true,
        minify: true,
      },
      environment: {
        TARGET_BUCKET: destBucket.bucketName,
        TARGET_KEY_PREFIX: destPrifix,
      },
      memorySize: 256,
    });
    this.handler = handler;

    /**
     * バケットへのアクセス権限を与える。
     */
    srcBucket.grantRead(handler);
    srcBucket.grantDelete(handler);
    destBucket.grantWrite(handler);

    /**
     * バケットへオブジェクトが作成された時にLambda関数を実行する。
     */
    const rule = new Rule(this, 'CreatedEvent', {
      eventPattern: {
        source: ['aws.s3'],
        resources: [srcBucket.bucketArn],
        detailType: ['Object Created'],
      },
      targets: [new LambdaFunction(handler)],
    });
    this.rule = rule;
  }
}

オブジェクト作成時に実行されるLambda関数はこちらです。
aws-samples/amazon-cloudfront-access-logs-queries: Analyze your Amazon CloudFront Access Logs at Scale with Amazon Athena.をフォークして作成しています。

lib/athena-table-for-cloudfront/handler/move-access-log.ts
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

// Copyright 2022 Kato Ryo

import {
  S3Client,
  CopyObjectCommand,
  DeleteObjectCommand,
} from '@aws-sdk/client-s3';
import {EventBridgeHandler} from 'aws-lambda';
import {cleanEnv, str} from 'envalid';

const env = cleanEnv(process.env, {
  TARGET_BUCKET: str(),
  // prefix to copy partitioned data to w/o leading but w/ trailing slash
  TARGET_KEY_PREFIX: str(),
});

// regex for filenames by Amazon CloudFront access logs. Groups:
// - 1.	year
// - 2.	month
// - 3.	day
// - 4.	hour
const datePattern = '[^\\d](\\d{4})-(\\d{2})-(\\d{2})-(\\d{2})[^\\d]';
const filenamePattern = '[^/]+$';

const s3 = new S3Client({apiVersion: '2006-03-01'});

export const handler: EventBridgeHandler<
  'Object Created',
  {
    version: string;
    bucket: {
      name: string;
    };
    object: {
      key: string;
      size: number;
      etag: string;
      sequencer: string;
    };
    'request-id': string;
    requester: string; // aws account id
    'source-ip-address': string;
    reason: 'PutObject';
  },
  void
> = async event => {
  const bucket = event.detail.bucket.name;
  const sourceKey = event.detail.object.key;

  const sourceRegex = new RegExp(datePattern, 'g');
  const match = sourceRegex.exec(sourceKey);
  if (match === null) {
    console.log(
      `Object key ${sourceKey} does not look like an access log file, so it will not be moved.`
    );
    return;
  }

  const [, year, month, day, hour] = match;

  const filenameRegex = new RegExp(filenamePattern, 'g');
  const splited = filenameRegex.exec(sourceKey);
  if (splited === null) {
    `Object key ${sourceKey} does not look like an access log file, so it will not be moved.`;
    return;
  }
  const filename = splited[0];

  const targetKey = `${env.TARGET_KEY_PREFIX}${year}/${month}/${day}/${hour}/${filename}`;
  console.log(
    `Copying s3://${bucket}/${sourceKey} to s3://${env.TARGET_BUCKET}/${targetKey}.`
  );

  await s3.send(
    new CopyObjectCommand({
      CopySource: `${bucket}/${sourceKey}`,
      Bucket: env.TARGET_BUCKET,
      Key: targetKey,
    })
  );

  console.log(`Copied. Now deleting ${sourceKey}.`);
  await s3.send(new DeleteObjectCommand({Bucket: bucket, Key: sourceKey}));
  console.log(`Deleted ${sourceKey}.`);
};

このコンストラクトにバケットを与えてスタック定義します。今回はデモなのでバケットのRemovalPolicyをDESTROYにしています。

lib/cdk-athena-for-cf-logs-stack.ts
import {RemovalPolicy, Stack, StackProps} from 'aws-cdk-lib';
import {Construct} from 'constructs';
import {Bucket} from 'aws-cdk-lib/aws-s3';
import {AthenaTableForCloudFront} from './athena-table-for-cloudfront';

export class CdkAthenaForCfLogsStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const srcBucket = new Bucket(this, 'SrcBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
    });

    const destBucket = new Bucket(this, 'DestBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
    });

    new AthenaTableForCloudFront(this, 'AthenaCloudFront', {
      srcBucket,
      destBucket,
    });
  }
}

クエリーの実行

SrcバケットにダミーデータをアップロードするとDestバケットにアクセスログがコピーされます。そしてオブジェクトキーがパーティション射影対応の形式になりました。

2022/03/31より後を対象にクエリーを実行します。スキャンしたデータ量は3.85KBでした、

SELECT * FROM "cloudfront"."access_logs"
WHERE date > "2022/03/31";

さらに条件を絞り込んで2022/03/31 〜 2022/04/02の間のデータを取得します。スキャンしたデータ量は1.90KBで先ほどより減少しているため、パーティション投影が効いていることがわかりました。

SELECT * FROM "cloudfront"."access_logs"
WHERE date > "2022/03/31" AND date < "2022/04/02";

あとがき

CloudFrontのアクセスログ、パーティション射影が可能な形式も指定できるようにして欲しい。。。

参考元