Treasure Dataで複数のクエリをパラメータを入れ替えながら処理させる【Workflow(digdag)】
Treasure Data Workflow(digdag)を活用して集計JOB発注システムを作った
こちらの記事の続きです。
テーブルのレコードをパラメータにしてクエリを実行したい
こんな感じのテーブルがTD上にあったときに、
time | dbname | start_time | end_time | query | output_file |
---|---|---|---|---|---|
1504191600 | access_log1 | 2017/04/01 | 2017/04/30 | analysis1 | てすと集計1 |
1504191600 | access_log2 | 2017/04/01 | 2017/04/30 | analysis1 | てすと集計2 |
1504191600 | access_log3 | 2017/01/01 | 2017/06/30 | analysis2 | てすと集計3 |
内容に従って、順々にクエリを実行したいのです。具体的には、
- access_log1について、期間4/1~4/30とした上でanalysis1.sqlを実行 > てすと集計1に結果を吐き出す
- access_log2について、期間4/1~4/30とした上でanalysis1.sqlを実行 > てすと集計2に結果を吐き出す
- access_log3について、期間1/1~6/30とした上でnalysis2.sqlを実行 > てすと集計3に結果を吐き出す
とやっていきたい。手動でSQLの中身を書き換えて実行するのは面倒です。自動化しましょう。
Workflowを使う
そこで登場するのがTreasure Data Workflowです。
どんなものかをざっくり説明すると、TDが開発したワークフローエンジンです。ベースはこれまたTD開発のdigdagです。
digdagについてはTD古橋さんの解説がわかりやすいです↓
Digdagによる大規模データ処理の自動化とエラー処理
Workflowで何が出来るかというと、
- 定期実行(バッチ)処理
- パラメータの活用
- 実行結果の管理
- 簡単な再処理
- Data Connectorと連携してTD外に読み書き
などが可能です。
今回は特にtd_for_each
を活用します。
td_for_eachでテーブルの内容をパラメータにする
+td_for_each>: query.sql
_do:
+each_query:
td>: ${td.each.query}.sql
参考:http://docs.digdag.io/operators/td_for_each.html
${td.each.query}
の部分はパラメータです。何をやってるのかというと、query.sql
の結果1レコードごとに、
内容をパラメータとして${td.each.****}
に格納します。後続する_do:
以降のクエリで${td.each.****}
を呼び出すことで、クエリの一部分だけを入れ替えながら処理させることが出来ます。今回で言うとDB名やログの集計期間を入れ替えながら順々に処理させるのに活用します。
query.sql
の例はこんな感じです。集計条件が記されたテーブルから、直近一日の間に書き込まれたものだけを取り出してきています。
select
dbname,
start_time,
end_time,
query,
output_file
from sample_db.sample_table
where td_time_range(time, td_time_add(td_scheduled_time(), '-1d', 'JST'), td_scheduled_time(),'JST')
上記クエリの結果が${td.each.****}
にパラメータとしてセットされていきます。
パラメータを活用したクエリの例は以下のようになります。
select
axis,
count(*) as n,
sum(value) as sum_value
from ${td.each.dbname}.log_table
where td_time_range(time, '${td.each.start_time}', '${td.each.end_time}', 'JST')
group by axis
;
集計期間(start_time ~ end_time)や対象のDB名(dbname)をパラメータで変更した上で集計を行っています。
result_connectionでクエリ結果をGoogle Spreadsheetに出力する
ついでにresult_connection
を使って結果をGoogle Spreadsheet上に吐き出します。
詳しくは公式ドキュメントの他、
digdagの例も参照してください。
+each_query:
td>: ${td.each.query}.sql
result_connection: sample_google_spreadsheet_connection
result_settings:
spreadsheet: ${td.each.output_file}
worksheet: "シート1"
mode: replace
sample_google_spreadsheet_connection
は事前に作成しておく必要があります。
Connections -> Google Sheetsでサクッと作成しておきましょう。
ソース
最終的に以下のようになります。
timezone: "Asia/Tokyo"
schedule:
daily>: 00:00:00
_export:
td:
database: sample_db
table: sample_table
+for_each_records:
td_for_each>: query.sql
_do: # query.sqlの結果1レコードごとに以下を実行
+each_query:
td>: ${td.each.query}.sql
result_connection: sample_google_spreadsheet_connection
result_settings:
spreadsheet: ${td.each.output_file}
worksheet: "シート1"
mode: replace
集計JOB発注システムとして使う
以前のエントリでGoogle Spreadsheetの内容をTDに読み込んでいました。
これは社内の誰でも使える環境であるGoogle Spreadsheetを集計JOBの発注表として使うためです。
「こんなログについて、いつからいつまでの期間で、こんな分析がしたい」というユーザーの要望を1つの集計JOBとしてGoogle Spreadsheetに記載してもらい、それをGoogle App ScriptでTDのTableに持っていきます。
TD側ではWorkflowがそのTableを読み込んでパラメータに分解、指定された分析に対応するクエリを走らせます。
結果はData Connector経由でユーザーが指定した名前のGoogle Spreadsheetに吐き出します。
かなり簡易的に作った仕組みですが意外に拡張性が高く気に入っています。
新しい分析がしたいという要望があればクエリを追加してそれを選んでもらえば良いのです。
新しいパラメータが必要になれば発注表に列を追加すれば良いです。
このあたりはスキーマレスなTreasure Dataを中心に据えているからこそのフットワークの軽さです。
ドキュメント
TD古橋さんのdigdag解説。
https://www.slideshare.net/frsyuki/digdag-76749443
digdagの公式ドキュメント
http://docs.digdag.io/
Treasure Data Data Connectorの公式ドキュメント
https://docs.treasuredata.com/articles/data-connector-overview
Treasure DataからGoogle Spreadsheetに出力するときの参考ドキュメント
https://docs.treasuredata.com/articles/result-into-google-spreadsheet
Author And Source
この問題について(Treasure Dataで複数のクエリをパラメータを入れ替えながら処理させる【Workflow(digdag)】), 我々は、より多くの情報をここで見つけました https://qiita.com/hase-ryo/items/1f103461c74a1b37a311著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .