学習記録:ScalaはElasticSearch RestfulAPIを通じてscrollを使ってデータを検索する
3543 ワード
ローカルIDEデバッグでデータ処理タスクを実行する場合は、フル・クエリー・データをDataFrameに変換して操作する必要がありますが、ElasticSearchではデフォルトで1回のクエリーの最大エントリ数が10000と設定されているため、フル・データを一度にクエリーすることはできません.資料をクエリーすると、oracleのカーソルクエリーと同様にscroll(スクロール)クエリーを使用してすべてのデータを取得できることがわかります.
ScrollクエリはkibanaのDevToolsで次のように使用されます.
具体的なscalaコードは以下の通りである(JSON解析関連APIを使用しており、詳細な説明は前編を参照).
ScrollクエリはkibanaのDevToolsで次のように使用されます.
// scroll=1m scroll 1
// 1 scroll_id
GET /sqxzcf/_search?scroll=1m
{
"query": {
"match_all": {}
},
"size": 10000,
"sort": [
"_doc"
]
}
// scroll scroll_id ,
GET /_search/scroll
{
"scroll" : "1m",
"scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAQ1SFktGR1M1NlJjVHRpakQwSTVYRjlIREEAAAAAAAgm5RZBN09RZHlZWFJ5R3dhVlpYS2JSMktBAAAAAAAIT80WMWI1THcyT1BTeFNPN0ZwS3pYZy02UQAAAAAACCbmFkE3T1FkeVlYUnlHd2FWWlhLYlIyS0EAAAAAAAEg9RZUemc0dDNFZFF0dWVMaWNnSWxRcUV3"
}
具体的なscalaコードは以下の通りである(JSON解析関連APIを使用しており、詳細な説明は前編を参照).
// main , JSON
import net.minidev.json.parser.JSONParser
import net.minidev.json.{JSONArray, JSONObject}
// Object
var scrollId: String = ""
var hitSize: Int = 0
var totalDataFrame: DataFrame = null
var response: String = Utils.postBody(
url = s"${esProps.getProperty("es.rest")}/bi_enterprise/_search?scroll=1m",
body = "{\"query\": {\"match_all\": {}},\"size\": 10000,\"sort\": [\"_doc\"]}",
encoding = "utf-8",
printResponse = true)
// json-smart JSON
val jsonParser = new JSONParser(-1)
val jsonObj: JSONObject = jsonParser.parse(response).asInstanceOf[JSONObject]
// _scroll_id
scrollId = jsonObj.get("_scroll_id").toString
// DataFrame
val firstDf: DataFrame = parseJSONObjectToDataFrame(jsonObj, jsonParser, spark)
totalDataFrame = firstDf
//
while (hitSize != 0) {
var responseNext: String = Utils.postBody(
url = s"${esProps.getProperty("es.rest")}/_search/scroll",
body = "{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId + "\"}",
encoding = "utf-8",
printResponse = true)
val jsonObjNext: JSONObject = jsonParser.parse(responseNext).asInstanceOf[JSONObject]
// _scroll_id scrollId
scrollId = jsonObj.get("_scroll_id").toString
val nextDf: DataFrame = parseJSONObjectToDataFrame(jsonObjNext, jsonParser, spark)
if (nextDf.count() > 0L) {
//
totalDataFrame = totalDataFrame.union(nextDf)
}
}
.......
// JSONObject DataFrame
def parseJSONObjectToDataFrame(jsonObj: JSONObject, jsonParser: JSONParser, spark: SparkSession): DataFrame = {
// hits
val hits = jsonObj.get("hits").toString
val jsonObjHits: JSONObject = jsonParser.parse(hits).asInstanceOf[JSONObject]
// hits
val hitsArray = jsonObjHits.get("hits").toString
val jsonObjSource: JSONArray = jsonParser.parse(hitsArray).asInstanceOf[JSONArray]
// hitSize,hitSize scroll
hitSize = jsonObjSource.size()
var strList = List.empty[String]
var array = jsonObjSource.toArray()
// hits _source List[String]
for (i
e.printStackTrace(System.out)
} finally try
httpClient.close()
catch {
case e: IOException =>
e.printStackTrace(System.out)
}
re
}