digdagでTreasureDataへのデータを取り込むスケジュールワークフローを作成する


TDへのデータ転送処理をDataConnectorのスケジューラからdigdagのワークフローに置き換えました。

ログ転送などのデータフロー、ETL処理は各社色々な事情があると思うので役に立つかは不明ですが、
digdagの記事はまだあまりないので少しでも参考になれば幸いです。

弊社の事例では日々S3に格納されるアクセスログをTreasureDataに転送しています。
今まではTDがホスティングしているEmbulkサービスのDataConnectorのスケジューリング機能を使って自動化していましたが、
いざ運用をしてみると色々困ることがあったのでDigDagのTDオペレータを使って置き換えました。

※DataConnector同様DigDagもワークフローサービスという名前でTDがホスティングされていますが、
現在クローズドベータ版中のため自前で構築しました。

DataConnector

簡単で爆速なのでTDでバルク転送していたら使わない理由がないDataConnectorです。

td connector:create daily_import "10 0 * * *"   td_sample_db td_sample_table load.yml

embulkの設定ファイル(load.yml)を作ってcron 記法でコマンドを叩くと
スケジュール登録でき、手軽に始められて非常に便利。
ただ、運用するにあたりちょっとだけ注意が必要です。

自分の場合は下記のようにS3のバケット上に日別にディレクトリを分けてログを格納しています。

bucket/acceslog/yyyymm/yyyymmdd/01.log
bucket/acceslog/yyyymm/yyyymmdd/02.log
...

embulkの設定ファイルはこんな感じ(スキーマ部分は割愛)

yml
in:
  type: s3
  access_key_id: xxxxxxx
  secret_access_key: yyyyyyyy
  bucket: zzzzzzz
  path_prefix: acceslog/
  decoders:
    - type: gzip
    //以下略

DataConnectorは最後に読み込んだファイルのパスを覚えており、
再度実行した際には辞書順で次のファイルから取り込むという仕様になっています。
なので、一度登録すれば毎日自動でデータインポートできるのでお手軽です。

ただ、スケジュール登録した時間にS3にファイルが置かれていなかったり、
中途半端な状態で終わっていたりとリカバリが必要な状況が多々発生しました。
しかも、S3にアップロードする処理は別の部の管轄だったりして
気がつけばデータが歯抜けに、ということがありました。

DataConnectorはスケジューラ単体ではあまり使わない?

最初は気づきしたい手動でload.ymlを直してtd connector:issue load.ymlしたり
Webコンソール上でINSERT INTOしてたのですが、対象データが増えてやってられなくなりました。
主に下記のような理由からワークフローの導入を検討しました。(遅すぎ?)

  1. 再実行時の冪等性が保たれない

  2. TD外の前処理と連携出来ない(終わっていない、または途中で失敗していたなど)

  3. データインポートに続けてスケジュールクエリも実行したい

  4. Webコンソール上ではデータの削除(DELETE文の発行)が出来ない

ツールの候補としてはjenkins、luigi、azukabanなども検討しましたが最近話題のDigDagに決めました。
TD使っているのであればDigdag一択で良いと思います。(クローズベータが案内されていればホスティング版。)

TDオペレータ使えばシェルからTDCLI使う必要もないすし、設定のテンプレート化が出来ます。

Digdagを使ったTDへのデータ取り込みフロー

どんなふうにワークフローを記述したのか簡単に紹介します。

まず、ログデータの流れですが、下記のようになってます。

Webサーバ(N台) → 集約サーバ(1台) → |匿名化や加工など| → ストレージ(S3) → TD(一時テーブル)→ 各種テーブル

ログのローテションが終わった深夜に転送処理を行ってます。
オンプレの各種サーバから一度集約し、データのマスクや加工を行いS3へアップロードします。
この辺りが様々な理由から失敗・遅延することがあります。
S3からTDへはDataConnectorを使って取り込み(スケジューラ)
一時テーブルから各種テーブルへのTransformもスケジュールクエリで行っていました。

ログの格納場所

