Digdag + Embulk + Fargateによるデータマスキング


今回は Digdag と Embulk を用いて分析基盤を作ったことについて書きます。
意外とハマりポイントが多く、ネット上に知見もそこまで多くなかったため Tips や解決策を書きます。

実行時の環境は以下になります。

Embulk v0.9.22
embulk-input-mysql 0.10.1
embulk-output-mysql 0.8.7
MySQL 5.7.21

背景

社内では Redash を用いて様々な部署の方がデータの分析をしています。

Redash からアクセス可能なデータの中には秘匿情報も含まれるため、権限管理で分析基盤へのアクセスを厳しく制限する形をとっていました。そこで、社内でより分析をしやすい状態を目指し、秘匿情報をマスキングすることで社員なら誰でもアクセスして問題ない状態を作ることにしました。

このような仕組みを作ると何かしらの情報漏洩が発生するリスクを限りなく小さくできるので、サイトのみならず、会社の信頼性に繋がります。やったね。

今回はワークフローエンジンの Digdag とバルクデータローダーの Embulk を使いました。
それぞれどういうものかはネット上に記事が多いのでここでは割愛します。

説明はこちらの記事が分かりやすいです。

構成

CloudWatchEvents から ECS (Fargate) のタスクを定期実行するようにしています。
こうすると、バッチの実行時のみ課金されるため利用料金を節約できます。

Embulk の INPUT には読み込み権限のみを持った DB、 OUTPUT には新規に作成した分析用の DB を指定します。

Tips

この構成を作る上で乗り越えることがいくつかありました。
細かい話ですが、同じ状況に遭遇した際の解決に役立ちそうなので書いていきます。

コンテナ内の環境変数を Embulk で使いたい

production 環境では Docker コンテナ内に定義した環境変数を使いたいです。
以下は調べると良く出てくるサンプルです。

main.dig
timezone: UTC

_export:
  INPUT_DATABASE_HOST: ${database.input_host}
  INPUT_DATABASE_USER: ${database.input_user}
  INPUT_DATABASE_PASSWORD: ${database.input_password}
  INPUT_DATABASE_NAME: ${database.input_name}
  OUTPUT_DATABASE_HOST: ${database.output_host}
  OUTPUT_DATABASE_USER: ${database.output_user}
  OUTPUT_DATABASE_PASSWORD: ${database.output_password}
  OUTPUT_DATABASE_NAME: ${database.output_name}
config/development.yml
database:
  input_host: docker.for.mac.localhost
  input_user: root
  input_password: ""
  input_name: development
  output_host: docker.for.mac.localhost
  output_user: root
  output_password: ""
  output_name: development_copy
# 起動
digdag run main.dig -P config/development.yml

良くあるサンプルなのですが、production の時はどうするのかという問題に当たります。
config/production.yml に直接 DB 情報をベタがきする訳にもいかないので、この方法は使えません。

そこで、Embulk のタスクファイル内で直接環境変数を読むようにしました。

tasks/hoge_task.yml.liquid
in:
  type: mysql
  host: {{env.INPUT_DATABASE_HOST}}
  user: {{env.INPUT_DATABASE_USER}}
  {% if env.INPUT_DATABASE_PASSWORD != '' %}
  password: {{env.INPUT_DATABASE_PASSWORD}}
  {% endif %}
  database: {{env.INPUT_DATABASE_NAME}}
  table: sample
  select: "*"

out:
  type: mysql
  host: {{env.OUTPUT_DATABASE_HOST}}
  port: {{env.OUTPUT_DATABASE_PORT}}
  user: {{env.OUTPUT_DATABASE_USER}}
  {% if env.OUTPUT_DATABASE_PASSWORD != '' %}
  password: {{env.OUTPUT_DATABASE_PASSWORD}}
  {% endif %}
  database: {{env.OUTPUT_DATABASE_NAME}}
  table: sample
  select: "*"
  mode: merge

このようにして書くと Embulk が Docker コンテナ内の環境変数を使ってくれます。

プラグインが対応していないカラムの型があった

embulk のプラグインで embulk-output-mysql を使っていました。
ログは CloudWatch に流れていくのですが、ログを見ると以下のようなエラーが出ていました。

Data truncation: Data too long for column 'body' at row 21

とあるテーブルの body 21行目のカラムを見ても値は NULL ... おかしいなと思い、再度 README を読んでみると Supported types の項目が。

ここでエラーが出ていた body のカラムの型を見てみると mediumtext 型でした。
単純にサポートされてなかったようです。PR 投げて直せたら良いですが、一旦の退避策としては以下のようになります。

out:
  ...
  column_options:
    body: { type: TEXT }
    is_sample: { type: TINYINT }
  mode: merge

