学習記録:ScalaはElasticSearch RestfulAPIを通じてscrollを使ってデータを検索する

3543 ワード

ローカルIDEデバッグでデータ処理タスクを実行する場合は、フル・クエリー・データをDataFrameに変換して操作する必要がありますが、ElasticSearchではデフォルトで1回のクエリーの最大エントリ数が10000と設定されているため、フル・データを一度にクエリーすることはできません.資料をクエリーすると、oracleのカーソルクエリーと同様にscroll(スクロール)クエリーを使用してすべてのデータを取得できることがわかります.
ScrollクエリはkibanaのDevToolsで次のように使用されます.
//   scroll=1m  scroll        1  
//     1             scroll_id        
GET /sqxzcf/_search?scroll=1m
{
  "query": {
    "match_all": {}
  },
  "size": 10000,
  "sort": [
    "_doc"
  ]
}

//               scroll scroll_id    ,                
GET /_search/scroll
{
  "scroll" : "1m",
  "scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAQ1SFktGR1M1NlJjVHRpakQwSTVYRjlIREEAAAAAAAgm5RZBN09RZHlZWFJ5R3dhVlpYS2JSMktBAAAAAAAIT80WMWI1THcyT1BTeFNPN0ZwS3pYZy02UQAAAAAACCbmFkE3T1FkeVlYUnlHd2FWWlhLYlIyS0EAAAAAAAEg9RZUemc0dDNFZFF0dWVMaWNnSWxRcUV3"
}

具体的なscalaコードは以下の通りである(JSON解析関連APIを使用しており、詳細な説明は前編を参照).
//       main       ,  JSON       
import net.minidev.json.parser.JSONParser
import net.minidev.json.{JSONArray, JSONObject}
//   Object    
var scrollId: String = ""
var hitSize: Int = 0
var totalDataFrame: DataFrame = null

var response: String = Utils.postBody(
      url = s"${esProps.getProperty("es.rest")}/bi_enterprise/_search?scroll=1m",
      body = "{\"query\": {\"match_all\": {}},\"size\": 10000,\"sort\": [\"_doc\"]}",
      encoding = "utf-8",
      printResponse = true)
//   json-smart     JSON   
val jsonParser = new JSONParser(-1)
val jsonObj: JSONObject = jsonParser.parse(response).asInstanceOf[JSONObject]
//   _scroll_id         
scrollId = jsonObj.get("_scroll_id").toString
//                  DataFrame
val firstDf: DataFrame = parseJSONObjectToDataFrame(jsonObj, jsonParser, spark)
totalDataFrame = firstDf
//                     
while (hitSize != 0) {
    var responseNext: String = Utils.postBody(
            url = s"${esProps.getProperty("es.rest")}/_search/scroll",
            body = "{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId + "\"}",
            encoding = "utf-8",
            printResponse = true)
    val jsonObjNext: JSONObject = jsonParser.parse(responseNext).asInstanceOf[JSONObject]
    //   _scroll_id   scrollId           
    scrollId = jsonObj.get("_scroll_id").toString
    val nextDf: DataFrame = parseJSONObjectToDataFrame(jsonObjNext, jsonParser, spark)
    if (nextDf.count() > 0L) {
        //           
        totalDataFrame = totalDataFrame.union(nextDf)
    }
}

.......

//  JSONObject     DataFrame
def parseJSONObjectToDataFrame(jsonObj: JSONObject, jsonParser: JSONParser, spark: SparkSession): DataFrame = {
    //     hits  
    val hits = jsonObj.get("hits").toString
    val jsonObjHits: JSONObject = jsonParser.parse(hits).asInstanceOf[JSONObject]
    //     hits  
    val hitsArray = jsonObjHits.get("hits").toString
    val jsonObjSource: JSONArray = jsonParser.parse(hitsArray).asInstanceOf[JSONArray]
    //   hitSize,hitSize            scroll  
    hitSize = jsonObjSource.size()
    var strList = List.empty[String]
    var array = jsonObjSource.toArray()
    //     hits  _source      List[String]
    for (i 
        e.printStackTrace(System.out)
    } finally try
      httpClient.close()
    catch {
      case e: IOException =>
        e.printStackTrace(System.out)
    }
    re
  }