Embulkでローカルディスクを使わずにクラウドストレージにアップロードする


embulk-output-commandからgsutilコマンドにデータを渡してストリーミングアップロードしてディスクレスにしてみた話。

embulkってなにって方はこちらから。

追記

2020-09-13

この記事の gsutil と embulk-output-command 行っていたストリーミングデータ転送処理をEmbulkプラグインで書きました。
https://github.com/irotoris/embulk-output-gcs_streaming

なんでやったの

EmbulkでMySQLやOracleからデータをBigQueryにデータ連携する際、データレイクとしてひとまずGoogle Cloud Storage(GCS)にファイルをアップロードしています。

最初はembulk-output-gcsを使ってたんですけど、1TBのテーブルとか出てきてマシンのストレージが枯渇しちゃったんですよね。

そこで、embulk-output-commandからgsutilコマンドにデータを渡して、ストリーミングアップロードしてディスクレスにしてみました。

GCSのストリーミングアップロードに関してはこちら。
https://cloud.google.com/storage/docs/streaming

どうやったの

環境

  • ubuntu:bionic
  • openjdk-8-jdk
  • Embulk v0.9.17
  • embulk-input-mysql v0.10.1
  • embulk-output-gcs v0.4.4
  • embulk-output-command v0.1.4
  • gsutil 4.47

embulkはGKEでコンテナ実行

Before (embulk-outout-gcs)

embulk-output-gcsを使うとこんな感じ。

ローカルの/tmpにファイルがに書き込まれて、GCSアップロード後に削除されているのが分かります。

job.yaml
in:
  type: mysql
  host: localhost
  port: 3306
  database: test
  user: mysql_user
  password: xxxxxxx
  query: select * from test_table
out:
  type: gcs
  auth_method: compute_engine
  bucket: bucket_name
  path_prefix: mysql/test/test_table
  file_ext: .csv
  formatter:
    type: csv
...
2020-02-15 13:11:17.756 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2020-02-15 13:11:17.790 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2020-02-15 13:11:19.186 +0000 [INFO] (0016:task-0000): Fetch size is 10000. Using server-side prepared statement.
2020-02-15 13:11:19.187 +0000 [INFO] (0016:task-0000): Connecting to jdbc:mysql://localhost:3306/test options {useCompression=true, socketTimeout=1800000, useSSL=false, user=admin, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2020-02-15 13:11:19.289 +0000 [INFO] (0016:task-0000): SQL: select * from test
2020-02-15 13:11:19.311 +0000 [INFO] (0016:task-0000): > 0.01 seconds
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Local Hash(MD5): XTIT3ANlo6QqZnwNBdzGCA== / Remote Hash(MD5): XTIT3ANlo6QqZnwNBdzGCA==
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Delete generated file: /tmp/embulk/2020-02-15 13-11-16.996 UTC/0016_task-0000_5418765600230876857.tmp > true
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Uploaded 'bucket_name/mysql/test/test_table.000.01.csv' to 930bytes
...

After (embulk-output-command + gsutil)

embulk-output-commandからgsutilでストリーミングアップロードするとこんな感じ。
inputの設定は同じなので省略してます。
gsutilが標準入力からデータを受け取ってアップロードしているログが出てます。

job.yaml
out:
  command: "gsutil cp - gs://bucket_name/mysql/test/test_table/data.$INDEX.$SEQID.csv"
  formatter:
    type: csv
  type: command
...
2020-02-15 13:09:59.514 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2020-02-15 13:09:59.543 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2020-02-15 13:09:59.559 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.634 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.638 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.643 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.700 +0000 [INFO] (0016:task-0000): Fetch size is 10000. Using server-side prepared statement.
2020-02-15 13:09:59.701 +0000 [INFO] (0016:task-0000): Connecting to jdbc:mysql://localhost:3306/test options {useCompression=true, socketTimeout=1800000, useSSL=false, user=admin, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2020-02-15 13:09:59.814 +0000 [INFO] (0016:task-0000): SQL: select * from test_table
2020-02-15 13:09:59.838 +0000 [INFO] (0016:task-0000): > 0.01 seconds
Copying from <STDIN>...
Copying from <STDIN>...   0.0 B]                                                
Copying from <STDIN>...   0.0 B]                                                
Copying from <STDIN>...   0.0 B]                                                
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
...

Pros / Cons

Pros

  • ディスクレスでembulkでクラウドストレージにアップロードできる
    • SSDって値段高いよね。
    • コンテナ on k8sで動かそうとした日にはPersistent Volumeがでてきて管理が面倒です。
    • ステートレス最高。

Cons

  • gsutilのストリーミングアップロードはチェックサム計算をしてくれない
    • なので自分でデータの検証を書く必要がります。もはやローカルにファイル持ってないので、チェックサムによる検証はできません。いまはBigQueryにロードできるかどうか(CSVファイルとして正しいか)と、MySQLとBigQueryのレコード数が一致しているかを見ています。

性能

OracleからGCSに入れる処理で比較。

  • ネットワーク経路: Oracle---(DedicatedIntterConnect)---GKE--(PrivateAccess)---GCS
  • コンピュート: 8 vCPU / 7.5GB Mem / 50GB SSD
  • データ量: 2,000,000レコード / 7ファイル / 5.2GB / CSV
処理 時間 コマンド
embulk-output-gcs 6m18.374s embulk run job.yaml
embulk-output-command + gsutil cp(streaming) 3m5.471s embulk run job.yaml
embulk-output-file + gsutil cp(bulk) 3m15.211s embulk run job.yaml && gsutil -m cp -r /tmp/embulk gs://bucket_name/tmp/embulk

embulk-output-gcsは内部で使ってるGCSのAPIバージョンが古いので、結構遅かった。
なので、embulkでファイル化したあとに普通にgsutil cp (bulk upload)した処理も計測。
コンピュートリソースやネットワーク構成、input側も影響するので一概には言えませんが、5.2GBくらいだったらストリーミングアップロードにしても問題なさそうでした。

ちなみに2TBのテーブルでやったときは50時間ほどかかったので、時間は線形増加にはならなさそう。

メモ

embulk-output-commandは、ScatterExecutorの恩恵を受ける。
outputタスク数(デフォルトだとCPUコア数、設定で指定可能)の分だけ並列に処理してくれる。
Embulk の LocalExecutor プラグインの振る舞いについて整理

あときっとembulk-output-s3もawscli使えば同じようにストリーミングアップロードができると思う。

検証中にgsutilコマンドがうまく動かないと思ったらこれだった。
https://github.com/googleapis/google-api-python-client/issues/803
httplib2をダウングレードして解決。

$ pip install httplib2==0.15.0