filters:
  - type: mask
    columns:
      - { name: body, type: substring, length: 65535 }

サポートされている TEXT 型で書き出すように設定します。また、embulk-filter-mask というプラグインを使っていたため、合わせて最大文字サイズも調整します。

boolean 型のサポートも今後対応との事で、TYNYINT で指定します。

mode: merge でマージされない

mode: merge は既存のデータがあったら上書き、無ければ新規追加する挙動と認識していましたが、私の環境では上書きがされず、実行する度にレコードが増えていく挙動に遭遇しました。

原因は、OUTPUT の DB を0の状態から作ろうとしていたのですが、初回の実行時に、上書きに必要な情報である Primary key が設定されていなかったからでした。

Embulk の input, output は別プラグインで情報の共有はできないため、出力先は事前に Primary key 属性をつけてテーブルを作っておくか、column_options で primary keyを指定する必要がありました。

out:
  type: mysql
  column_options:
    id: { type: 'bigint primary key' }

初回の実行で primary key が反映されていることを確認後、2回目の実行からはちゃんと merge されました。

index が貼られない

Embulk はデフォルトで index を貼ってくれません。
create_table_constraint を使うと、テーブルの作成時に index を貼ることができます。

out:
  ...
  create_table_constraint: '
    KEY `index_users_on_user_column_1` (`user_column_1`),
    KEY `index_users_on_user_column_2` (`user_column_2`),
  '

DB のタイムアウト

Communications link failure
The last packet successfully received from the server was 114,983 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.

CloudWatch のログをみると、上記エラーが発生していました。
調べると MySQL の timeout が関係しているようなので調査をし、結果的には wait_timeout が原因だったので値を増やして解決しました。

参考: https://qiita.com/uchiko/items/b27537c0a100b6537a4a

もし数値がデフォルトの socketTimeout (1800000) に近しい場合、
options で socketTimeout を伸ばすことで解決する場合もあります。

out:
  type: mysql
  options: { socketTimeout: 7200000  }

RDS のメモリ不足

# CloudWatch のエラーログ
No operations allowed after statement closed
# RDS のエラーログ

vailable memory is low.
...
<jemalloc>: Error in mmap(): err: 12, msg: Cannot allocate memory
<jemalloc>: Error in malloc(): out of memory

その次に CloudWatch に別のエラーが出ており原因がいまいち掴めないなと困っていた所、RDS のエラーログを見ると明確に OOME が出ていました。インスタンスタイプを上げて回避しましたが、バッチサイズを指定するなど、調整どころになります。

Slack 通知

タスクが終了した後の結果を通知したいときは、便利なライブラリがあったので紹介します。

このサンプルで ENVIRONMENT の値を動的に取りたかったのですが、調べる限り dig ファイルの中で直接環境を取得する方法は見当たらず、以下のような追記が必要そうです。

main.dig
_export:
  ..

+step1:
  rb>: SetEnv.run
  require: 'config/set_env'

+load:
  .. 
config/set_env.rb
class SetEnv
  def run
    Digdag.env.store(ENVIRONMENT: ENV['ENVIRONMENT'])
  end
end

もちろん Dockerfile にも ruby をインストールする記述が必要です。

RUN apk --update --no-cache add ... ruby ruby-bundler ruby-json && \
...

通知結果

時間がずれる

embulk-input-jdbcのMySQLプラグインで9時間時間がずれる
https://qiita.com/katsuyan/items/f22dc5a86522ba3c5652

その他

その他やったこととして Terraform でこの構成を作る事や、Redash と DB の間に HAProxy がいたので、一台の HAProxy でマスク前/後のDBに正しく Redash からリクエストを割振れるようにしたりと、やる事がたくさんありました。

(おまけ) Digdag 全体のコード

main.dig
timezone: UTC

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.4
  webhook_url: xxx
  workflow_name: DB Masking

+step1:
  rb>: SetEnv.run
  require: 'config/set_env'

+load:
  _parallel: true

  +load1:
    for_each>:
      table: [
        hoge,
        ...
      ]
    _do:
      sh>: embulk run ./tasks/${table}.yml.liquid

    _check:
      slack>: notify_templates/success.yml

    _error:
      slack>: notify_templates/failed.yml

  +load2:
    for_each>:
      table: [
        huga,
        ...
      ]
    _do:
      sh>: embulk run ./tasks/${table}.yml.liquid

    _check:
      slack>: notify_templates/success.yml

    _error:
      slack>: notify_templates/failed.yml

まとめ

今回は didag と embulk を使った分析基盤の作成(データのマスキング処理)と Redash からアクセス可能にするまでを行いました。このようなところでも技術を用いて広義の信頼性を高められるので、面白いなと思います。

Twitter のフォローもどうぞよろしくです! (@hassasa3)