直近でTreasure Workflowではまったこと


bqオペレーターがUSリージョン以外は使えない

digdagのissueとしてはあがっているが。2019年の4月にあがったままなので対応されないのかもしれない...
issue内にshオペレーターを使ってbqコマンドを叩くワークアラウンドの記載があるが、Treasure workflowでは使えないので回避策なし。

TreasureData → BQのエクスポートでUS以外の場合はlocationの指定が必要

US以外の場合、以下のような形でlocationの指定が必要

sample.dig
+td2bq:
  td>: queries/xxx.sql
  result_connection: (connection_name)
  result_settings:
    project: gcp_project_name
    dataset: dataset_name
    table: tablename
    location: asia-northeast1
    mode: append
    max_bad_records: 0
    ignore_unknown_values: true
    allow_quoted_newlines: true

TDからBQへエクスポートする際の result_settings で設定可能なオプションは下記リンク。ただし、 location が書いていない...
https://github.com/treasure-data/treasure-boxes/tree/master/td/bigquery#supplemental

BQエクスポート時にmode:replaceの場合にauto_create_tableとschema_fileが必要

以下のようにreplaceを使う場合、auto_create_tableをtrueにしてschema_fileも設定する必要がある

+td2bq:
  td>: queries/xxx.sql
  result_connection: (connection_name)
  result_settings:
    project: gcp_project_name
    dataset: dataset_name
    table: tablename
    location: asia-northeast1
    mode: replace
    auto_create_table: true # appendでは必要はないがreplaceの場合は必要
    schema_file: '[{"name":....}]' # appendでは必要はないがreplaceの場合は必要
    max_bad_records: 0
    ignore_unknown_values: true
    allow_quoted_newlines: true

変数を入れ子で使えない

やりたかったことは、td_for_eachオペレーターを使って取得した値(以下サンプルでいうとaccountid)をsubタスクに変数として渡して、subタスク側で利用する。

main.dig
+for_each:
  td_for_each>: queries/xxxx.sql
  engine: presto
  _do:
    +task1:
      _export:
        accountid: ${td.each.accountid}
      +task2:
        call>: sub.dig
sub.dig
td>: queries/xxxx.sql
result_connection: xxxxx
result_settings:
  xxxxx: ${accountid}

sub.dig内の xxxxx: ${accountid} が変数の入れ子になるので値を渡すことができない。
取得した値を利用するには、xxxxx: ${td.each.accountid} とするしかない。

本当はsub.digを切り出すことで、別のところからも再利用したかったのに。

参考