Treasure Workflowを触ってみる


Treasure Data上でのジョブワークフローが作成・管理できるTreaure Workflowを触ってみたメモ。

Treasure Workflowの特徴

ドキュメントで挙げられている特徴を訳す。

  • DAG形式のワークフローが作成できる
  • ローカルで実装してTreasure Dataに登録してスケジュール実行させられる。まさにワークフローのSaas
  • Treasure Dataの機能が使える。Data Connectorによるデータインポート, Presto, Hiveでのクエリ実行、他システムへのデータエクスポート
  • 再利用しやすいパラメタ化(何を言っているのかいまいち分からず。。。)
  • ワークフロー失敗やSLA未達の場合にアラート通知

その他、タスクの並列実行機能もある。

ベースになっているエンジンはDigdagとしてOSS公開されている。
Treasure WorkflowはDigdagの一部機能が使えない状態になっているとのこと。
また、Treasure Workflowとしてもpublic betaなので色々と制限はある様子。

動かしてみる

Tutorialを試す

ドキュメントに従って試す。

なお、事前にTreasure DataのクライアントコマンドツールTD Toolbeltをインストールしておく必要がある。

td workflowを実行すると初回時にモジュールのインストールが走る。

$ td workflow
Workflow module not yet installed, download now? [Y/n]
Y
Downloading Java...: done                                                                                               
Installing Java... done
Downloading workflow module...: done                                                                                    
Installing workflow module... done
2016-10-12 00:44:02 +0900: Digdag v0.8.17
Usage: td workflow <command> [options...]
  Local-mode commands:
    init <dir>                         create a new workflow project
    r[un] <workflow.dig>               run a workflow
    c[heck]                            show workflow definitions
    sched[uler]                        run a scheduler server
    selfupdate                         update cli to the latest version

  Server-mode commands:
    server                             start server

  Client-mode commands:
    push <project-name>                create and upload a new revision
    start <project-name> <name>        start a new session attempt of a workflow
    retry <attempt-id>                 retry a session
    kill <attempt-id>                  kill a running session attempt
    backfill <project-name> <name>     start sessions of a schedule for past times
    reschedule                         skip sessions of a schedule to a future time
    log <attempt-id>                   show logs of a session attempt
    workflows [project-name] [name]    show registered workflow definitions
    schedules                          show registered schedules
    disable <schedule-id>              disable a workflow schedule
    disable <project-name>             disable all workflow schedules in a project
    disable <project-name> <name>      disable a workflow schedule
    enable <schedule-id>               enable a workflow schedule
    enable <project-name>              enable all workflow schedules in a project
    enable <project-name> <name>       enable a workflow schedule
    sessions                           show sessions for all workflows
    sessions <project-name>            show sessions for all workflows in a project
    sessions <project-name> <name>     show sessions for a workflow
    session  <session-id>              show a single session
    attempts                           show attempts for all sessions
    attempts <session-id>              show attempts for a session
    attempt  <attempt-id>              show a single attempt
    tasks <attempt-id>                 show tasks of a session attempt
    delete <project-name>              delete a project
    secrets --project <project-name>   manage secrets
    version                            show client and server version

  Options:
    -L, --log PATH                   output log messages to a file (default: -)
    -l, --log-level LEVEL            log level (error, warn, info, debug or trace)
    -X KEY=VALUE                     add a performance system config
    -c, --config PATH.properties     Configuration file (default: /Users/oonishitk/.config/digdag/config)

Use `<command> --help` to see detailed usage of a command.

なお、td workflowコマンドはtd wfと省略して実行できる。これは助かる。

Tutorial用のデータベースを作る。

$ td db:create workflow_temp
Database 'workflow_temp' is created.
Use 'td table:create workflow_temp <table_name>' to create a table.

Tutorial用のワークフローサンプルをダウンロードしてくる。

$ cd ~/Downloads
$ curl -o nasdaq_analysis.zip -L https://gist.github.com/danielnorberg/f839e5f2fd0d1a27d63001f4fd19b947/raw/d2d6dd0e3d419ea5d18b1c1e7ded9ec106c775d4/nasdaq_analysis.zip
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
100  1403  100  1403    0     0    933      0  0:00:01  0:00:01 --:--:--   933
$ unzip nasdaq_analysis.zip 
Archive:  nasdaq_analysis.zip
   creating: nasdaq_analysis/
  inflating: nasdaq_analysis/nasdaq_analysis.dig  
   creating: nasdaq_analysis/queries/
  inflating: nasdaq_analysis/queries/daily_open.sql  
  inflating: nasdaq_analysis/queries/monthly_open.sql
