TD workflow を初めて書くときに知っておきたかったこと


TreasureDataのworkflow、使えるようになったらいろんなこと出来るんだろうなとは思いつつ
書き方よくわからんしって敬遠してたけど遂に使わないとダメな状況に直面したので
調べて学んだことを覚え書きします。
(最近記憶喪失が激しいので未来の自分に届け…)

書き方とtips

詳しいことやちゃんとしたことは公式とか他のQiita参照
digdagがベースなので、記法はdigdagのサイトを調べれば良い

いくつかのtipsをベースに書き方を紹介する

1.あるクエリを実行し終えたら次に別のあるクエリを実行する

これが基本的な書き方(だと思ってる)

test.dig
timezone: Asia/Tokyo
_export: # 変数の定義
  td:
    database: test_database # 実行するdatebase名
schedule:
  daily>: 8:00:00 # スケジュールの設定。他にもweekly,monthlyなどある。cronの指定も可能

+task_1: # タスク名。任意。+〜〜〜で表す
  td_run>: "保存済みクエリ1" # TDに保存しているqueryの名称またはIDを記載

+task_2:
  td_run>: "保存済みクエリ2"

上記の例では、

  • JSTで毎朝8時に
  • test_database に対して
  • まずtask_1の「保存済みクエリ1」を実行する
  • それが完了したら、次にtask_2の「保存済みクエリ2」を実行する

ということをやっている。
基本として以下を抑えておくと読み書きできる

  • +XXXXXXでタスクを表す
  • YYYYYY>でオペレータを表す。オペレータの種類や意味はdocsの以下参照
  • 基本1タスクに1オペレータ
    • 2つ以上入れるとA task can't have more than one operatorって怒られる

- とっても参考になるリンク集

2.並列で実行したい

上記は1つのクエリ実行が完了したら次を実行するが
干渉しないクエリなどの場合、2つ以上のクエリを同時に実行したい場合

test.dig
+parallel_task:
  _parallel: true
  +task_1:
    td_run>: "保存済みクエリ1"

  +task_2:
    td_run>: "保存済みクエリ2"

+task_3:
  td_run>: "保存済みクエリ3"

_parallel: trueというオプションを設定すれば、それより下位のタスクは並列で実行する
この場合だと、クエリ1とクエリ2が同時に実行される

パラレル設定は下位タスクにのみ適用されるので、
クエリ1とクエリ2の両方が完了すれば、その次のタスク(この場合はクエリ3)に実行が移る

3.クエリ内のパラメータを動的に変更して実行する

TDクエリ実行時に内部で変数を使えるのは実行時間だけ(TD_SCHEDULE_TIME())だと思っているが(え、違うの?)
workflowを使えばクエリ内でパラメータが使える

test.dig
_export:
  exec_date: '2021-05-01' # exec_dateいう変数に'2021-05-01'という値をセット
  td:
    database: test_datebase

+task_1:
  td>: test.sql # クエリ内で呼び出した${exec_date}に'2021-05-01'が入った状態で実行される

test.sqlに埋め込んだ変数${exec_date}2021-05-01が出力される
ただしTDで保存したクエリに変数を埋め込んでも変換されなかったので、
この方法で実行したいクエリはファイルに記載し、同じproject配下に含めておく

クエリ側で変数を使う場合は以下のように書く

test.sql
SELECT '${exec_date}'
-- => '2021-05-01'

''で囲むの忘れないこと

4.前のクエリの結果を変数として扱う

前述のクエリ変数と合わせればもっと可能性が広がる、、、!

test.dig
+task_1:
  store_last_results: true # この設定をする
  database: test_database
  td>: 
  query: "select max(create_date) as last_date from users"
  # この場合、以下のようなデータが返ってくるので、この値が保存される
  #   {last_date: '2021-05-01'}
  # なお、上記のように直接クエリを書いても、今までのようにtd_run>: で保存済みクエリを指定しても良い

+task_2:
  echo>: ${td.last_results.last_date}
  # ${td.last_results.xxxx} という記法で保存データにアクセス
  # xxxxはハッシュのキー名(今回は last_date)
  # => '2021-05-01'という結果が出力される

  • 他にも便利な変数:digdag - workflow_definition
    • ex. ${session_date}:そのセッションの実行日を持つ ex.2021-05-01
  • クエリをquery:に直接書く場合は、td>とインデントを揃える

5.特定の条件でクエリの実行有無を制御する

例えば、とあるテーブルが更新されていたら実行するなど
(そのテーブルの更新からworkflowを組むのが困難な場合)
先述の実行結果保持とのあわせ技
if文とかeachとかも使えるよっていう紹介です

test.dig
# test_database.usersのcreate_dateの更新日(=最大値)が実行日と同じだったらクエリを実行したい
+task_1:
  store_last_results: true
  database: test_database
  td>: 
  query: "select max(create_date) as last_date from users"
  # => ${td.last_results.last_date}には'2021-05-01'などが入るとする

+task_2:
  if>: ${td.last_results.last_date === session_date} # ${} === ${} と書くとエラーなので注意
  _do: # if文の結果がtrueの場合
    td_run>: "保存したクエリ1"

  _else_do: # if文の結果がfalseの場合
    echo>: "[実行なし]max_date:${td.last_results.max_date}"
  • ${td.last_results.last_date} === ${session_date}って書いたらエラー出るのは 以下記事に助けられました。
  • task2do以下でtd_run>:というオペレータを実行しているが、2つ以上のオペレーターを実行する場合はタスクにすれば良い(コメント参照。ありがとうございます!)

実行できなかったら時間を置いてから再実行したい場合はretryすると良い

