Digdag の s3_wait> オペレータ の想定利用ケースと動作確認


オープンソースのワークフロー管理ツールDigdagに新しく s3_wait> オペレータが利用可能になっていたので、想定利用ケースと動作確認した結果について、簡単にメモします。

サービス間やチーム間でのデータ共有として、S3が利用されるケースが多い

責任範囲が異なるシステム間でのデータ共有としては、S3などのクラウドストレージやFTPなどにデータファイルを置いて、参照するという方法が多く一般的に行われています。例えば、利用しているベンダーのサービスの生ログやレポートを渡してもらったり、インフラDBAチームからマーケティングチームへ定期的にマーケット利用できるデータを渡してもらったり、など。シンプルにファイルデータを渡すことで、データ提供側とデータ受取側でお互いの運用に干渉したり、アクセスしたりしなくて済むメリットがあります。

事前のファイル存在チェックにs3_wait>オペレータを利用

データ受取側は、受け取るファイルデータをもとにいろいろとバッチ処理をすることになりますが、前提となるファイルがないと後続の処理が失敗しますので、事前処理としてファイルの存在確認をする必要があります。s3_wait>オペレータはこのファイル確認に利用できます。例えば、午前2時にファイルがS3に置いてあるはずが、データ提供側の処理が何かのトラブルでいつもより時間がかかっている場合、ファイルが置かれるまで、待機します。定期的にファイルの存在確認をし、ファイルが確認できてから、後続のワークフロー処理に進むことができます。

シナリオ - S3上のファイルを確認してから、Treasure Dataにデータをロード- 事前準備 

以下動作確認の準備になります。シナリオとしては、S3上のファイルを確認してから、Treasure Dataにデータをロードすることを想定します。
ファイルとしては以下のようなCSV形式のアクセスログファイルを想定します(サンプルデータなので日付データが古いですね)。

time,user,host,path,referer,code,agent,size,method
2014-10-03 19:59:53,,148.165.90.106,/category/electronics,/category/music,200,Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; YTB730; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C),66,GET
2014-10-03 19:59:37,,144.105.77.66,/item/electronics/3305,/category/computers,200,"Mozilla/5.0 (iPad; CPU OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3",135,GET
2014-10-03 19:59:21,,108.54.178.116,/category/electronics,/category/software,200,"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",69,GET
2014-10-03 19:59:05,,104.129.105.202,/item/electronics/4504,/item/games/394,200,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0),83,GET
2014-10-03 19:58:50,,208.48.26.63,/item/software/706,/search/?c=Software+Toys,200,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0),76,GET
2014-10-03 19:58:34,,108.78.209.95,/item/giftcards/4879,/item/toys/197,200,Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1,137,GET
2014-10-03 19:58:18,,108.198.97.206,/item/computers/4785,-,200,Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1,69,GET
2014-10-03 19:58:02,,172.195.185.46,/category/games,/category/games,200,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1,41,GET
2014-10-03 19:57:46,,88.24.72.177,/item/giftcards/4410,-,200,Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; YTB730; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C),72,GET

まずは、digdagを最新バージョンに

日々開発が進んでいるプロジェクトなので、ソフトウェアは常に最新にするようにしましょう。

$ digdag selfupdate
2016-10-03 13:44:50 +0900: Digdag v0.8.13
Checking the latest version...
Upgrading to 0.8.16...
Verifying...
Upgraded to 0.8.16

digdagプロジェクトの作成

digdag initコマンドで、import_from_s3という名のdigdagプロジェクトを作成します

$ digdag init import_from_s3
2016-10-03 14:11:46 +0900: Digdag v0.8.16
  Creating import_from_s3/import_from_s3.dig
  Creating import_from_s3/.gitignore
Done. Type `cd import_from_s3` and then `digdag run import_from_s3.dig` to run the workflow. Enjoy!

$ cd import_from_s3/
$ ls
import_from_s3.dig

.digファイルの編集