$ cd nasdaq_analysis
$ ls -l
total 8
-rw-r--r--  1 oonishitk  staff  224  4 27 08:34 nasdaq_analysis.dig
drwxr-xr-x  4 oonishitk  staff  136  4 27 08:31 queries

ワークフローの内容は以下の感じ。
task1とtask2をシリアルに実行するものになっている。

nasdaq_analysis.dig
timezone: UTC

schedule:
  daily>: 07:00:00

_export:
  td:
    database: workflow_temp

+task1:
  td>: queries/daily_open.sql
  create_table: daily_open

+task2:
  td>: queries/monthly_open.sql
  create_table: monthly_open

td workflow runでTreasure Dataにスケジュール登録せずに、ワンショット実行させる。

$ td workflow run nasdaq_analysis
2016-10-12 00:51:36 +0900: Digdag v0.8.17
2016-10-12 00:51:37 +0900 [WARN] (main): Using a new session time 2016-10-11T00:00:00+00:00 based on schedule.
2016-10-12 00:51:37 +0900 [INFO] (main): Using session /Users/bwtakacy/Downloads/nasdaq_analysis/.digdag/status/20161011T000000+0000.
2016-10-12 00:51:37 +0900 [INFO] (main): Starting a new session project id=1 workflow name=nasdaq_analysis session_time=2016-10-11T00:00:00+00:00
2016-10-12 00:51:38 +0900 [INFO] (0018@+nasdaq_analysis+task1): td>: queries/daily_open.sql
2016-10-12 00:51:38 +0900 [INFO] (0018@+nasdaq_analysis+task1): td-client version: 0.7.26
2016-10-12 00:51:38 +0900 [INFO] (0018@+nasdaq_analysis+task1): Logging initialized @2621ms
2016-10-12 00:51:39 +0900 [INFO] (0018@+nasdaq_analysis+task1): td>: queries/daily_open.sql
2016-10-12 00:51:40 +0900 [INFO] (0018@+nasdaq_analysis+task1): Started presto job id=95352325:
DROP TABLE IF EXISTS "daily_open";
CREATE TABLE "daily_open" AS
SELECT 
    TD_DATE_TRUNC('day', time), AVG(open) AS daily_avg_open, AVG(close) AS daily_avg_close
FROM 
    sample_datasets.nasdaq
WHERE
    TD_TIME_RANGE(time, '2013-01-01','2015-01-01')
GROUP BY 
    1

2016-10-12 00:51:42 +0900 [INFO] (0018@+nasdaq_analysis+task1): td>: queries/daily_open.sql
2016-10-12 00:51:45 +0900 [INFO] (0018@+nasdaq_analysis+task1): td>: queries/daily_open.sql
2016-10-12 00:51:50 +0900 [INFO] (0018@+nasdaq_analysis+task1): td>: queries/daily_open.sql
2016-10-12 00:51:52 +0900 [INFO] (0018@+nasdaq_analysis+task2): td>: queries/monthly_open.sql
2016-10-12 00:51:52 +0900 [INFO] (0018@+nasdaq_analysis+task2): td>: queries/monthly_open.sql
2016-10-12 00:51:54 +0900 [INFO] (0018@+nasdaq_analysis+task2): Started presto job id=95352354:
DROP TABLE IF EXISTS "monthly_open";
CREATE TABLE "monthly_open" AS
SELECT 
    TD_DATE_TRUNC('month', time), 
    AVG(daily_avg_open) AS monthly_avg_open, 
    AVG(daily_avg_close) AS month_avg_close
FROM 
    daily_open
WHERE
    TD_TIME_RANGE(time, '2013-01-01','2015-01-01')
GROUP BY 
    1
2016-10-12 00:51:55 +0900 [INFO] (0018@+nasdaq_analysis+task2): td>: queries/monthly_open.sql
2016-10-12 00:51:58 +0900 [INFO] (0018@+nasdaq_analysis+task2): td>: queries/monthly_open.sql
2016-10-12 00:52:03 +0900 [INFO] (0018@+nasdaq_analysis+task2): td>: queries/monthly_open.sql
Success. Task state is saved at /Users/bwtakacy/Downloads/nasdaq_analysis/.digdag/status/20161011T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