6.実行できなかった場合に時間を置いて再実行させたい

以下は、10分間隔で最大10回まで再実行する処理
実行前にデータのチェックを行い、データがなくて処理を実行して欲しくない場合にfailするようにしておく
そうすると後続の処理には入らず、failした処理を成功するまで再実行してくれる
10回やっても成功できなかったら全体が失敗になり、ここで処理がストップ

test.dig
+first_check:
  _retry:
    limit: 10
    interval: 600 # 10分間隔

  # 実行対象のデータがあるかをチェックする
  +get_last_date:
    store_last_results: true
    database: test_database
    td>: 
    query: "select max(create_date) as last_date from users"

  # 実行対象のデータがなければERROR → ERRORが出てる間はループしてくれる
  +check_date:
    if>: ${td.last_results.last_date === session_date}
    _do:
      fail>: ERROR no data

# データがあれば処理を実行
+task1:
 ・・・

7.特定の日付で処理を実行したい

session_timeを設定すればTD_SCHEDULED_TIME()として実行される

想定しているケース例
WFの処理自体は成功したが、その日取得したデータが不正だったために
結果として正しくないWFが実行されてしまった。
例えば、一旦対象日のデータだけを全部消して、その日の取得としてやり直したい。

TDに保存しているクエリでTD_SCHEDULED_TIME()を使っている場合は
先述の'${exec_date}'みたいな使い方ができない。

(書いててかなりピンポイントな背景に思えてきた………私以外に困ってる人いるんかな……)

今回は、その「やり直したい」日が複数日あるとして、ループで処理する方法を紹介

test.dig
_export:
  period: # ここで設定したfirst〜lastまでのすべての日で実行する(lastの日付は含まれないので注意。未満扱い)
    first: "2021-10-01"
    last:  "2021-10-05"

+for_range_from_to:
  for_range>:
    from: ${moment(period.first).unix()}
    to:   ${moment(period.last).unix()}
    step: 86400

  _do:
    _export:
      exec_date: ${moment.unix(range.from).format("YYYY-MM-DD")} #実行日を年月日形式で
      yesterday: ${moment.unix(range.from - 86400).format("YYYY-MM-DD")} # 実行日-1日など加算減算も可能

    # 直接クエリを書く場合は、先述同様設定したパラメータを入れれば良い
    +task1:
      query: "delete from users where create_date = '${yesterday}'"
      td>:

    # TDに保存したクエリを実行する場合
    # ここで設定した session_time が TD_SCHEDULED_TIME() として実行される
    +task2:
      session_time: ${moment.unix(range.from).format()}
      td_run>: "実行したいTD保存クエリ"

 ・・・

なお、こちらのTD本家の記事を応用させていただきました!!!ありがとうございます!!!

ローカルからの接続方法

上記はTDのworkflowsからGUIで編集できるけど、
ソースをgit管理したり何回も修正入れたりする場合にはローカルからコマンド操作する方が便利

以下、tdコマンドは入れてある前提

作成したdigdagをTDにpushする

作成したworkflowは拡張子digでファイルとして保存
このときのファイル名がそのままworkfow名になる

例えば、ローカルで以下のようなディレクトリ構造にしていたとする

workflows
├── project_1
│   └── workflow_1.dig
└── project_2
    ├── workflow_2.dig
    ├── workflow_3.dig
    └── test.sql

各ディレクトリ配下で以下を実行すると...

(~/project_1)
$ td wf push project_1
# => tdには「progect_1」というプロジェクトの「workflow_1」っていうフローが出来上がる

(~/project_2)
$ td wf push project_2
# => tdには「progect_2」というプロジェクトに「workflow_2」「workflow_3」っていうフローが出来る
#    「workflow_2」「workflow_3」では「test.sql」を呼び出せる
  • あんまり複雑なworkflowを作らなければ、プロジェクトはあくまで簡単なグルーピングみたいなもの
    • No.5のようにフロー内でsqlファイルを実行する場合は同じプロジェクトに入れれば良い
  • 主役は1つ1つのworkflowなので、TDのGUI上でもファーストビューはworkflow単位で表示される
  • 同じプロジェクト(ディレクトリ配下)はまとめてpushするしかなく、変更がないdigファイルでもpushされたらその度にリビジョンが書き換わる
    • オプションでいい感じにできたらごめんなさい&教えてください

このへんの操作をミスるとTD上に意図しないプロジェクトやファイルがあがるので注意(やらかし済み)

ローカルで実行する

実行時はrunコマンドでdigdagファイルを指定

(~/project_1)
$ td wf run workflow_1.dig

ちなみに、ローカルから実行した場合、TDのworkflow側に履歴は残らない(jobは残る)
ローカルの.digdagディレクトリに、その日の日付のディレクトリができ、その中にあるファイルで実行履歴が管理されている。

で、前述のsession_timeの制御と同じく、一度その日のセッションで実行に成功してしまった場合、再度実行するとスキップされてしまう。
その場合は、.digdagディレクトリ内の該当の日付のディレクトリをrenameすれば再実行できる。
(もっといいやり方あるのかも、実行時のオプション指定とか)
ローカルで実行する場合は検証目的なことが多いと思うのでここ注意でした。

まとめ

workflow=Treasuredata固有の記述方法だと思っていたが、digdagは他のデータ基盤などでも使われてることを知った
これ一般的な記法だったのか!と思ったけど、そもそもTreasuredataが提供しているということをその後知った
やっぱTDすごい。。(語彙力)

まだまだ調べが浅いので、できないと思ってるけどできるとか、間違ってるとかあればご指摘ください

workflow書けるようになったらできることが倍になって超楽しい