上記で生成されたサンプルのimport_from_s3.digファイルを修正します。
各タスクは定義は以下になります。

  • +check_file_on_s3: タスクで s3_wait> オペレータを利用して、S3上のyada-tokyo-bucketバケット内のwww_access_YYYYMMDDというファイル(YYYYMMDDは20161003などの日付を想定)の存在を確認する。
  • +create_table: タスクでは、td_ddl> オペレータで、インポート先となるトレジャーデータのテーブル(www_access)を作成する。
  • +load_to_td_from_s3: タスクで、S3上のファイルデータをトレジャーデータへのインポートを実施。importsディレクトリ以下にload.ymlを用意しておく必要があります。
import_from_s3.dig
timezone: Asia/Tokyo

_export:
  td:
    database: yada_demo

+setup:
  echo>: start ${session_time}

+disp_current_date:
  echo>: ${new Date().toUTCString()}

+check_file_on_s3:
  s3_wait>: yada-tokyo-bucket/www_access_${session_date_compact}

+create_table:
  td_ddl>:
  create_tables: [www_access]

+load_to_td_from_s3:
  td_load>: imports/load.yml
  database: yada_demo
  table: www_access

+teardown:
  echo>: finish ${session_time}
imports/load.yml
---
in:
  type: s3
  access_key_id: xxxx
  secret_access_key: xxxx
  bucket: yada-tokyo-bucket
  path_prefix: www_access_${session_date_compact}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: time, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
    - {name: user, type: string}
    - {name: host, type: string}
    - {name: path, type: string}
    - {name: referer, type: string}
    - {name: code, type: long}
    - {name: agent, type: string}
    - {name: size, type: long}
    - {name: method, type: string}
out:
  mode: append

digdagローカルモードで動作確認する際のSecrets設定

.digファイルで定義したワークフローの動作確認をローカルモードで実行します。(定義したワークフローを定期的にスケジュール実行するには、別途スケジューラサーバが必要になります。)
ローカルモードで実行するには、SecretsであるAWSのcredentialを ~/.config/digdag/config ファイルに定義しておきます。

~/.config/digdag/config
secrets.aws.access-key-id = xxx
secrets.aws.secret-access-key = xxx

以上で準備完了です。

動作確認

digdag run

