【Embulk】Embulkを使用してMySQLからElasticsearchへデータ転送する


はじめに

前回、MacにEmbulkコマンドのインストールまでをしました。
今回は、Embulkを使用してMySQLからElasticsearchへのデータ転送をしてみます。

環境構築

Mac上にDocker環境を構築しておきます。
- MySQL:5.7
- Elasticsearch:7.9.0
- Kibana:7.9.0

docker-compose

参考までにサンプルを用意しました。

docker-compose.yml
version: '3.1'

services:
  # MySQL
  db:
    image: mysql:5.7
    container_name: my-example-mysql57
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_ALLOW_EMPTY_PASSWORD: 1
      TZ: "UTC"
    volumes:
      - ./mysql/init:/docker-entrypoint-initdb.d
      - ./mysql/my.cnf:/etc/mysql/conf.d/my.cnf
      - ./docker/mysql/data:/var/lib/mysql
      - ./docker/mysql/log:/var/log/mysql
    ports:
      - 3357:3306
  # Elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.9.0
    container_name: my-example-elasticsearch79
    volumes:
      - ./docker/elasticsearch/data:/usr/share/elasticsearch/data
    ports:
      - 9279:9200
    expose:
      - 9300
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - http.host=0.0.0.0
      - transport.host=127.0.0.1
  # Kibana
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:7.9.0
    container_name: my-example-kibana79
    ports:
      - 5679:5601
volumes:
  my-example-elasticsearch79-data:
    driver: local

転送元データの準備

検証のため簡単なテーブルとデータを用意しました。

mysql/init/create_data.sql
CREATE DATABASE myembulk DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
grant select,update,insert,delete on myembulk.* to root@localhost identified by 'root';
flush privileges;

use myembulk;
CREATE TABLE myblog (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
title varchar(255) NOT NULL,
description text,
status tinyint(3) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO myblog VALUES (1,'ブログタイトル1','ブログ本文1',1);
INSERT INTO myblog VALUES (2,'ブログタイトル2','ブログ本文2',1);
INSERT INTO myblog VALUES (3,'ブログタイトル3','ブログ本文3',1);
INSERT INTO myblog VALUES (4,'ブログタイトル4','ブログ本文4',1);
INSERT INTO myblog VALUES (5,'ブログタイトル5','ブログ本文5',1);
INSERT INTO myblog VALUES (6,'ブログタイトル6(削除済)','ブログ本文6',0);

Dockerコンテナを起動

こんな感じでできていれば、成功です。

$ docker-compose up -d

$ docker-compose ps
           Name                         Command               State                 Ports              
-------------------------------------------------------------------------------------------------------
my-example-elasticsearch79   /tini -- /usr/local/bin/do ...   Up      0.0.0.0:9279->9200/tcp, 9300/tcp 
my-example-kibana79          /usr/local/bin/dumb-init - ...   Up      0.0.0.0:5679->5601/tcp           
my-example-mysql57           docker-entrypoint.sh mysqld      Up      0.0.0.0:3357->3306/tcp, 33060/tcp

これで事前に必要な準備は完了です。次は本題のEmbulkです。

Embulk

プラグインのインストール

データ転送元はMySQL、転送先がElasticsearchになるので、プラグインを入れておいてください。

$ embulk gem install embulk-input-mysql
$ embulk gem install embulk-output-elasticsearch

環境変数の設定

今回はdirenvを使用して.envrcファイルに以下のように記載しましたが、この限りではありません。

.envrc
# MySQL
export DB_ADDRESS=127.0.0.1
export DB_PORT=3357
export DB_SCHEMA=myembulk
export DB_USER=root
export DB_PASSWORD=root

# Elasticsearch
export ES_ADDRESS=127.0.0.1
export ES_PORT=9279

Embulkの設定

Liquidテンプレートを使用して、MySQLからElasticsearchへデータを転送するEmbulkの設定ファイルを書きます。
先ほど設定した環境変数は、{{ env.hoge }} という表現になっていますね。

mysql2elasticsearch.yml.liquid
{% capture today %}{{ 'now' | date: '%Y%m%d' }}{% endcapture %}
in:
  type: mysql
  host: {{ env.DB_ADDRESS }}
  port: {{ env.DB_PORT }}
  user: {{ env.DB_USER }}
  password: {{ env.DB_PASSWORD }}
  database: {{ env.DB_SCHEMA }}
  table: myblog
  select: "id, title, description, status"
  where: "status = 1"
  default_column_options:
    TIMESTAMP: { type: string, timestamp_format: "%Y-%m-%d %H:%M:%S"}
out:
  type: elasticsearch
  mode: insert
  nodes:
  - {host: {{ env.ES_ADDRESS }}, port: {{ env.ES_PORT }}}
  index: myblog_{{ today }}
  index_type: myblog

Embulkで用意したファイルはこれだけです。

Embulkの実行

$ embulk run mysql2elasticsearch.yml.liquid

エラーにならなければ、成功です。

データ転送先の確認

先ほどの、docker-composeを使用したのであれば、以下のURLでKibanaから確認できます。

 $ open http://localhost:5679


データがちゃんと入ってますね!

さいごに

今回はDockerで最小限のデータ転送を試しました。
思ったより簡単にEmbulkの導入ができました。(Docker準備の方が時間がかかった・・)
実際のプロダクトでは、環境変数の他にセキュリティやネットワークの認証などの設定が増えそうですが、それでもEmbulk自体はシンプルな作りになると思います。