Treasure Workflowを使ってみた


概要

TreasureDataで、定期的にSELECTしたデータをテーブルに流し込みたかったので、Workflowを使ってみたので、作業メモを残しておく。

ApiKeyについて

td wf secretsコマンドなるものがあり、Apikeyを安全に扱えるとのことなので使ってみる。

ApiKeyセット

マスターキーをセットする。
※ Write-only keyは、selectやimportに使えないのでNG

ターミナル
$ td wf secrets --project hoge-pjt --set hoge.apikey

デフォルトのApiKeyを変更したい場合は、下記コマンドになる

ターミナル
$ td wf secrets --project hoge-pjt --set td.apikey

ApiKey削除

登録済みのApiKeyを削除する。

ターミナル
$ td wf secrets --project hoge-pjt --delete hoge.apikey

ApiKey確認

登録済みのApiKeyの一覧を確認する。

ターミナル
$ td wf secrets --project hoge-pjt

Treasure Workflow(準備)

digファイル作成

./hoge-pjt.dig

schedule:
  daily>: 06:00:00

_export:
  td:
    database: hoge_workflow_tmp

+update_hoge:
  _secrets:
    td:
      apikey: hoge.apikey
  td>: sql/test.sql
  engine: hive
  create_table: test

クエリ作成

実行するクエリを作成します。

./sql/test.sql
WITH tmp AS (
    SELECT
      td_global_id 
    FROM
      fuga.access_log
    WHERE
      TD_TIME_RANGE(time, td_time_add(TD_SCHEDULED_TIME(), '-1d'), TD_SCHEDULED_TIME())
      AND td_global_id IS NOT NULL
  ) AS t1
)
-- DIGDAG_INSERT_LINE
SELECT * FROM tmp

Treasure Workflow(TD)

TreasureDataへWorkflowを登録して実行します。

Workflow登録

pushコマンドでTreasureWorkflowへ登録(更新)する。

ターミナル
$ td wf push hoge-pjt

Workflow実行

Workflowを即時に実行してみる。

ターミナル
$ td wf start hoge-pjt hoge-pjt --session now

Workflow削除

作成したWorkflowを削除する。

ターミナル
$ td wf delete hoge-pjt

Workflow確認

登録済みのWorkflow一覧を確認する。

ターミナル
$ td wf workflows

Treasure Workflow(ローカル)

作成したWorkflowをローカルで実行してみたい場合は、以下のコマンドでできるみたい。

ローカル実行

ターミナル
$ td wf run hoge-pjt

ローカル再実行

再度実行する場合は、「--return」を付ける

ターミナル
$ td wf run hoge-pjt --return

以上

参考サイト