/bucket/accesslog/201608/20160801/01.log
/bucket/accesslog/201608/20160801/02.log
/bucket/accesslog/201608/20160802/01.log
/bucket/accesslog/201608/20160802/02.log

ログは上記の様に
/年月/年月日/ファイル
というようなパス構成でS3に格納しています。

DataConnector(embulk)はファイルのprefixで読み取り範囲を決められるため
日付ごとにディレクトリを分けておいたほうが都合が良いです。
また、いきなりYYYYMMDDにするのではなく、YYYYMMを挟むことで見やすくなり、一括再実行も月単位で出来ます。

1つのワークフロー

問題であったデータの冪等性を担保するために、下記のようなジョブに作り変えました。

  1. TD上に1日分の一時テーブルを作成(DROP-CREATE)

  2. 一日分のデータを一時テーブルに取り込み

  3. 取り込み先テーブルのデータから対象日付1日分のレコードを削除

  4. 一時テーブルからSELECT(加工SQL)〜 INSERT INTO で取り込み

  5. 一時テーブルを削除

TDではテーブルのレコード削除(DELETE文の発行)は時間単位でしか出来ません。
あと、WEBのコンソールからも出来ません。TD CLIのコマンドでしか消せません。

td table:partial_delete example_db table1 --from '2014-01-01 00:00:00 JST' --to '2015-01-01 00:00:00 JST'

このコマンドもdigdagのtd operatorにあります。

digdag serverのディレクトリ構成

/project/workflow
/project/config/template.yml (DataConnectorのテンプレ)
/project/queries/template.sql (加工SQLのテンプレ)
/project/tasks/

ワークフローのファイルはこんな感じです。
かなりゴリったのでもっと良い書き方があれば教えてください。

sample.dig
timezone: "Asia/Tokyo"

#schedule:
  daily>: 03:00:00

_export:
  td:
    apikey: zzzz/zzzzzzzz
  database: test_db
  #セッション日を対象とする場合
  FROM: ${moment(session_time).format("YYYY-MM-dd")}
  YYYYMMDD: ${session_date_compact}
  YYYYMM: ${session_date_compact.substring(0,6)
    #1日分を範囲指定するために日に丸める。
    #JSTに変換するため、unixtimeを使用
  #該当日のデータを消してから入れることで冪等性を担保

+prepare:
_type: td_ddl
  empty_tables: [tmp_table_${YYYYMMDD}]

+load_data:
  td_load>: config/template.yml
  database: digdag_test
  table: tmp_table

+delete_data:
  td_partial_delete>: target_table_${YYYYMMDD}
  database: test_db
  from: ${FROM}
  to: ${TO}

+transform_data:
  td>: queries/template.sql
  insert_into: mytable
  database: digdag_test
  table: target_table

+post:
  _type: td_ddl
  drop_tables: [tmp_table_${YYYYMMDD}]

# 終わったらslackで通知
#+notify:


工夫点としては一時テーブルを日毎に作るようにして、同じジョブ(別セッション日付)を複数同時に走らせられるようにしました。

Digdagはワークフロー(digファイル)だけでなく、読み込むSQLやload.ymlにもパラメータが渡せます。
${variable}という記法で環境変数を参照することが出来ます。
さらに${}の中ではJavaScriptが使えるため柔軟にパラメータを操作することが可能です。

これを使って冪等性を保つためにデータを範囲指定して消すために時間の計算をしています。

test.sh
 digdag run test.dig -E --dryrun --rerun -p YYYY=test

上記コマンドでパラメータを確認しながら実装しました。
※dryrunでも --rerunオプションを指定しないと終わったタスクはスキップされます。

動作確認できたものはdigdag pushでサーバに登録して毎日スケジュール実行されるようになりました。

手動でやっていた時は、誤って同じデータを2重に取り込んでしまうこともありました。
リカバリしようとして間違ったデータを消したりも・・・
人手の運用はミスもあるし、なるべく自動化したいですね。

digdag server のセットアップには色々手間取ったのでそれについてもまた記録しようと思います。