Treasure Dataで複数のクエリをパラメータを入れ替えながら処理させる【Workflow(digdag)】


Treasure Data Workflow(digdag)を活用して集計JOB発注システムを作った

こちらの記事の続きです。

テーブルのレコードをパラメータにしてクエリを実行したい

こんな感じのテーブルがTD上にあったときに、

time dbname start_time end_time query output_file
1504191600 access_log1 2017/04/01 2017/04/30 analysis1 てすと集計1
1504191600 access_log2 2017/04/01 2017/04/30 analysis1 てすと集計2
1504191600 access_log3 2017/01/01 2017/06/30 analysis2 てすと集計3

内容に従って、順々にクエリを実行したいのです。具体的には、

  1. access_log1について、期間4/1~4/30とした上でanalysis1.sqlを実行 > てすと集計1に結果を吐き出す
  2. access_log2について、期間4/1~4/30とした上でanalysis1.sqlを実行 > てすと集計2に結果を吐き出す
  3. access_log3について、期間1/1~6/30とした上でnalysis2.sqlを実行 > てすと集計3に結果を吐き出す

とやっていきたい。手動でSQLの中身を書き換えて実行するのは面倒です。自動化しましょう。

Workflowを使う

そこで登場するのがTreasure Data Workflowです。
どんなものかをざっくり説明すると、TDが開発したワークフローエンジンです。ベースはこれまたTD開発のdigdagです。
digdagについてはTD古橋さんの解説がわかりやすいです↓
Digdagによる大規模データ処理の自動化とエラー処理

Workflowで何が出来るかというと、

  • 定期実行(バッチ)処理
  • パラメータの活用
  • 実行結果の管理
  • 簡単な再処理
  • Data Connectorと連携してTD外に読み書き

などが可能です。
今回は特にtd_for_eachを活用します。

td_for_eachでテーブルの内容をパラメータにする

+td_for_each>: query.sql
  _do:
    +each_query:
      td>: ${td.each.query}.sql

参考:http://docs.digdag.io/operators/td_for_each.html

${td.each.query}の部分はパラメータです。何をやってるのかというと、query.sqlの結果1レコードごとに、
内容をパラメータとして${td.each.****}に格納します。後続する_do:以降のクエリで${td.each.****}を呼び出すことで、クエリの一部分だけを入れ替えながら処理させることが出来ます。今回で言うとDB名やログの集計期間を入れ替えながら順々に処理させるのに活用します。
query.sqlの例はこんな感じです。集計条件が記されたテーブルから、直近一日の間に書き込まれたものだけを取り出してきています。

query.sql
select
dbname,
start_time,
end_time,
query,
output_file
from sample_db.sample_table
where td_time_range(time, td_time_add(td_scheduled_time(), '-1d', 'JST'), td_scheduled_time(),'JST')

上記クエリの結果が${td.each.****}にパラメータとしてセットされていきます。
パラメータを活用したクエリの例は以下のようになります。

analysis1.sql
select
axis,
count(*) as n,
sum(value) as sum_value
from ${td.each.dbname}.log_table
where td_time_range(time, '${td.each.start_time}', '${td.each.end_time}', 'JST')
group by axis
;

集計期間(start_time ~ end_time)や対象のDB名(dbname)をパラメータで変更した上で集計を行っています。

result_connectionでクエリ結果をGoogle Spreadsheetに出力する

ついでにresult_connectionを使って結果をGoogle Spreadsheet上に吐き出します。
詳しくは公式ドキュメントの他、
digdagの例も参照してください。

    +each_query:
      td>: ${td.each.query}.sql
      result_connection: sample_google_spreadsheet_connection
      result_settings:
        spreadsheet: ${td.each.output_file}
        worksheet: "シート1"
        mode: replace

sample_google_spreadsheet_connectionは事前に作成しておく必要があります。
Connections -> Google Sheetsでサクッと作成しておきましょう。

ソース

最終的に以下のようになります。

workflow.dig

timezone: "Asia/Tokyo"
schedule:
  daily>: 00:00:00

_export:
  td:
    database: sample_db
    table: sample_table

+for_each_records:
  td_for_each>: query.sql
  _do: # query.sqlの結果1レコードごとに以下を実行
    +each_query:
      td>: ${td.each.query}.sql
      result_connection: sample_google_spreadsheet_connection
      result_settings:
        spreadsheet: ${td.each.output_file}
        worksheet: "シート1"
        mode: replace

集計JOB発注システムとして使う

以前のエントリでGoogle Spreadsheetの内容をTDに読み込んでいました。
これは社内の誰でも使える環境であるGoogle Spreadsheetを集計JOBの発注表として使うためです。
「こんなログについて、いつからいつまでの期間で、こんな分析がしたい」というユーザーの要望を1つの集計JOBとしてGoogle Spreadsheetに記載してもらい、それをGoogle App ScriptでTDのTableに持っていきます。
TD側ではWorkflowがそのTableを読み込んでパラメータに分解、指定された分析に対応するクエリを走らせます。
結果はData Connector経由でユーザーが指定した名前のGoogle Spreadsheetに吐き出します。

かなり簡易的に作った仕組みですが意外に拡張性が高く気に入っています。
新しい分析がしたいという要望があればクエリを追加してそれを選んでもらえば良いのです。
新しいパラメータが必要になれば発注表に列を追加すれば良いです。
このあたりはスキーマレスなTreasure Dataを中心に据えているからこそのフットワークの軽さです。

ドキュメント

TD古橋さんのdigdag解説。
https://www.slideshare.net/frsyuki/digdag-76749443

digdagの公式ドキュメント
http://docs.digdag.io/

Treasure Data Data Connectorの公式ドキュメント
https://docs.treasuredata.com/articles/data-connector-overview

Treasure DataからGoogle Spreadsheetに出力するときの参考ドキュメント
https://docs.treasuredata.com/articles/result-into-google-spreadsheet