$ digdag run import_from_s3.dig --rerun
2016-10-03 16:05:15 +0900: Digdag v0.8.16
2016-10-03 16:05:17 +0900 [WARN] (main): Reusing the last session time 2016-10-03T09:00:00+09:00.
2016-10-03 16:05:17 +0900 [INFO] (main): Using session /Users/ryuy/work_se/digdag/import_from_s3/.digdag/status/20161003T090000+0900.
2016-10-03 16:05:17 +0900 [INFO] (main): Starting a new session project id=1 workflow name=import_from_s3 session_time=2016-10-03T09:00:00+09:00
2016-10-03 16:05:18 +0900 [INFO] (0017@+import_from_s3+setup): echo>: start 2016-10-03T09:00:00+09:00
start 2016-10-03T09:00:00+09:00
2016-10-03 16:05:18 +0900 [INFO] (0017@+import_from_s3+disp_current_date): echo>: Mon, 03 Oct 2016 07:05:18 GMT
Mon, 03 Oct 2016 07:05:18 GMT
2016-10-03 16:05:18 +0900 [INFO] (0017@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:05:28 +0900 [INFO] (0017@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:05:40 +0900 [INFO] (0017@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:06:01 +0900 [INFO] (0017@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:06:44 +0900 [INFO] (0017@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:08:07 +0900 [INFO] (0019@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:10:49 +0900 [INFO] (0020@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:15:52 +0900 [INFO] (0021@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003

digdag runコマンドでワークフローを実行すると、S3上の yada-tokyo-bucket/www_access_20161003 を定期的にチェックしているのが確認できます。チェック間隔はだんだん伸びていきます。

ここで実際に、別ターミナルで yada-tokyo-bucket/www_access_20161003 ファイルを置いてみます。

$ aws s3 cp s3://yada-tokyo-bucket/www_access.csv s3://yada-tokyo-bucket/www_access_20161003
copy: s3://yada-tokyo-bucket/www_access.csv to s3://yada-tokyo-bucket/www_access_20161003

そうすると、次の s3_wait> によるチェックタイミングでファイルの存在が確認できワークフロー処理が進みます。
td_ddl>でwww_accessテーブルを作成し、td_load> でデータをインポートしています。

2016-10-03 16:20:54 +0900 [INFO] (0022@+import_from_s3+check_file_on_s3): s3_wait>: yada-tokyo-bucket/www_access_20161003
2016-10-03 16:20:55 +0900 [INFO] (0022@+import_from_s3+create_table): td_ddl>: null
2016-10-03 16:20:55 +0900 [INFO] (0022@+import_from_s3+create_table): td-client version: 0.7.26
2016-10-03 16:20:55 +0900 [INFO] (0022@+import_from_s3+create_table): Logging initialized @940060ms
2016-10-03 16:20:55 +0900 [INFO] (0022@+import_from_s3+create_table): Creating TD table yada_demo.www_access
2016-10-03 16:20:56 +0900 [INFO] (0022@+import_from_s3+load_to_td_from_s3): td_load>: imports/load.yml
2016-10-03 16:20:57 +0900 [INFO] (0022@+import_from_s3+load_to_td_from_s3): td_load>: imports/load.yml
2016-10-03 16:20:58 +0900 [INFO] (0022@+import_from_s3+load_to_td_from_s3): Started bulk load job id=93586036
2016-10-03 16:21:00 +0900 [INFO] (0022@+import_from_s3+load_to_td_from_s3): td_load>: imports/load.yml
2016-10-03 16:21:02 +0900 [INFO] (0022@+import_from_s3+load_to_td_from_s3): td_load>: imports/load.yml
2016-10-03 16:21:06 +0900 [INFO] (0022@+import_from_s3+load_to_td_from_s3): td_load>: imports/load.yml
2016-10-03 16:21:13 +0900 [INFO] (0022@+import_from_s3+load_to_td_from_s3): td_load>: imports/load.yml
2016-10-03 16:21:14 +0900 [INFO] (0022@+import_from_s3+teardown): echo>: finish 2016-10-03T09:00:00+09:00
finish 2016-10-03T09:00:00+09:00
Success. Task state is saved at /Users/ryuy/work_se/digdag/import_from_s3/.digdag/status/20161003T090000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

最後にトレジャーデータでデータ確認。

$ td query -Tpresto -w -d yada_demo 'select * from www_access limit 10'

+------+-----------------+------------------------+--------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------+------------+
| user | host            | path                   | referer                  | code | agent                                                                                                                                                                     | size | method | time       |
+------+-----------------+------------------------+--------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------+------------+
| null | 148.165.90.106  | /category/electronics  | /category/music          | 200  | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; YTB730; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C) | 66   | GET    | 1412366393 |
| null | 144.105.77.66   | /item/electronics/3305 | /category/computers      | 200  | Mozilla/5.0 (iPad; CPU OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3                                           | 135  | GET    | 1412366377 |
| null | 108.54.178.116  | /category/electronics  | /category/software       | 200  | Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11                                                               | 69   | GET    | 1412366361 |
| null | 104.129.105.202 | /item/electronics/4504 | /item/games/394          | 200  | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)                                                                                                           | 83   | GET    | 1412366345 |
| null | 208.48.26.63    | /item/software/706     | /search/?c=Software+Toys | 200  | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)                                                                                                    | 76   | GET    | 1412366330 |
| null | 108.78.209.95   | /item/giftcards/4879   | /item/toys/197           | 200  | Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1                                                                                                     | 137  | GET    | 1412366314 |
| null | 108.198.97.206  | /item/computers/4785   | -                        | 200  | Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1                                                                                                     | 69   | GET    | 1412366298 |
| null | 172.195.185.46  | /category/games        | /category/games          | 200  | Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1                                                                                       | 41   | GET    | 1412366282 |
| null | 88.24.72.177    | /item/giftcards/4410   | -                        | 200  | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; YTB730; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C) | 72   | GET    | 1412366266 |
| null | 24.129.141.79   | /category/electronics  | /category/networking     | 200  | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)                                                                                                           | 73   | GET    | 1412366250 |
+------+-----------------+------------------------+--------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------+------------+

まとめ

Digdagの s3_wait> オペレータについて紹介しました。
シンプルな機能ですが、ファイルチェックを自分で実装すると、面倒かと思いますので便利に使えるかと思います。