TreasureData sftpサーバのCSVファイルをデイリーでワークフローでテーブルにインポート


TreasureDataのテーブルにsftpサーバのCSVをワークフローでインポートする方法のメモ

【やりたいこと】
sftpサーバに毎日送られてくる売上CSVを毎日特定のTreasureDataのテーブルにインポート(テーブルは毎日上書きされその日のCSVデータだけが格納)する。

【売上CSVの前提】
売上CSVのファイル名には年月日時分秒が付いてくる。
2019年11月14日の売上CSVの場合
「Sales20191114235959.csv」
となる。

【ワークフロー作成手順】
①とりあえず空のワークフローを作成
・Workflowsの「New Workflow」ボタンを押す
・"Create Workflow"の入力項目を入力する

CreateWorkflow
Workflow Name:ワークフロー名
Project Name:ワークフロー名と同じ
Workflow TemplateBlank

・もう一度「New Workflow」を押すと空のワークフローが作成される

②ワークフローの定義(dig)を作成
・①で作成したワークフローを選択
・"Workflow Definition"を選択
・[Edit Workflow]アイコンを押す
・digファイルに以下の内容を記述(毎日8:10に処理を実行する場合)

workflow.dig
timezone: "Asia/Tokyo"

schedule:
 daily>: 08:10:00

_export:
  td:
    _db: db_name
    _table: table_name

+load_step:
  td_load>: yaml/import_csv.yml
  database: ${td._db}
  table: ${td._table}

<注意点>
>: の後にスペースが無いとエラーになる
>事前にインポート先のテーブルを作っておかないとインポートされない

③インポートの定義(yml)を作成
・[Add Project File]アイコンを押す
・ファイル名に"yaml/import_csv.yml"を入力
・"yaml/import_csv.yml"に以下の内容を記述

yaml/import_csv.yml
---
in:
  type: sftp
  host: sftpサーバのipアドレス
  port: 22
  user: sftpLoginID
  password: password
  user_directory_is_root: false
  path_prefix: "/csv_path/Sales"
  path_match_pattern: ${moment(session_time).format("YYYYMMDD")}.*csv
  parser:
    charset: MS932
    newline: CRLF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: description, type: string}
out: {mode: replace}
exec: {}
filters:
- from_value: {mode: upload_time}
  to_column: {name: time}
  type: add_time

<注意点>
>: のあとにスペースを入れないとエラーになる
>charsetはモビルスーツの型番と間違えてしまいそうな謎の"MS932"、日本語のデータがあるからと"SHIFT-JIS"としてしまうと文字化けする
>columnsはデータに合わせて変えてください
・「Save & Commit」ボタンを押して内容を保存