digdagのtd_wait_table>とtd_wait>を使ってみた


digdag(正確にはTreasureData Workflow)のtd_wait_table>, td_wait>を使ってみたのでそのメモです。

ジョブフロー

こんなフローで試してみました。

1. td_wait_table>で「wf_wait_target」というテーブルができるのを待つ
2. 「wf_wait_target」にクエリなげる
3. td_wait>で「wf_check」テーブルに100レコード以上のデータがストアされるのを待つ
4. 「wf_check」テーブルの内容を「export_wf_check」にエクスポート

digファイル、SQLは以下になります。

digファイル

td_wait.dig
timezone: Asia/Tokyo

schedule:
  daily>: 09:00:00

_export:
  td:
    database: test

+task_wait_table:
  td_wait_table>: wf_wait_target

+task1:
  td>: queries/count_wf_wait_target.sql

+task_wait:
  td_wait>: queries/check_record.sql

+task2:
  td>: queries/export_wf_check.sql
  create_table: export_wf_check

SQL

count_wf_wait_target.sql
SELECT
    count(1) as cnt
FROM
    wf_wait_target

check_record.sql
SELECT
    count(1) > 100
FROM
    wf_check

export_wf_check.sql
SELECT
    *
FROM
    wf_check

ジョブを実行してみる

作成したdig,SQLをTreasureData WorkflowにPushしてさっそく実行してみます。

td wf push wf_sample
td wf start wf_sample td_wait --session now

ここからはTreasureData Workflowのコンソールで確認していきます。

td_wait_table

「wf_wait_target」というテーブルができるのを待っています。
30秒に1度テーブルの存在をチェックしているのがわかります。

td_wait(レコード数)チェック

「wf_wait_target」テーブルを作成すると次に進んで、check_record.sqlを実行し、100行以上になっているかチェックしています。
Timelineも進んでいますね。

実行完了

「wf_check」テーブルに10,000行をInsertすると、最後のエクスポートまで無事完了しました。

さいごに

digdagのドキュメントにも記載されていますが、td_wait_tableは単純なテーブル有無だけではなくレコード数も条件に入れられるので非常に使い勝手がよさそうに思えます。
また、td_waitはtrueを返すまで待つので、単純な件数以外でも使えるのがいいですね。

ドキュメントには書いてないですが、このwaitにタイムアウトはあるのだろうか?

(digdagドキュメント)
* td_wait_table
* td_wait