ワークフローが正常に実行された結果、クエリの結果テーブルが作成されていることが確認できる。

$ td table:show workflow_temp daily_open
Name        : workflow_temp.daily_open
Type        : log
Count       : 438
Schema      : (
    _col0:long
    daily_avg_open:double
    daily_avg_close:double
)
$ td table:show workflow_temp monthly_open
Name        : workflow_temp.monthly_open
Type        : log
Count       : 0
Schema      : (
    _col0:long
    monthly_avg_open:double
    month_avg_close:double
)

さて、このワークフローをTreasure Dataにスケジュールジョブとして登録する。

$ td workflow push nasdaq_analysis
2016-10-12 00:56:41 +0900: Digdag v0.8.17
Creating .digdag/tmp/archive-5329639364818920963.tar.gz...
  Archiving nasdaq_analysis.dig
  Archiving queries/daily_open.sql
  Archiving queries/monthly_open.sql
Workflows:
  nasdaq_analysis

Uploaded:
  id: 38188
  name: nasdaq_analysis
  revision: 7405dfb8-8176-44de-8983-b8419cb7f79a
  archive type: db
  project created at: 2016-10-11T15:56:44Z
  revision updated at: 2016-10-11T15:56:44Z

Use `td workflow workflows` to show all workflows.

登録されたスケジュールジョブは以下のコマンドから確認できる。

$ td workflow workflows
2016-10-12 00:56:52 +0900: Digdag v0.8.17
  nasdaq_analysis
    nasdaq_analysis

Use `td workflow workflows <project-name> <name>` to show details.
$ td workflow workflows nasdaq_analysis nasdaq_analysis
2016-10-12 00:57:22 +0900: Digdag v0.8.17
_export:
  td:
    database: "workflow_temp"
schedule:
  daily>: "07:00:00"
+task1:
  td>: "queries/daily_open.sql"
  create_table: "daily_open"
+task2:
  td>: "queries/monthly_open.sql"
  create_table: "monthly_open"

$ td workflow schedules
2016-10-12 00:58:37 +0900: Digdag v0.8.17
Schedules:
  id: 112
  project: nasdaq_analysis
  workflow: nasdaq_analysis
  disabled at: 
  next session time: 2016-10-12 00:00:00 +0000
  next scheduled to run at: 2016-10-12 16:00:00 +0900 (in 15h 1m 22s)

1 entries.
Use `td workflow workflows [project-name] [workflow-name]` to show workflow details.

また、一時的にスケジュール実行を無効にすることができる。

$ td wf disable nasdaq_analysis
2016-10-12 23:01:53 +0900: Digdag v0.8.17
Disabled schedule id: 112
[23:01:57 bwtakacy ~]$ td wf schedules
2016-10-12 23:02:02 +0900: Digdag v0.8.17
Schedules:
  id: 112
  project: nasdaq_analysis
  workflow: nasdaq_analysis
  disabled at: 2016-10-12 23:01:57 +0900 (-5s ago)
  next session time: 2016-10-13 00:00:00 +0000
  next scheduled to run at: 2016-10-13 16:00:00 +0900 (in 16h 57m 57s)

1 entries.
Use `td workflow workflows [project-name] [workflow-name]` to show workflow details.

再び有効にするにはtd wf enableを利用する。

障害時の挙動

ドキュメントだとここに書かれている。

なお、同様の情報はWebコンソールからでもポチポチと確認できる。
https://workflows.treasuredata.com

最初にワークフローを失敗させるためにドキュメントで紹介されているように、わざと参照先のテーブル名をtypoしてみる。

monthly_open.sql
SELECT 
    TD_DATE_TRUNC('month', time), 
    AVG(daily_avg_open) AS monthly_avg_open, 
    AVG(daily_avg_close) AS month_avg_close
FROM 
    daily_opne  // 本当はdaily_open
WHERE
    TD_TIME_RANGE(time, '2013-01-01','2015-01-01')
GROUP BY 
    1

これを知らないものとして、ジョブエラーからどうやって原因特定ができるかというのが以下のステップ。
まずはスケジュールジョブとしてTreasure Dataに登録しておく。

$ td wf push nasdaq_analysis
2016-10-12 23:05:41 +0900: Digdag v0.8.17
Creating .digdag/tmp/archive-2082789604168105713.tar.gz...
  Archiving nasdaq_analysis.dig
  Archiving queries/daily_open.sql
  Archiving queries/monthly_open.sql
