PowerShellでElasticsearchのBulkAPIを利用する


Elasticsearchに大量のデータを投入したい場合、1件ずつ送るよりBulkAPIを利用してまとめて投げ込んだほうが早い。ちょっと早い程度ではなくすごく早いので、使えるならBulkAPIを使いたい。
でもデータをBulkAPIで使える形に整形しなければならないので、ちょっと面倒。
PowerShellで書く場合どんな様子になるのかという話。

BulkAPI(Elasticsearch公式)

環境

Elasticsearchバージョン:7.8.1
リモートサーバ:WindowsServer2012R2
クライアント:Windows10

サンプル

Elasticsearch公式にサンプルデータセットがある。これをBulkAPIで投入する。
「顧客の銀行口座情報に関する架空のJSONドキュメント例」、1000件。
最初からBulkAPIが指定する形になっているので、このまま投げ込めばよしなに取り扱ってくれる。
公式ドキュメント データの調査
サンプルデータセット(直リンク)

sample_bulk.json
{"index":{"_id":"1"}}
{"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"[email protected]","city":"Brogan","state":"IL"}
{"index":{"_id":"6"}}
{"account_number":6,"balance":5686,"firstname":"Hattie","lastname":"Bond","age":36,"gender":"M","address":"671 Bristol Street","employer":"Netagy","email":"[email protected]","city":"Dante","state":"TN"}
{"index":{"_id":"13"}}
{"account_number":13,"balance":32838,"firstname":"Nanette","lastname":"Bates","age":28,"gender":"F","address":"789 Madison Street","employer":"Quility","email":"[email protected]","city":"Nogal","state":"VA"}

PowerShellでBulkAPIを利用する

利用するといっても適切なアドレスに送るだけである。

bulkapi.ps1
# BulkAPI利用
$Param = @{
  Uri = "http://remoteserver:9200/sample_bulk.2020-08/_bulk"
  InFile = "sample_bulk.json"
  Method = "Post"
  ContentType = "application/x-ndjson"
}
Invoke-RestMethod @Param

# リフレッシュ
Invoke-RestMethod -Uri "http://remoteserver:9200/sample_bulk.2020-08/_refresh" -Method Post
took errors items                                          
---- ------ -----                                          
  66  False {@{index=}, @{index=}, @{index=}, @{index=}...}

remoteserverのElasticsearchにあるインデックスsample_bulk.2020-08へデータ投入する。
InFileパラメータでJSONファイル(サンプルそのまま)を指定。
ContentTypeはapplication/x-ndjsonを指定する。(普通のJSON送信ではapplication/json)

実行するとすぐに完了する。構造浅くて要素少なめで項目1000件程度のファイルでは1秒かからない。「本当に入ったのか?」てビビるがちゃんと入っているので安心して欲しい。

戻り表示のerrorsFalseなのにハテナが浮かぶかもしれないが、これは見たまま「エラーはありません」の意味。
itemsプロパティの中には投入したデータがひとつひとつ入っている。
これら戻り値を使いたければInvoke-RestMethodの結果を変数に入れよう。

最後リフレッシュしているのは、投入後にすぐ cat API 呼んでも投入した結果が反映されていない場合があるため。普通に1件ずつ投入する場合とはElasticsearchが自分でリフレッシュするタイミングが異なるようだ。(調べてない)

通常の投入方法と比較する

サンプルのJSONをCSVに変換したものを用いて、通常の1件ずつ送る方法で書いてみる。
下のサンプルは一行目がヘッダー行になっている。

sample_csv.csv
"account_number","balance","firstname","lastname","age","gender","address","employer","email","city","state"
"1","39225","Amber","Duke","32","M","880 Holmes Lane","Pyrami","[email protected]","Brogan","IL"
"6","5686","Hattie","Bond","36","M","671 Bristol Street","Netagy","[email protected]","Dante","TN"
"13","32838","Nanette","Bates","28","F","789 Madison Street","Quility","[email protected]","Nogal","VA"

indexapi.ps1
Import-Csv -LiteralPath "sample_csv.csv" -Encoding Default |
ForEach-Object{
  $Input_JSON = $_ | ConvertTo-Json
  $Param = @{
    Uri = "http://remoteserver:9200/sample_csv.2020-08/"
    Method = "Post"
    ContentType = "application/json"
    Body = [system.text.encoding]::UTF8.GetBytes($Input_JSON)
  }
  Invoke-RestMethod @Param > $NULL
}
Invoke-RestMethod -Uri "http://remoteserver:9200/sample_csv.2020-08/_refresh" -Method Post

CSVインポートして行ごとにJSONに変換し1件ずつInvoke-RestMethodで投入。
localhostならこれでも秒で終わるのだが、リモートサーバへ1件ずつHTTP通信するのでは「そりゃ遅いだろ」て思う。
実際に比較すると下のようになる。

Measure-Commandで比較
Measure-Command{.\bulkapi.ps1}

Days              : 0
Hours             : 0
Minutes           : 0
Seconds           : 0
Milliseconds      : 422
Ticks             : 4221031
TotalDays         : 4.8854525462963E-06
TotalHours        : 0.000117250861111111
TotalMinutes      : 0.00703505166666667
TotalSeconds      : 0.4221031
TotalMilliseconds : 422.1031


Measure-Command{.\indexapi.ps1}

Days              : 0
Hours             : 0
Minutes           : 1
Seconds           : 6
Milliseconds      : 734
Ticks             : 667340938
TotalDays         : 0.000772385344907407
TotalHours        : 0.0185372482777778
TotalMinutes      : 1.11223489666667
TotalSeconds      : 66.7340938
TotalMilliseconds : 66734.0938

0.42秒 vs 66.73秒。勝負になってない…。
データが10万件あったら40秒 vs 1時間50分である。
これじゃBulkAPI以外使う気になれないね、と言いたいところだが、BulkAPIは「データをAPIが指定するJSON形式に加工しないと」使えないので、そこを一山超えなければならない。

CSV読み込んで整形してBulkAPIへ送信

Import-Csvで読んだ行をJSON化してつぎはぎして整えて投げればいいかな、とこんなのを書いた。が、エラーになる。

convertto-bulk.ps1(失敗)
$BulkList = New-Object System.Collections.ArrayList
$Bulk_Index = '{"index":{}}'

# CSV読み込んで指定のJSON形式に整形
Import-Csv -LiteralPath "sample_csv.csv" -Encoding Default |
ForEach-Object{
  $BulkList.Add($Bulk_Index) > $null
  $Bulk_Data = $_ | ConvertTo-Json -Compress
  $BulkList.Add($Bulk_Data) > $null
}
# 最後は改行で終わっていないとエラーになる
$BulkList.Add("`r`n")

# BulkAPIでデータ投入
$Param = @{
  Uri = "http://remoteserver:9200/sample_bulk.2020-08/_bulk"
  Body = $BulkList
  Method = "Post"
  ContentType = "application/x-ndjson"
}
Invoke-RestMethod @Param

# リフレッシュ
Invoke-RestMethod -Uri "http://remoteserver:9200/sample_bulk.2020-08/_refresh" -Method Post -ContentType "application/json"
Invoke-RestMethod : {"error":{"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: no requests added;"}],"type":"action_request_validation_exception","reason":"Vali
dation Failed: 1: no requests added;"},"status":400}
発生場所 行:17 文字:1
+ Invoke-RestMethod @Param
+ ~~~~~~~~~~~~~~~~~~~~~~~~
    + CategoryInfo          : InvalidOperation: (System.Net.HttpWebRequest:HttpWebRequest) [Invoke-RestMethod]、WebException
    + FullyQualifiedErrorId : WebCmdletWebResponseException,Microsoft.PowerShell.Commands.InvokeRestMethodCommand

action_request_validation_exceptionValidation Failed: 1: no requests added。来たデータ解釈できないぞ…みたいな?

部分ごとにあれこれ確認して、つぎはぎしたJSONは間違っていないことは確認できた。とすると間違っているのはInvoke-RestMethodの送り方だ。Bodyパラメータで送るとダメなのだろうか?

公式ドキュメントを読んでみたところ、「BodyパラメータはType:Object、InFileパラメータはType:String」との記述を発見した。これだ。
Invoke-Restmethod(MS公式)

Body指定でStringで送るようなパラメータ・オプションは見つけられなかったので、やむを得ず中間ファイルを生成しInFileパラメータで送ることにした。

convertto-bulk.ps1(成功)
$BulkList = New-Object System.Collections.ArrayList
$Bulk_Index = '{"index":{}}'

# CSV読み込んで指定のJSON形式に整形
Import-Csv -LiteralPath "sample_csv.csv" -Encoding Default |
ForEach-Object{
  $BulkList.Add($Bulk_Index) > $null
  $Bulk_Data = $_ | ConvertTo-Json -Compress
  $BulkList.Add($Bulk_Data) > $null
}
Out-File -InputObject $BulkList -LiteralPath "sample_temp.json" -Encoding default

# BulkAPIでデータ投入
$Param = @{
  Uri = "http://remoteserver:9200/sample_bulk.2020-08/_bulk"
  InFile = "sample_temp.json"
  Method = "Post"
  ContentType = "application/x-ndjson"
}
Invoke-RestMethod @Param

Invoke-RestMethod -Uri "http://remoteserver:9200/sample_bulk.2020-08/_refresh" -Method Post -ContentType "application/json"

これにて投入成功。
中間ファイルは終わったら削除してもいいかも。
ファイルを生成し書き込む分少し時間がかかるが、それ以上にデータ投入にかかる時間が短縮される。

調べてないこと

汚いデータはどうしよう

複数の区分のデータがまとめて書き込まれてるようなデータ。
中間データ生成の際に区分ごとにファイルを分けてしまえばいいだろうか?
全部Elasticsearchに投げて任せるのは…都合が良すぎる考えか。

リフレッシュのタイミング

通常のデータ投入だとデータ増加がリアルタイム確認できるのだけど、BulkAPIでまとめ投げした場合はそうでもないようだ。リフレッシュは手動でしたほうがいいのか、システム側に任せた方がいいのか?