EmbulkでMySQLからBigQueryにデータ転送してみた。


はじめに

Fluentdのバッチ版Embulkを試してみました。
大容量のデータ転送で困っていた時に役立った記憶があります。

概要としては以下のような感じ。

Embulk(エンバルク)とは、プラグ可能なマルチソースバルクデータローダーです。バルク処理に特化したプラグインベースのデータローダーで、大規模データセットのバルク転送を実施します。「データベース」「DWH」「NoSQL」「ファイル形式」「クラウドデータストア」などのデータ転送を強力にサポートします。
参考:https://www.ossnews.jp/oss_info/Embulk

基本説明も以下に載せておきます。

Embulk は、ストリーミング型ログ収集フレームワーク「fluentd」のバッチ版のようなデータ転送ツールです。特に、「1発実行」「日次バッチ処理」「定期バッチ処理」などのバルク処理用途に向いています。
転送元の「ファイル」「データベース」などからデータを吸い出し、転送先の「ストレージ」「データベース」などにロードするためのシンプルな仕組みを提供します。
プラグイン型アーキテクチャを採用しており、RubyやJavaでシンプルなコードを書くことで、さまざまな「データベース」「ファイルフォーマット」「ストレージ」に対して柔軟に対応できます。
参考:https://www.ossnews.jp/oss_info/Embulk

試してみること

  • ローカル環境にEmbulkコンテナとMySQLコンテナを建てる。
  • MySQLのデータをEmbulkを使って、リモート環境のBigQueryに転送する。

リモート環境

便宜上、先に転送先のBigQueryの説明だけ書いておきます。

BigQuery

テーブルはあえて用意しないで、プロジェクトとデータセットだけ作成しておきます。
※転送先にテーブルが無い場合、embulkが自動でテーブル作成&データ投入してくれる。
※以下写真参考。

EmbulkからBigQueryに転送するための権限を持ったサービスアカウントを作成する。
※以下写真参考。

作成したサービスアカウントの秘密鍵を作成ダウンロードする。
※以下写真参考。

この秘密鍵の入ったJsonファイルをEmbulk転送時に使うので、わかるところに保存しておいてください。

ローカル環境

docker-composeを使ってEmbulkとMySQLのコンテナ環境を作成する。
以下、ローカル環境全体のディレクトリ構成です。

$ tree
.
├── docker-compose.yml
├── embulk
│   ├── Dockerfile
│   └── service_account_iam_key.json
└── mysql
    ├── Dockerfile
    ├── ddl
    │   ├── init.sql
    │   ├── insert_departments.sql
    │   └── insert_employee.sql
    └── my.cnf

以下、docker-compose.yml、Embulk、MySQLの3つの環境設定について説明をしていきます。

docker-compose.yml

環境設定は以下の通り。
強いてポイントがあるとすれば、networkを作成することでホスト名でコンテナ間通信できるようにしている所くらいです。

version: '3'

services:
  dbserver:
    build: ./mysql
    container_name: mysql_etl
    volumes:
      - ./mysql/my.cnf:/etc/mysql/my.cnf
      - ./mysql/ddl:/docker-entrypoint-initdb.d
    environment:
      MYSQL_DATABASE: training
      MYSQL_USER: fuka_user
      MYSQL_PASSWORD: fuka_pass
      MYSQL_ROOT_PASSWORD: root_pass
      MYSQL_PORT: 3306
    ports:
      - 3306:3306
    tty: true
  embulk:
    build: ./embulk
    container_name: embulk_etl
    volumes:
      - ./embulk:/home
    tty: true

networks:
  etl_networks:

Embulkコンテナ

Dockerfile

環境設定は以下の通り。

FROM openjdk:8

# パッケージ管理システムのアップデート
RUN apt-get -y update && apt-get -y upgrade

# localeの設定
RUN apt-get -y install locales && \
    localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8

# timezone (Asia/Tokyo)の設定
ENV TZ JST-9

# vim以外にも使いそうなもの一応インストール
RUN apt-get install -y vim git zip unzip less wget

# MySQLコンテナ接続確認するときのためにMySQL-clientをインストール
RUN apt-get install -y default-mysql-client

# embulkをインストール(https://www.embulk.org/)
RUN curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
RUN chmod +x ~/.embulk/bin/embulk
RUN echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
RUN . ~/.bashrc

# MySQL取得とBigQuery投入用のプラグインを入れておく
RUN ~/.embulk/bin/embulk gem install embulk-input-mysql \
&& ~/.embulk/bin/embulk gem install embulk-output-bigquery

WORKDIR /root

公開鍵の配置

BigQueryにアクセスするためのservice_accountの秘密鍵を、Dockerfileと同じ階層に配置する。
名前は「service_account_iam_key.json」としました。

├── embulk
│   ├── Dockerfile
│   └── service_account_iam_key.json

MySQLコンテナ

Dockerfile

環境設定は以下の通り。

FROM mysql:5.7

ADD ./my.cnf /etc/mysql/my.cnf
RUN chmod 644 /etc/mysql/my.cnf

ddl

以下の通り。

init.sql

DROP SCHEMA IF EXISTS training;
CREATE SCHEMA training;
USE training;