Workflows:
  nasdaq_analysis

Uploaded:
  id: 38188
  name: nasdaq_analysis
  revision: d98ff854-8a67-47e6-a961-025fde657def
  archive type: db
  project created at: 2016-10-11T15:56:44Z
  revision updated at: 2016-10-12T14:05:45Z

Use `td workflow workflows` to show all workflows.

スケジュール登録されたジョブの次のセッションを即時に実行させるにはtd wf startを使う。

$ td wf start nasdaq_analysis nasdaq_analysis --session now
2016-10-12 23:06:19 +0900: Digdag v0.8.17
Started a session attempt:
  session id: 487165
  attempt id: 509667
  uuid: 2dc34a99-d68c-4370-ae1a-6e5e1f9e4cdb
  project: nasdaq_analysis
  workflow: nasdaq_analysis
  session time: 2016-10-12 14:06:20 +0000
  retry attempt name: 
  params: {"last_session_time":"2016-10-12T00:00:00+00:00","next_session_time":"2016-10-13T00:00:00+00:00"}
  created at: 2016-10-12 23:06:24 +0900

* Use `td workflow session 487165` to show session status.
* Use `td workflow task 509667` and `td workflow log 509667` to show task status and logs.

ちなみに、td wf startは次のセッションの開始時間の変更や、リラン、ドライランなど色々な機能が用意されているようだ。

