EmbulkでRDSからS3へカラム単位でデータを転送


これは何

Embulkでプラグインを入れて、RDSからS3へカラム単位でデータを転送してみました。メモです。

やってみた

EmbulkはJavaが必要なので、インストール

/// Java8 install (runtime)
yum install java-1.8.0-openjdk

embulkインストール

/// embulk install
curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

embulk-input-mysql インストール

/// embulk input install
embulk gem install embulk-input-mysql

embulk-output-s3 インストール

/// output
embulk gem install embulk-output-s3

テストデータ作成

/// テストデータ作成-RDS
create database test_
select test_;
use test_;

CREATE TABLE item (
    ->   id INT PRIMARY KEY AUTO_INCREMENT,
    ->   name VARCHAR(10),
    ->   description VARCHAR(30),
    ->   price INT UNSIGNED,
    ->   created_at DATETIME
    -> );

INSERT INTO item () VALUES ();
INSERT INTO item (id) SELECT 0 FROM item;
INSERT INTO item (id) SELECT 0 FROM item;
INSERT INTO item (id) SELECT 0 FROM item;
INSERT INTO item (id) SELECT 0 FROM item;
INSERT INTO item (id) SELECT 0 FROM item;

Query OK, 16 rows affected (0.01 sec)
Records: 16  Duplicates: 0  Warnings: 0

UPDATE item SET
    ->   name = CONCAT('商品', id),
    ->   description = SUBSTRING(MD5(RAND()), 1, 30),
    ->   price = CEIL(RAND() * 10000),
    ->   created_at = ADDTIME(CONCAT_WS(' ','2014-01-01' + INTERVAL RAND() * 180 DAY, '00:00:00'), SEC_TO_TIME(FLOOR(0 + (RAND() * 86401))));
Query OK, 32 rows affected, 32 warnings (0.01 sec)
Rows matched: 32  Changed: 32  Warnings: 32

IAMでIAMユーザーを作成
ポリシーはAmazonS3FullAccessをアタッチしました。

Embulk設定ファイル編集

/// Embulk設定ファイル編集
/// configファイル作成する。ディレクトリなども適当にconfigファイルを作成する。
/// yamlファイルは空白の数を間違えると、エラー吐くので注意。
vi embulk_mysql/config.yml

 in:
   type: mysql
   user: "admin"
   password: "パスワード"
   database: "test_"
   table: "item"
   host: "RDSエンドポイント"
   select: "name"
   parser:
    type: json
 out:
   type: s3
   path_prefix: mysql/out(任意の名前を設定)
   file_ext: .json
   bucket: バケット名
   endpoint: s3-ap-northeast-1.amazonaws.com(使用しているS3のリージョンに合わせる)
   access_key_id: アクセスキー
   secret_access_key: シークレットアクセスキー
   formatter:
    type: csv

Embulk実行

embulk run embulk_mysql/config.yml

S3の中身確認

転送できました。

参考

https://github.com/embulk/embulk#linux--mac--bsd
https://qiita.com/tayasu/items/c5ddfc481d6b7cd8866d
https://github.com/embulk/embulk-output-s3
https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql
https://off.tokyo/blog/s3-aws/
https://kamihikouki.hatenablog.com/entry/2017/08/21/011446
https://qiita.com/da-sugi/items/78df4fcbc33b6627ff78
https://dev.mysql.com/doc/refman/5.6/ja/selecting-columns.html
https://www.capybara-engineer.com/entry/2021/04/12/142154
https://teratail.com/questions/265348
https://qiita.com/hiruandon2nd/items/e884b24e77f358f97781
https://open-groove.net/other-tools/embulk-aws-s3-redshift/
https://www.rubydoc.info/gems/embulk-output-s3/1.1.0