DROP TABLE IF EXISTS departments;
CREATE TABLE departments (
    department_id int primary key NOT NULL AUTO_INCREMENT,
    department_name varchar(20)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS employees;
CREATE TABLE employees (
    employee_id int primary key NOT NULL AUTO_INCREMENT,
    department_id int,
    name varchar(20),
    age int,
    CONSTRAINT fk_department_id
    FOREIGN KEY (department_id) 
    REFERENCES departments (department_id)
    ON DELETE RESTRICT ON UPDATE RESTRICT
) ENGINE=INNODB DEFAULT CHARSET=utf8;

insert_departments.sql

INSERT INTO departments (department_name) VALUES ("総務");
INSERT INTO departments (department_name) VALUES ("営業");
INSERT INTO departments (department_name) VALUES ("開発");

insert_employee.sql

INSERT INTO employees (department_id, name, age) VALUES (1, "佐藤", 40);
INSERT INTO employees (department_id, name, age) VALUES (2, "加藤", 30);
INSERT INTO employees (department_id, name, age) VALUES (3, "田中", 25);
INSERT INTO employees (department_id, name, age) VALUES (3, "中村", 20);

my.cnf

特別な設定はしてません。

[mysqld]
character-set-server=utf8

[mysql]
default-character-set=utf8

[mysqld_safe]
log_error=/var/log/mysqld.log
pid_file=/var/run/mysqld/mysqld.pid

[client]
default-character-set=utf8

動作検証

Dockerコンテナの起動

以下の手順で起動しました。

$ docker-compose build --no-cache

$ docker-compose up -d

$ docker exec -it embulk_etl /bin/bash

ネットワーク接続チェック Embulkコンテナ→MySQLコンテナ

以下のように接続確認します。

# ping -c 3 172.28.0.3
PING 172.28.0.3 (172.28.0.3) 56(84) bytes of data.
64 bytes from 172.28.0.3: icmp_seq=1 ttl=64 time=2.49 ms
64 bytes from 172.28.0.3: icmp_seq=2 ttl=64 time=0.195 ms
64 bytes from 172.28.0.3: icmp_seq=3 ttl=64 time=0.129 ms

--- 172.28.0.3 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 15ms
rtt min/avg/max/mdev = 0.129/0.939/2.493/1.099 ms


# ping -c 3 mysql_etl
PING mysql_etl (172.28.0.3) 56(84) bytes of data.
64 bytes from mysql_etl.mysql_docker_for_embulk_default (172.28.0.3): icmp_seq=1 ttl=64 time=0.327 ms
64 bytes from mysql_etl.mysql_docker_for_embulk_default (172.28.0.3): icmp_seq=2 ttl=64 time=0.132 ms
64 bytes from mysql_etl.mysql_docker_for_embulk_default (172.28.0.3): icmp_seq=3 ttl=64 time=0.128 ms

--- mysql_etl ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 22ms
rtt min/avg/max/mdev = 0.128/0.195/0.327/0.094 ms


# mysql -h mysql_etl -u fuka_user training -p
Enter password: 

MySQL [training]> 

ネットワーク接続チェック Embulkコンテナ→BigQuery

bqインストールします。(参考:https://cloud.google.com/sdk/docs/install#deb)

# echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list

# curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -

# apt-get update && apt-get install google-cloud-sdk

# RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg  add - && apt-get update -y && apt-get install google-cloud-sdk -y

以下のように接続確認します。

# gcloud auth login
Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?response_type=省略~

Enter verification code: 

# bq ls
表示されれば接続確認ok

embulk実行

設定ファイル準備

  • departmentsテーブル用

内容は以下の通りです。

# vim conf/embulk_guess_departments.yml

in:
  type: mysql
  user: fuka_user
  password: fuka_pass
  host: mysql_etl
  database: training
  query: select * from departments;
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /home/service_account_iam_key.json
  location: asia-northeast1
  project: training-project-314502
  dataset: test_embulk
  table: departments
  mode: append
  auto_create_table: true
  • employeesテーブル用

内容は以下の通りです。

# vim conf/embulk_guess_employees.yml

in:
  type: mysql
  user: fuka_user
  password: fuka_pass
  host: mysql_etl
  database: training
  query: select * from employees;
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /home/service_account_iam_key.json
  location: asia-northeast1
  project: training-project-314502
  dataset: test_embulk
  table: employees
  mode: append
  auto_create_table: true

embulk実行

  • departmentsテーブルの転送

実行手順は以下の通りです。

# embulk guess conf/embulk_guess_departments.yml -o conf/embulk_load_departments.yml

# embulk preview -G conf/embulk_load_departments.yml
~省略~
*************************** 1 ***************************
  department_id (  long) : 1
department_name (string) : 総務
*************************** 2 ***************************
  department_id (  long) : 2
department_name (string) : 営業
*************************** 3 ***************************
  department_id (  long) : 3
department_name (string) : 開発

# embulk run conf/embulk_load_departments.yml
  • employeesテーブルの転送

実行手順は以下の通りです。

# embulk guess conf/embulk_guess_employees.yml -o conf/embulk_load_employees.yml

# embulk preview -G conf/embulk_load_employees.yml
~省略~
*************************** 1 ***************************
  employee_id (  long) : 1
department_id (  long) : 1
         name (string) : 佐藤
          age (  long) : 40
*************************** 2 ***************************
  employee_id (  long) : 2
department_id (  long) : 2
         name (string) : 加藤
          age (  long) : 30
*************************** 3 ***************************
  employee_id (  long) : 3
department_id (  long) : 3
         name (string) : 田中
          age (  long) : 25
*************************** 4 ***************************
  employee_id (  long) : 4
department_id (  long) : 3
         name (string) : 中村
          age (  long) : 20

# embulk run conf/embulk_load_employees.yml

テーブルチェック

BigQueryを確認してみるとテーブルが作成され、データが投入されていることが確認できます!

コードレスでデータ転送できるので良い気がしてます・・!