Digdag + Embulk で production から staging へデータを同期する


Digdag と Embulk を使って Staging に Production のデータを同期した話です。
環境構築から同期が完了するまで、たくさんのエラーと格闘したので書き留めておきます。

実行時の環境は以下になります。

Embulk v0.9.22
embulk-input-mysql 0.10.1
embulk-output-mysql 0.8.7
MySQL 5.7.21

前回、 Digdag + Embulk + Fargateによるデータマスキング をしましたが、同じ Docker image を使って Staging 同期にも利用しています。

構成

管理を絶対にしたかったので Staging と Production は直接繋げず、中間の VPC を置いて中間とそれぞれがピアリングする構成にしました。

Staging には既にある程度のデータが入った状態です。

Tips

Deplicate Entry 回避

マスクしたデータを UNIQUE KEY が貼られているカラムにコピーする際、単純に embulk-filter-mask で値をマスクするだけでは Deplicate Entry エラーが発生します。 また、Staging 環境にはある程度データが入った状態なので、マスク対象でないカラムでも偶然に値が被ってしまうことがあるでしょう。

そのような場合は、 embulk-filter-ruby_proc がおすすめです。

filters:
  # before
  - type: mask
    columns:
      - { name: email, type: email } # *****@example.com とマスクされるが、UNIQUE 制約に引っかかる

  # after
  - type: ruby_proc
    columns:
      - name: email
        proc: |
          ->(email) do
            "#{Digest::MD5.hexdigest(email}@example.com" # [email protected]
          end

proc でランダムに文字列を生成すれば、Duplicate Entry エラーを回避できます。
Staging 環境では実際にメールが送信されてしまうので、@example.com の部分は自社のドメインにすると良いでしょう。

truncate_insert モードを使う

マスクしたデータに限らず Staging と Production のデータの差分から、一時的にどうしても重複してしまう場合がありました。例えば foreign_id に UNIQUE 制約が付いている場合、

# Staging
id: 1, foreign_id: 1
id: 2, foreign_id: 2

# Production
id: 1, foreign_id: 2 <- id 1 を元に foreign_id = 2 を merge したいが duplicate エラー
id: 2, foreign_id: 3

# 期待する結果
id: 1, foreign_id: 2
id: 2, foreign_id: 3

この際 merge モードを利用していましたが、truncate_insert モードを利用することで回避できました。
truncate_insert は既存のデータは削除して読み込んだデータに追記するモードです。(テーブルをドロップしない)

ステータスをアップデートする

Staging では Production 同様にバッチが走っているのですが、オブジェクトの status が完了状態ではないと更新対象になってしまいエラーが発生しました。例えば、status が完了状態でないと外部の決済サービスにリクエストが飛びますが、当然 Production のデータは存在しないためエラーとなります。

after_load を使えばクエリの実行後に指定の SQL を実行できます。
これにより、status が完了の状態でコピーすることができます。

out:
  ...
  {% if env.IS_MIDDLE == 'true' %}
  after_load: "update table set status = [完了ステータス] where status = [未完了ステータス]"
  {% endif %}

分析 DB を作成する際に同じ image を使っていることから、Staging 同期をする時だけ実行します。環境変数で実行環境を判別するフラグを受け取り、if 文で分岐をします。

外部キー制約のチェックを外す

同期時、外部キー制約に引っかかってしまう場合があります。
MySQL ではGLOBAL、SESSION 共に FOREIGN_KEY_CHECKS 変数を書き換えることで外部キー制約を外せるのですが、AWS の RDS では GLOBAL の変数を更新できないことが分かりました。

なので、before_load を使って SESSION ごとに set FOREIGN_KEY_CHECKS=0 にすることで外部キー制約のチェックを外します。

out:
  ...
  before_load: 'set FOREIGN_KEY_CHECKS=0;'

s3 の同期

本番のデータを同期するのであれば、S3にある画像データも移行する必要があります。なので、Digdag のワークフローに s3 sync するタスクも追加します。

一度、Ruby で同期するスクリプトを書いたのですが大量の assets を同期するのは思ったより時間がかかることが分かったので、素直に awscli を使うことにしました。

# Dockerfile
RUN apk --update --no-cache add python py-pip groff less mailcap curl jq && \
  pip install --upgrade awscli==1.14.5 s3cmd==2.0.1 python-magic && \
  apk -v --purge del py-pip && \

RUN if [ "$ENVIRONMENT" == "production" ]; \
  then curl -qL -o aws_credentials.json 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI && \
  aws configure set aws_access_key_id `jq -r '.AccessKeyId' aws_credentials.json` && \
  aws configure set aws_secret_access_key `jq -r '.SecretAccessKey' aws_credentials.json` && \
  aws configure set aws_session_token `jq -r '.Token' aws_credentials.json`; \
  fi
# main.dig
  ...
  +s3_sync:
    if>: ${IS_MIDDLE} # 中間 VPC で実行する時のみ S3 sync する
    _do:
      sh>: aws s3 sync s3://xxx-assets-production s3://xxx-assets-staging

s3 sync コマンドで指定する順番を間違えると、assets が吹き飛ぶので気をつけましょう。

今後の課題

設定ファイルの自動生成

Embulk は同期のための設定ファイルを手動で作成する必要があります。
手間なので input のテーブル定義を読み取って、設定ファイルを自動生成するスクリプトを書きます。

検索エンジンの reindex をワークフローに組み込む

ElasticSearch や Algolia などの検索エンジンを利用している場合、データを同期した後に reindex する必要があります。なので Digdag のワークフローに組み込む予定です。

まとめ

Digdag と Embulk を使って Staging に Production のデータを同期することについて書きました。
同じことをされる際に参考になれば幸いです。