【詳解】opensearch-jsのhelpersでsearch scrollとbulkのAPIを扱う

42385 ワード

こんにちは!

昨年から本業の方でOpenSearch全般の運用・保守を行っております。
最近はopensearch-js(OpenSearchのJSクライアントライブラリ)を扱ったドキュメントの出し入れを行う処理に何度か改善を行っていました。
その中で得た知見等をアウトプットしようと思います。

前置き

前提として、以下の実行環境とします。
※ (後述)一部例外あり

パッケージ version
AWS OpenSearch Service 1.0
opensearch-js 1.1.0

client.helpersを利用することによって得られるメリット

ElasticSearchもOpenSearchもそうですが、
一時的に読み書きリクエストが集中するとJVMメモリプッシャーが上がってしまいサーキットブレーカーが発動してしまいます。
これによってIOパフォーマンスが悪くなったり、429 Too Many Requestが返却されてread/writeのロックが掛かったりと、
付随するシステムに影響を及ぼしてしまいます。

そして、下記のように面倒な制御をclient.helpersが行ってくれます。

  • 共通
    • 429返却時に自動でretry(デフォルトは3回まで)してくれる
      • helpersを利用すること無く何も意識していないとResponse Errorがエスカレーションされてしまう
    • retry時すぐに再リクエストをするのでは無く、wait(デフォルトは5000ms)後に再リクエストしてくれる
  • client.helpers.scrollSearch
    • 取得し終えるまでの再帰関数をコーディングする必要が無い
      • またscrollの終端でclearScrollする手続き等面倒な制御が不要
      • client.searchを利用するよりもレスポンスの取り出しや付随する処理のコーディングが楽になる
  • client.helpers.bulk
    • 分割リクエストサイズや同時リクエスト数の制限を制御するのが簡単
    • 書き込みに失敗したクエリの取得も行える

search(scroll API含む) APIの扱い方について

helpersを使わない場合

scroll時のcallback関数を事細かく制御していくと、
もう少し複雑な制御になってくると思います。

await new Promise((resolve, reject) => {
  const results = [];
  client.search({
    index: 'hoge',
    size: 10,
    scroll: '1m',
    body: {
      query: { ...foo },
    },
  }, async function scroll(error, response){
    try {
      if (!response || !response.body.hits) {
        if(response.body._scroll_id) await client.clearScroll({ scroll_id: response.body._scroll_id });
        reject(error);
      }
      if (response.body.hits.hits.length <= 0) {
        await client.clearScroll({ scroll_id: response.body._scroll_id });
        resolve(results)
      }
      results.push(response.body.hits.hits);
      client.scroll(
        { scroll_id: String(response.body._scroll_id), scroll: '1m' },
        scroll
      );
    } catch(e) {
      await client.clearScroll({ scroll_id: response.body._scroll_id });
      reject(error);
    }
  });
});

helpersを使う場合

エラー時の手続きがスマートになります。

const scrollSearch = client.helpers.scrollSearch({
  index: 'hoge',
  size: 10,
  scroll: '1m',
  body: {
    query: { ...foo },
  },
}, { maxRetries: 3, wait: 5000 });

for await (const response of scrollSearch) {
  console.log(response)
}

helpers.scrollSearchを使うことによって既に考慮されている処理について以下で解説しています。

第二引数のoptionについて

key名 役割
maxRetries 429エラー返却時の最大リトライ回数
wait 429エラー発生時に再試行するまでのミリ秒数

responseを取り出した時点で

  • response.body.hitsが存在する
  • && response.body.hits.hits.length > 0

ことが確約されています。

以下より。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L122-L122

responseの取り出し方について

client.helpers.scrollSearch自体がジェネレーター関数となっており、
responseを順番に返却してくれます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L129-L129

※ そのため取り出すたびにリクエストが走ります。

scrollのclear方法について

ジェネレーター関数から終端のresponseを取り出した時点で自動でclearScrollを行ってくれます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L150-L150

以下はscrollをclearする際の処理。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L114-L120

また、終端のresponseを取り出すまでにscrollをclearしたい場合は下記のようにします。

for await (const response of scrollSearch) {
  if (cond) response.clear()
}

response.clear にclearScroll処理のiteratorが生えていることはこちらで確認できます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L126-L126

clearScrollの制御処理。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L114-L120

429 Too Many Request時の制御について

429エラーの時はwait(第二引数のoptionで指定したwait)後に再リクエストします。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L141-L142

それをretry(第二引数のoptionで指定したretry)回分までは再リクエストしてくれます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L135-L135

※ retry回分までに429が解消されない場合はResponse Errorがエスカレーションされます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L144-L146

Bulk APIの扱い方について

helpersを使わない場合

正常系の処理のみであればこういう感じになるかと思います。

const response = await client.bulk({
  index: "test",
  body: [
    { create: { _id: "sample-1" } },
    { user_id: 1, name: "foo" },

    { index : { _id : "sample-2" } },
    { user_id : 2, name: "hoge" },

    { delete : { _id : "sample-2" } },

    { update : { _id : "sample-1" } },
    { doc : { name : "Foo" } },

    { update : { _id : "sample-2" } },
    { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true }
  ]
});

加えて、429エラーが返却されないように、

  • リクエストサイズの調整
  • 同時リクエスト数の制限

等のチューニングが少々面倒だったりします。

また、OpenSearch側のリソースに余裕がある場合でも、
1リクエスト辺りの句数上限(max_clause_count)を気にしないといけなかったりします。

加えて、
どのクエリが失敗したのか、失敗したクエリだけ取り出したい
という制御処理が煩雑になってしまいがちです。

helpersを使う場合

前置き

元々、client.helpers.bulkでupdateオペレーションのscriptドキュメントが対応していなかったので、
使えるように修正をしてPRを出しました。