3rep - Embulk やってみた(語彙力


最近データ基盤整理をやり始め、便利なものがあったのでまとめたメモ
色々他の人と被っているかもですが雑多なめも

Embulkとは

  • ETLツール
  • リアルタイムで動作するストリーミング型ログ収集フレームワーク
  • fluentdのバッチ版
  • TreasureData製のOSS
  • 大量のデータをバッチ転送することに特化
  • データの入力・加工・出力する部分はプラグインとして提供
  • 設定ファイルはYAML

主な機能


- ファイル/データベース 吸い出し
- ストレージ/データベース ロード

メリット

  • 並列実行でパフォーマンス最適化
  • validation
  • エラーリカバリー
  • 冪等なリトライ
  • プラグインが豊富なので大抵のユースケースをカバー(ここ大事

Embulkの型

説明
boolean true / false
long 64bit符号付き整数型
timestamp 日付。ナノ秒を含む
double 64bit浮動小数点型
string 文字列
  • 途中型が一致しなかったデータに関してはその行だけ消えた

Embulk Plugin

基本的な使い方(Postgresql出力)

Embulkが設定されていることを前提で記載しています。
ただいまdigdag & EmbulkのDockerを作成中です

  • guess.yml - Postgresqlからselectし結果表示設定ファイル ※1
in:
  type: postgresql
  host: {{ host }}
  user: {{ user }}
  password: {{password }}
  database: {{ database }}
  query: |
    SELECT 
      * 
    FROM 
      {table} 
    order by id asc;
out:
  type: stdout
  • guessコマンドで最小限の設定を保管したファイル設定を生成します。
$ embulk guess ./guess.yml -o config.yml
2020-01-29 07:15:24.615 +0000: Embulk v0.9.22
2020-01-29 07:15:25.898 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:15:30.659 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:15:33.179 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:15:33.578 +0000 [INFO] (0001:guess): Loaded plugin embulk-input-postgresql (0.10.1)
in:
  type: postgresql
  host: postgres-db
  user: root
  password: root
  database: database
  query: "SELECT \n  * \nFROM \n  table \norder by id asc;\n"
out: {type: stdout}

Created 'config.yml' file.
  • previewコマンドでdry-runができます
$ embulk preview config.yml
2020-01-29 07:22:01.738 +0000: Embulk v0.9.22
2020-01-29 07:22:02.691 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:22:06.590 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:22:07.543 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:22:07.738 +0000 [INFO] (0001:preview): Loaded plugin embulk-input-postgresql (0.10.1)
2020-01-29 07:22:07.798 +0000 [INFO] (0001:preview): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-input-postgresql-0.10.1/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2020-01-29 07:22:07.807 +0000 [INFO] (0001:preview): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:22:07.955 +0000 [INFO] (0001:preview): SQL: SET search_path TO "public"
2020-01-29 07:22:07.969 +0000 [INFO] (0001:preview): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2020-01-29 07:22:08.153 +0000 [INFO] (0001:preview): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:22:08.167 +0000 [INFO] (0001:preview): SQL: SET search_path TO "public"
2020-01-29 07:22:08.175 +0000 [INFO] (0001:preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT
  *
FROM
  {Table}
order by id asc;

2020-01-29 07:22:08.180 +0000 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:22:08.184 +0000 [INFO] (0001:preview): > 0.00 seconds
2020-01-29 07:22:08.188 +0000 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:22:08.189 +0000 [INFO] (0001:preview): > 0.00 seconds
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
| id:long | text:string | text_code:long | main_text:string | create_at:long | update_at:long | status:string |
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
テーブルが表示される
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
  • runで実行 stdoutにoutput 指定
$ embulk run config.yml
2020-01-29 07:26:44.438 +0000: Embulk v0.9.22
2020-01-29 07:26:45.808 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:26:50.567 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:26:51.804 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:26:52.027 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-postgresql (0.10.1)
2020-01-29 07:26:52.106 +0000 [INFO] (0001:transaction): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-input-postgresql-0.10.1/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2020-01-29 07:26:52.119 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:26:52.226 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2020-01-29 07:26:52.250 +0000 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2020-01-29 07:26:52.335 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / tasks=1
2020-01-29 07:26:52.348 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2020-01-29 07:26:52.591 +0000 [INFO] (0015:task-0000): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:26:52.600 +0000 [INFO] (0015:task-0000): SQL: SET search_path TO "public"
2020-01-29 07:26:52.608 +0000 [INFO] (0015:task-0000): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT
  *
FROM
  table
order by id asc;

2020-01-29 07:26:52.611 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:26:52.615 +0000 [INFO] (0015:task-0000): > 0.00 seconds
2020-01-29 07:26:52.620 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:26:52.621 +0000 [INFO] (0015:task-0000): > 0.00 seconds
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
2020-01-29 07:26:52.628 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2020-01-29 07:26:52.633 +0000 [INFO] (main): Committed.
2020-01-29 07:26:52.633 +0000 [INFO] (main): Next config diff: {"in":{},"out":{}}

guess.ymlファイルに環境変数を設定しファイルを生成する

ファイル拡張子に.liquidとつける「postgres-guess.yml.liquid」

  • liquid〜Embulkに備わっているテンプレートエンジン
  • 同じ設定を記載し続けて肥大化を避けるため

_env.yml.liquid

  • env設定ファイル
_env.yml.liquid
{% assign env_postgresql = 'postgresql' %}
{% assign env_host = '{host_name}' %}
{% assign env_user = '{user_name}' %}
{% assign env_password = '"{user_password}"' %}
{% assign env_database = '{database_name}' %}

guess.yml.liquid

  • 汎用設定が書かれたテンプレートファイル
    • 今回はpostgresなのでpostgres-guess.yml.liquidとしてます
postgres-guess.yml.liquid
{% include 'env' %}                          # これ必須
exec:
  min_output_tasks: 1                       # OutPutタスク数制御(1にしないとOutPutファイルが複数分割される)
in:
  type: {{ env_postgresql }}
  host: {{ env_host }}
  user: {{ env_user }}
  password: {{ env_password }}
  database: {{ env_database }}
  query: |
    SELECT 
      * 
    FROM 
      Table Name
    order by id asc;
out:
  type: stdout

実行ファイル生成

$ embulk guess ./postgres-guess.yml.liquid -o postgres-config.yml

dry-run

$ embulk preview postgres-config.yml

実行

$ embulk run postgres-config.yml

感想

  • 便利すぎて鼻血が出る
  • 基本的な処理などはembulk側がやってくれるのでどこにどうデータを集めてデータ分析を行なっていく化などの考える時間に割り当てられます
  • プラグインによっては差分で取得してくれるものもあるのでupdateの項目があるデータに関しては積極的に使ってみましょう
  • 次はdigdagについて書いてみようかと思います。

※1 guess 読み込んだデータファイルのレイアウトを推測してyamlファイルを生成する機能