TreasureDataで効果的な中間テーブルの活用例


TreasureDataでは、timeカラムを用いてSQLで取得するデータ期間を指定することで、不要なデータI/Oを防ぎ、効率的に処理を行うことができます。
しかし、全期間のデータを読込む集計を数多く実施する場合、データのI/Oに多くの時間がかかってしまうこと等により、リソースを有効に活用できないという危険性があります。 そのため、全データ読込が必要な集計の共通の処理結果を事前に中間テーブルとして別テーブルに格納しておくことで、リソースを有効に活用することができるようになります。

中間テーブルの利用例

例えば、全ユーザ毎のサイトへの最初のアクセス時間と最後のアクセス時間を求めたい、というケースがあります。
サービスの年数が増えるにつれて、2012年に最後にアクセスしたユーザや2017年に最後にアクセスしたユーザなど様々なユーザがでてきます。単純に上記の要件を求めるためにクエリを書くと、2012年から2017年のデータに対して最初と最後のアクセス時間を求めるクエリを書いてしまうことになります。月に数十億PVあるようなサイトのアクセスログに対して、こんなクエリを数十種類やhourlyで投げられてしまった日にはいくらリソースがあっても辛い気持ちになりますね。

中間テーブルを作成するまでの流れ

  1. 初期データ作成のために、中間テーブル作成用クエリを実施対象テーブルの前日までのデータに対して、初回のみ実施し、別テーブルに結果を書出し
  2. データ更新用のスケジュールクエリの登録し、対象テーブルと中間テーブルを元に定期的に中間テーブルにデータを挿入
  3. 集計用クエリの実施し、中間テーブルから集計を行う

1. 初回集計用クエリを発行するコマンド

first_access_time: user_idが最初にアクセスした時間
last_access_time: user_idが最後にアクセスした時間
sample_db: 作業用DB
access: 元テーブル
snapshot_access: 中間テーブル

TDのHiveクエリを例とします。下記では2017年3月1日時点でのsnapshotを作成しており、その日での全ユーザの最初にアクセスした日と最後にアクセスした日が格納されていることになります。

INSERT INTO TABLE snapshot_access
SELECT * FROM (
SELECT
  TD_TIME_PARSE('2017-03-01','JST') AS time,
  user_id,
  TD_FIRST(time, time) AS first_access_time,
  TD_LAST(time, time) AS last_access_time
  FROM access
  WHERE TD_TIME_RANGE(time, null, '2017-03-01', 'JST')
  GROUP BY user̲id
) all

2. 中間テーブル作成用に毎日1時に実行するスケジュールクエリ

スケジュールクエリとして下記を登録します。(今ならTD Workflow(digdag)などを使ってもいいですね。)
サブクエリの一つ目はaccessテーブルの前日分アクセスがあったユーザの最初のアクセス日と最後のサクセス日の集計を行います。
もう一つのサブクエリがポイントで、作成済みのsnapshot_accessから数日分貯めた最初のアクセス日と最後のアクセス日のログを抽出し、上記とUNION ALLして再度全ユーザの最初と最後のアクセス日を求めます。
これをすると、snapshot_accessの1日分は必ず全ユーザID分のログしかたまらないので、毎度全期間を集計しなくても最初と最後のアクセス日を抽出することができるようになります。

-- aggregate data from (aggregated historical aggregated data) + (aggregated todays data)
INSERT INTO TABLE snapshot_access
SELECT
  TD_SCHEDULED_TIME() AS time,
  user̲id,
  TD_FIRST(first_access_time, first_access_time) AS first_access_time,
  TD_LAST(last_access_time, last_access_time) AS last_access_time
FROM (
  -- aggregates todays data
  SELECT
    user_id,
    TD_FIRST(time, time) AS first_access_time,
    TD_LAST(time, time) AS last_access_time
    FROM access
  WHERE TD_TIME_RANGE(time, TD_TIME_ADD(TD_SCHEDULED_TIME(), '-1d', 'JST'), TD_SCHEDULED_TIME(), 'JST')
  GROUP BY user_id

  UNION ALL

  -- aggregate historically aggregated data
  SELECT
    user_id,
    TD_FIRST(first_access_time, time) AS first_access_time,
    TD_LAST(last_access_time, time) AS last_access_time
  FROM snapshot_access
  WHERE TD_TIME_RANGE(time, TD_TIME_ADD(TD_SCHEDULED_TIME(), '-5d', 'JST'), TD_SCHEDULED_TIME(), 'JST')
  GROUP BY user_id
) historical_data_plus_todays_data
GROUP BY user_id

3. 集計用クエリ例

最後にsnapshot_accessを使う場合には、下記のようなクエリを書きます。
毎日の新規ユニークユーザ数を算出する例:

SELECT
first_access_time AS day,
COUNT(1) AS uu
FROM
(
  SELECT
    user_id,
    TD_TIME_FORMAT( TD_FIRST(first_access_time, first_access_time), 'yyyy‒MM‒dd', 'JST') AS
    first_access_day
  FROM snapshot_access
  WHERE TD_TIME_RANGE(time, TD_TIME_ADD(TD_SCHEDULED_TIME(), '-1d', 'JST'), NULL, 'JST')
  GROUP BY user_id
) first_access_table
WHERE
  first_access_time = TD_TIME_FORMAT(TD_SCHEDULED_TIME(), 'yyyy‒MM‒dd', 'JST')
GROUP BY first_access_time

まとめ

中間テーブルを使うと効率的に処理ができるようになるよ。という話でした。
一方で力技で解決する方法もありますが、ワークフローシステムが便利に使えるようになってきているので、効率的な処理フローでできるところはそうした方が良いですね。