$ td wf start
2016-10-12 23:49:48 +0900: Digdag v0.8.17
Usage: td workflow start <project-name> <name>
  Options:
        --session <hourly | daily | now | yyyy-MM-dd | "yyyy-MM-dd HH:mm:ss">  set session_time to this time (required)
        --revision <name>            use a past revision
        --retry NAME                 set retry attempt name to a new session
    -d, --dry-run                    tries to start a session attempt but does nothing
    -p, --param KEY=VALUE            add a session parameter (use multiple times to set many parameters)
    -P, --params-file PATH.yml       read session parameters from a YAML file
    -e, --endpoint HOST[:PORT]       HTTP endpoint (default: http://127.0.0.1:65432)
    -L, --log PATH                   output log messages to a file (default: -)
    -l, --log-level LEVEL            log level (error, warn, info, debug or trace)
    -X KEY=VALUE                     add a performance system config
    -c, --config PATH.properties     Configuration file (default: /Users/bwtakacy/.config/digdag/config)


  Examples:
    $ td workflow start myproj workflow1 --session 2016-01-01  # use this day as session_time
    $ td workflow start myproj workflow1 --session hourly      # use current hour's 00:00
    $ td workflow start myproj workflow1 --session daily       # use current day's 00:00:00

失敗したぜよ通知がメールで送信されてくる。
先ほど実行したセッションの状況を確認すると、status: errorとなってセッションの試行attemptが失敗していることがわかる。

$ td wf session 487165
2016-10-12 23:07:56 +0900: Digdag v0.8.17
  session id: 487165
  attempt id: 509667
  uuid: 2dc34a99-d68c-4370-ae1a-6e5e1f9e4cdb
  project: nasdaq_analysis
  workflow: nasdaq_analysis
  session time: 2016-10-12 14:06:20 +0000
  retry attempt name: 
  params: {"last_session_time":"2016-10-12T00:00:00+00:00","next_session_time":"2016-10-13T00:00:00+00:00"}
  created at: 2016-10-12 23:06:24 +0900
  kill requested: false
  status: error

失敗したattemptに含まれるtaskの状況を確認する。

$ td wf tasks 509667
2016-10-12 23:09:40 +0900: Digdag v0.8.17
   id: 900691
   name: +nasdaq_analysis
   state: group_error
   config: {"schedule":{"daily>":"07:00:00"},"_export":{"td":{"database":"workflow_temp"}}}
   parent: null
   upstreams: []
   export params: {"td":{"database":"workflow_temp"}}
   store params: {}
   state params: {}

   id: 900692
   name: +nasdaq_analysis+task1
   state: success
   config: {"td>":"queries/daily_open.sql","create_table":"daily_open"}
   parent: 900691
   upstreams: []
   export params: {}
   store params: {"td":{"last_job_id":"95560560"}}
   state params: {"job":{"jobId":"95560560","domainKey":"ed457f44-bee0-4414-b6c1-20dca09cd72d","pollIteration":2,"errorPollIteration":null}}

   id: 900693
   name: +nasdaq_analysis+task2
   state: error
   config: {"td>":"queries/monthly_open.sql","create_table":"monthly_open"}
   parent: 900691
   upstreams: [900692]
   export params: {}
   store params: {}
   state params: {}

   id: 900694
   name: +nasdaq_analysis^failure-alert
   state: success
   config: {"_type":"notify","_command":"Workflow session attempt failed"}
   parent: 900691
   upstreams: []
   export params: {}
   store params: {}
   state params: {}

4 entries.

task+nasdaq_analysis+task2state: errorとなっていることがわかる。
+nasdaq_analysis自体はstate: group_errorという状態になっている。子taskのいずれかが失敗している状態を示していると思われる。

失敗したtaskのログを確認する。

$ td wf log 509667 +nasdaq_analysis+task2
2016-10-12 23:12:45 +0900: Digdag v0.8.17
2016-10-12 14:06:44.864 +0000 [INFO] (1093@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: td>: queries/monthly_open.sql
2016-10-12 14:06:50.203 +0000 [INFO] (0565@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: td>: queries/monthly_open.sql
2016-10-12 14:06:52.131 +0000 [INFO] (0565@+nasdaq_analysis+task2) io.digdag.standards.operator.td.TdOperatorFactory$TdOperator: Started presto job id=95560605:
DROP TABLE IF EXISTS "monthly_open";
CREATE TABLE "monthly_open" AS
SELECT 
    TD_DATE_TRUNC('month', time), 
    AVG(daily_avg_open) AS monthly_avg_open, 
    AVG(daily_avg_close) AS month_avg_close
FROM 
    daily_opne
WHERE
    TD_TIME_RANGE(time, '2013-01-01','2015-01-01')
GROUP BY 
    1

2016-10-12 14:06:57.247 +0000 [INFO] (0559@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: td>: queries/monthly_open.sql
2016-10-12 14:07:00.260 +0000 [ERROR] (0559@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: Task +nasdaq_analysis+task2 failed.
started at 2016-10-12T14:06:53Z
executing query: DROP TABLE IF EXISTS "monthly_open"
Started fetching results.
executing query: CREATE TABLE "monthly_open" AS
SELECT 
    TD_DATE_TRUNC('month', time), 
    AVG(daily_avg_open) AS monthly_avg_open, 
    AVG(daily_avg_close) AS month_avg_close
FROM 
    daily_opne
WHERE
    TD_TIME_RANGE(time, '2013-01-01','2015-01-01')
GROUP BY 
    1
finished at 2016-10-12T14:06:54Z

Query 20161012_140653_88801_n5e4x failed: line 7:2: Table td-presto.workflow_temp.daily_opne does not exist
 (task execution)
> TD job 95560605 failed with status ERROR (td job)

長いけど、ログの末尾にTable td-presto.workflow_temp.daily_opne does not existとあるのに気づけばよい。これで原因特定。

SQLを修正して再度TreasureDataに登録する。

$ td wf push nasdaq_analysis
2016-10-12 23:17:50 +0900: Digdag v0.8.17
Creating .digdag/tmp/archive-4696221085390888645.tar.gz...
  Archiving nasdaq_analysis.dig
  Archiving queries/daily_open.sql
  Archiving queries/monthly_open.sql
Workflows:
  nasdaq_analysis

Uploaded:
  id: 38188
  name: nasdaq_analysis
  revision: 12877724-f51d-4643-9ead-ea6ccea26593
  archive type: db
  project created at: 2016-10-11T15:56:44Z
  revision updated at: 2016-10-12T14:17:53Z

Use `td workflow workflows` to show all workflows.

あとは、失敗したセッションのリランを行う。
その際、td wf sessionではなくtd wf retryコマンドを使ったほうがよい、とのこと。
session timeというパラメータが失敗したときのもののまま再実行されるため、時間依存のパラメータを利用している場合に条件を保つことができるメリットがあるため・・・書いていて何を言っているのか分からなくなってきた。このあたりDigdagの概念をあまり理解していないので、そちらを調査しないといけないなぁ。

td wf retryコマンドのヘルプを確認する。

$ td wf retry 
2016-10-12 23:22:09 +0900: Digdag v0.8.17
Usage: td workflow retry <attempt-id>
  Options:
        --name <name>                unique identifier of this retry attempt instead of auto-generated UUID
        --latest-revision            use the latest revision
        --keep-revision              keep the same revision
        --revision <name>            use a specific revision
        --all                        retry all tasks
        --resume                     retry only non-successful tasks
        --resume-from <+name>        retry from a specific task

ドキュメントはワークフロー全体を再実行する--allオプションを使っているが、ここでは失敗したtaskだけを再実行する--resumeオプションを使ってみる。

$ td wf retry 509667 --name fix-typo --latest-revision --resume
2016-10-12 23:22:50 +0900: Digdag v0.8.17
Started a session attempt:
  session id: 487165
  attempt id: 509673
  uuid: 2dc34a99-d68c-4370-ae1a-6e5e1f9e4cdb
  project: nasdaq_analysis
  workflow: nasdaq_analysis
  session time: 2016-10-12 14:06:20 +0000
  retry attempt name: fix-typo
  params: {"last_session_time":"2016-10-12T00:00:00+00:00","next_session_time":"2016-10-13T00:00:00+00:00"}
  created at: 2016-10-12 23:22:54 +0900

* Use `td workflow session 487165` to show session status.
* Use `td workflow task 509673` and `td workflow log 509673` to show task status and logs.

新しいセッション試行の状況を確認すると、今度は+nasdaq_analysis+task2に成功していることがわかる。

$ td wf task 509673
2016-10-12 23:24:27 +0900: Digdag v0.8.17
   id: 900713
   name: +nasdaq_analysis
   state: success
   config: {"schedule":{"daily>":"07:00:00"},"_export":{"td":{"database":"workflow_temp"}}}
   parent: null
   upstreams: []
   export params: {"td":{"database":"workflow_temp"}}
   store params: {}
   state params: {}

   id: 900714
   name: +nasdaq_analysis+task1
   state: success
   config: {"td>":"queries/daily_open.sql","create_table":"daily_open"}
   parent: 900713
   upstreams: []
   export params: {}
   store params: {"td":{"last_job_id":"95560560"}}
   state params: {}

   id: 900715
   name: +nasdaq_analysis+task2
   state: success
   config: {"td>":"queries/monthly_open.sql","create_table":"monthly_open"}
   parent: 900713
   upstreams: [900714]
   export params: {}
   store params: {"td":{"last_job_id":"95562707"}}
   state params: {"job":{"jobId":"95562707","domainKey":"e609a316-b7a1-4852-952a-992a8ca4e76f","pollIteration":1,"errorPollIteration":null}}

3 entries.

セッション試行のログを確認すると、確かに失敗していた+nasdaq_analysis+task2だけが実行されていることがわかる。

$ td wf log 509673
2016-10-13 00:06:45 +0900: Digdag v0.8.17
2016-10-12 14:22:54.877 +0000 [INFO] (0466@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: td>: queries/monthly_open.sql
2016-10-12 14:22:55.309 +0000 [INFO] (0974@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: td>: queries/monthly_open.sql
2016-10-12 14:22:55.840 +0000 [INFO] (0974@+nasdaq_analysis+task2) io.digdag.standards.operator.td.TdOperatorFactory$TdOperator: Started presto job id=95562707:
DROP TABLE IF EXISTS "monthly_open";
CREATE TABLE "monthly_open" AS
SELECT 
    TD_DATE_TRUNC('month', time), 
    AVG(daily_avg_open) AS monthly_avg_open, 
    AVG(daily_avg_close) AS month_avg_close
FROM 
    daily_open
WHERE
    TD_TIME_RANGE(time, '2013-01-01','2015-01-01')
GROUP BY 
    1

2016-10-12 14:22:57.651 +0000 [INFO] (0642@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: td>: queries/monthly_open.sql
2016-10-12 14:22:59.920 +0000 [INFO] (0539@+nasdaq_analysis+task2) io.digdag.core.agent.OperatorManager: td>: queries/monthly_open.sql

所感

ちょっと触っただけだが、Treasure Dataで動かすためにしっかりと設計されている印象。

とくに、エラー時の情報取得、原因特定の流れが明確にコマンド化されているのが素晴らしい。クラウドでの運用サポートまで考慮しているのであろう。

ワークフローの可視化は出来ないのかな?ちょっと調べきれず。
可視化ができないと、複雑なジョブを作ってしまったときに負債化するのでやや気になる。

以上。