Elasticsearch high level API使用例


        
        
            org.elasticsearch.client
            elasticsearch-rest-high-level-client
            7.0.0
        
        
            org.elasticsearch
            elasticsearch
            7.0.0
        
        
            org.elasticsearch.client
            transport
            7.0.0
        
        
            org.elasticsearch.plugin
            transport-netty4-client
            7.0.0
        
  

import org.elasticsearch.action.search.SearchResponse;

import java.util.List;
import java.util.Map;

/**
 * @Author: gh
 * @Description: elasticsearch    (  )、    、    、    、      、           ;
 * es    ,          。           。         。
 */
public interface EsDao {

    /**
     *     
     * @param index     
     * @param pageSize     
     * @param pageNum    
     * @return
     */
    public SearchResponse getAllRowsBySearchAfter(String index, Integer pageSize, Integer pageNum);
    public SearchResponse getAllRowsByFromSize(String index, Integer pageSize, Integer pageNum);
    public List> getAllRowsByScroll(String index);
    /**
     *     index  document  。
     * count API   :GET /index_name/type_name/_count
     * @param indexes        index
     * @return
     */
    public long totalCount(String...indexes);
    /**
     *     document    。
     * @param index     
     * @param id _id        。
     * @return true  ,false   。
     */
    public boolean docIsExist(String index,String id);
    /**
     *     document。
     * @param index     
     * @param kvs       (key)     (value)
     * @return 200/201      ;   400      。
     */
    public int insertDoc(String index,Map kvs);

    /**
     *     document。
     * @param index     
     * @param id _id        。
     * @return 200/201      ;   400      。
     */
    public int deleteDoc(String index,String id);

    /**
     *     document
     * @param index     
     * @param id _id        。
     * @param kvs       (key)     (value)
     * @return 200/201      ;   400      。
     */
    public int updateDoc(String index,String id,Map kvs);
    /**
     * es    = cluster store(size_in_bytes) / (            + size_in_bytes)
     * size_in_bytes: "_all":{"total":{"store":{"size_in_bytes": ***}}}
     *            :(    )"fs":{"total":{"available_in_bytes":***}}
     * @return
     */
    public double usedRate();
    /**
     *     cluster    index    。
     *   :_cluster/state/routing_table
     * @param clusterName     
     * @return
     */
    public Map> getClusterIndexes(String clusterName);

    /**
     *     index    type    。
     * _type   elasticsearch 6.0.0      ,7.0       。
     * @return
     */
    public Map> getIndexTypes(String clusterName);
    /**
     * @return             
     */
    public double storeSizeOfMB();
    /**
     * @param index          
     * @return (     )          
     */
    public double storeSizeOfMB(String index);
    /**
     *   :1.      (      )   index(    )   。     elasticsearch。
     *   :2.    index(      )   type(    )   。
     * @return     
     */
    public int countTables();

    /**
     *       (      )    indice(    )     。
     * @return
     */
    public List getTablenamesOfDB();
    /**
     *            
     * @param indexName    
     * @param fieldName    
     * @param fieldValue      
     */
    public SearchResponse queryByRandomField(String indexName,String fieldName,String fieldValue,
                                             int pageSize,int pageNum);

    /**
     * @param indexName    
     * @param fieldName    
     * @param fieldValue      
     * @return             
     */
    public long totalCountOfFuzzyQuery(String indexName,String fieldName,String fieldValue);
    /**
     *              
     * @param indexName      。  :table1
     * @return map k:   ,v:    
     */
    public Map getColumnNames(String indexName);
}
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genius.pojo.pg.dto.DataBaseDTO;
import com.genius.util.StringUtil;
import com.genius.util.common.FileSizeUtil;
import com.genius.util.common.SizeUnitEnum;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.geo.builders.CoordinatesBuilder;
import org.elasticsearch.common.geo.builders.GeometryCollectionBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.*;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.locationtech.jts.geom.Coordinate;
import org.springframework.util.StringUtils;

import java.io.*;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @Description:   es JDBC          (  )、    、    、    、      。
 * 【  】
 * 1.Elasticsearch JDBC          ;             。
 * 2.  Elasticsearch x-pack-sql-jdbc     :
 * java.sql.SQLInvalidAuthorizationSpecException:
 * current license is non-compliant for [jdbc]
 *   JDBC   ,elasticsearch        :https://www.elastic.co/cn/subscriptions
 */
public class EsDaoImpl implements EsDao{

    RestHighLevelClient restClient = null;
    DataBaseDTO dataBaseDTO = null;

    public EsDaoImpl(DataBaseDTO dbd) {
        try {
            dataBaseDTO = dbd;
            this.restClient = connectToES();
            //initClient();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public RestHighLevelClient getRestClient() {
        return restClient;
    }
    public void setRestClient(RestHighLevelClient restClient) {
        this.restClient = restClient;
    }
    /**
     *     
     */
    public void close(){
        try{
            if(this.restClient != null){
                restClient.close();
            }
        }catch(IOException e){
            e.printStackTrace();
        }
    }
    public boolean connected(){
        try {
            if(getRestClient() != null && getRestClient().ping(RequestOptions.DEFAULT)){
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * JDBC  elasticsearch。http port=9200
     * jdbc:es://[[http|https]://][host[:port]]/[prefix] 1){
                String scrollId = resp.getScrollId();
                for(int i=1;i> getAllRowsByScroll(String indexName, String column, String value) {
        List> collect = new ArrayList<>();
        final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(60));
        SearchResponse resp = null;
        SearchRequest search = new SearchRequest(indexName);
        search.scroll(scroll);

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();


        try {
            setQuerySearchBuilder(sourceBuilder,indexName,column,value);
            sourceBuilder.size(100);
            sourceBuilder.sort("_id", SortOrder.ASC);
            search.source(sourceBuilder)
                    .searchType(SearchType.QUERY_THEN_FETCH)
                    .scroll(TimeValue.timeValueSeconds(60));
            resp = restClient.search(search, RequestOptions.DEFAULT);
            assert resp != null;
            String scrollId;
            int count = 0;
            do {
                count++;
                System.out.println("=================="+count);
                collect.addAll(Arrays.stream(resp.getHits().getHits()).map(m->{
                    Map oneRowData = m.getSourceAsMap();  //sourceAsMap    null
                    if(oneRowData != null){
                        oneRowData.put("_id", m.getId());
                        //oneRowData.put("_type", hit.getType());
                    }
                    return oneRowData;
                }).collect(Collectors.toList()));
                scrollId = resp.getScrollId();
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                resp = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            } while (resp.getHits().getHits().length != 0);
            //    
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            //     setScrollIds()   scrollId    
            clearScrollRequest.addScrollId(scrollId);
            restClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            if (collect.size() == 0 || collect == null) {
                return  null;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return collect;
    }

    @Override
    public   List>  getAllRowsByScroll(String index) {
        List> collect = new ArrayList<>();
        final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(60));
        SearchResponse resp = null;
        SearchRequest search = new SearchRequest(index);
        search.scroll(scroll);
        try {
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(100);
            sourceBuilder.sort("_id", SortOrder.ASC);
            search.source(sourceBuilder);
            resp = restClient.search(search, RequestOptions.DEFAULT);
            assert resp != null;
            String scrollId;
            int count = 0;
            do {

                Arrays.stream(resp.getHits().getHits()).forEach(hit->{
                    Map map=hit.getSourceAsMap();
                    map.put("_id",hit.getId());
                    collect.add(map);
                });
                scrollId = resp.getScrollId();
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                resp = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            } while (resp.getHits().getHits().length != 0);
            //    
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            //     setScrollIds()   scrollId    
            clearScrollRequest.addScrollId(scrollId);
            restClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            if (collect.size() == 0 || collect == null) {
                return  null;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return collect;
    }

    @Override
    public SearchResponse getAllRows(String index) {
        //           ,     :IndexNotFoundException: no such index
        SearchResponse resp = null;
        SearchRequest search = new SearchRequest(index);
        try {
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            search.source(sourceBuilder);
            resp = restClient.search(search, RequestOptions.DEFAULT);

        } catch (IOException e) {
            e.printStackTrace();
        }
        return resp;
    }
    @Override
    public SearchResponse getAllRowsByFromSize(String index, Integer pageSize, Integer pageNum) {
        if(pageSize==null || pageSize<1){
            pageSize = 10;
        }
        if(pageNum==null || pageNum<1){
            pageNum = 1;
        }
        //           ,     :IndexNotFoundException: no such index
        SearchResponse resp = null;
        SearchRequest search = new SearchRequest(index);
        try {
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            //    
            sourceBuilder.from(pageSize*pageNum-pageSize);
            sourceBuilder.size(pageSize);
            sourceBuilder.sort("_id", SortOrder.ASC);
            search.source(sourceBuilder);
            resp = restClient.search(search, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //    TransportClient   API.
        /*if(indexIsExist(index)) {
            resp = client.prepareSearch(index)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setFrom(pageSize*pageNum-pageSize).setSize(pageSize)
                    //     ,           
                    .addSort("_id", SortOrder.ASC)
                    //.setPostFilter(QueryBuilders.rangeQuery("doc.offset").from(7000).to(10000))
                    .get();
        }*/
        return resp;
    }
    @Override
    public SearchResponse getAllRowsBySearchAfter(String index, Integer pageSize, Integer pageNum) {
        if(pageSize==null || pageSize<1){
            pageSize = 10;
        }
        if(pageNum==null || pageNum<1){
            pageNum = 1;
        }
        //============NOTE: API   ---  high level API
        SearchResponse resp = null;
        Object[] sortValues = null;
        int counter = 0;
        try {
            //TODO:  -pageNum  ,     !
            do{
                //  :      
                counter += 1;
                SearchRequest search = new SearchRequest(index);
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                //    
                sourceBuilder.size(pageSize);
                sourceBuilder.sort("_id", SortOrder.ASC);
                //      ,  search_after  。
                if(sortValues != null){
                    sourceBuilder.searchAfter(sortValues);
                }
                search.source(sourceBuilder);
                resp = restClient.search(search, RequestOptions.DEFAULT);
                SearchHits hits = resp.getHits();
                //        ,hitSize = 0
                int hitSize= hits.getHits().length;
                if(hitSize == 0){
                    break;
                }
                SearchHit hit = hits.getHits()[hitSize - 1];
                sortValues = hit.getSortValues();
            }while(counter < pageNum);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return resp;
        //============NOTE: API   ---  low level API
        /*Object tiebreaker  = null; //          ID
        String jsonParam = null;
        try {
            do{
                if(tiebreaker == null){
                    jsonParam = "{\"size\": "+pageSize+
                            ",\"sort\": [{\"_id\": \"desc\"}]}";
                }else{
                    jsonParam = "{\"size\": "+pageSize+"," +
                            "\"search_after\":"+tiebreaker+","+
                            "\"sort\": [{\"_id\": \"desc\"}]}";
                }
                //search_after  
                Request req = new Request("get", index+"/_search");
                req.setJsonEntity(jsonParam);
                RestClient client = restClient.getLowLevelClient();
                Response resp = client.performRequest(req);
                HttpEntity entity = resp.getEntity();
                if(entity != null){
                    InputStream content = entity.getContent();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(content));
                    String line = null;
                    StringBuffer sb = new StringBuffer();
                    while((line = reader.readLine())!=null){
                        sb.append(line);
                    }
                    JSONObject jo = JSON.parseObject(sb.toString());
                    JSONArray jsonArray = jo.getJSONObject("hits").getJSONArray("hits");
                    int dataSize = jsonArray.size();
                    if(dataSize > 0){
                       *//* XContentParser parser = xContentType.xContent().createParser(NamedXContentRegistry.EMPTY,
                                DeprecationHandler.THROW_UNSUPPORTED_OPERATION, jsonArray.toJSONString());*//*
                        //        
                        Object lastResult = jsonArray.get(jsonArray.size() - 1);
                        if(lastResult instanceof  JSONObject){
                            tiebreaker  = ((JSONObject) lastResult).getJSONArray("sort");
                        }
                    }else{
                        break;
                    }
                }else{
                    break;
                }
            }while(true);

        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;*/
    }
    public Map extractResponse(SearchResponse resp){
        Map rowDatas = new HashMap<>();
        List> data = new ArrayList<>();
        Set fields = new HashSet<>();
        if(resp != null){
            SearchHits hits = resp.getHits();
            Iterator ite = hits.iterator();
            while(ite.hasNext()){
                SearchHit hit = ite.next();
                //sourceAsMap    null
                Map oneRowData = hit.getSourceAsMap();
                if(oneRowData != null){
                    oneRowData.put("_id", hit.getId());
                    //oneRowData.put("_type", hit.getType());
                    //[NOTE:]                   
                    //[NOTE:] routing   ,      
                }
                fields.addAll(oneRowData.keySet());
                data.add(oneRowData);
            }
        }
        rowDatas.put("data", data);
        rowDatas.put("fields", fields);
        rowDatas.put("pk", "_id");
        return rowDatas;
    }
    private long getMaxresult(String index){
        //ClusterGetSettingsRequest cgsr = new ClusterGetSettingsRequest();
        long maxResult = 10000;
        try {
            Request req = new Request("get", index+"/_settings");
            RestClient client = restClient.getLowLevelClient();
            Response resp = client.performRequest(req);
            HttpEntity entity = resp.getEntity();
            if(entity != null){
                InputStream content = entity.getContent();
                BufferedReader reader = new BufferedReader(new InputStreamReader(content));
                String line = null;
                StringBuffer sb = new StringBuffer();
                while((line = reader.readLine())!=null){
                    sb.append(line);
                }
                JSONObject jo = JSON.parseObject(sb.toString());
                //    
                JSONObject settingObj = jo.getJSONObject(index)
                        .getJSONObject("settings")
                        .getJSONObject("index");
                String value = settingObj.getString("max_result_window");
                if(value == null){
                    return maxResult; //    10000
                }
                maxResult =   Long.valueOf(value);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return maxResult;
    }
    /**
     *         
     * @param index     
     * @return boolean
     */
    private boolean isIndexExist(String index){
        try{
            if(!StringUtils.isEmpty(index)){
                GetIndexRequest gir = new GetIndexRequest(index);
                return  restClient.indices().exists(gir, RequestOptions.DEFAULT);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
        //    TransportClient   API.
        //BasicConfigurator.configure();
        /*AdminClient admin = client.admin();
        IndicesAdminClient indices = admin.indices();
        IndicesExistsRequestBuilder ierb = indices.prepareExists(index);
        ActionFuture future = ierb.execute();
        IndicesExistsResponse resp = future.actionGet();
        if(resp.isExists()){
            System.out.format("index [%s] is exist...
", index); return true; }else { System.out.format("index [%s] is NOT exist...
", index); return false; }*/ return false; } // , 。 @Override public int insertDoc(String index,Map jsonMap) { try { if(jsonMap != null){ // Map columns = getColumnNames(index); Set keys = jsonMap.keySet(); for (String key : keys) { EsFieldType eft = EsFieldType.GEO_POINT; if(eft.getType().equals(columns.get(key))){ // , Object transferedField = eft.getTransferedField(jsonMap.get(key).toString()); if(transferedField==null){ return 0; } jsonMap.put(key, transferedField); } } // id ,id 。 IndexRequest indexRequest = new IndexRequest(index).source(jsonMap); //opType must be 'create' or 'index'. // optype=index , ID document , 。 //NOTE: id , ; id 。 indexRequest.opType(DocWriteRequest.OpType.INDEX); //indexRequest.timeout(TimeValue.timeValueSeconds(1)); // , indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); /* * , : 。 *Elasticsearch exception [type=mapper_parsing_exception, * reason=failed to parse field [location] of type [geo_point]]] */ IndexResponse indexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT); int result = indexResponse.status().getStatus(); if(result==RestStatus.OK.getStatus() || result == RestStatus.CREATED.getStatus()){ return 1; // } } } catch (Exception e) { e.printStackTrace(); } return 0; } @Override public int deleteDoc(String index, String id) { try { DeleteRequest delRequest = new DeleteRequest(index,id); // routing, routing String routing = getRouting(index, id); if(routing != null){ delRequest.routing(routing); } // , delRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); DeleteResponse delResp = restClient.delete(delRequest, RequestOptions.DEFAULT); int result = delResp.status().getStatus(); if(result==RestStatus.OK.getStatus() || result == RestStatus.CREATED.getStatus()){ return 1; // } } catch (IOException e) { e.printStackTrace(); } return 0;// } @Override public int updateDoc(String index, String id, Map kvs) { try { // Map columns = getColumnNames(index); Set keys = kvs.keySet(); for (String key : keys) { EsFieldType eft = EsFieldType.GEO_POINT; if(eft.getType().equals(columns.get(key))){ // , Object transferedField = eft.getTransferedField(kvs.get(key).toString()); if(transferedField==null){ return 0; } kvs.put(key, transferedField); } } UpdateRequest updateRequest = new UpdateRequest(index, id); /* * :ElasticsearchStatusException[Elasticsearch exception [type=document_missing_exception * routing, routing */ String routing = getRouting(index, id); if(routing != null){ updateRequest.routing(routing); } updateRequest.doc(kvs); //NOTE: document, , 。 UpdateResponse updateResp = restClient.update(updateRequest, RequestOptions.DEFAULT); // , updateResp.setForcedRefresh(true); int result = updateResp.status().getStatus(); if(result==RestStatus.OK.getStatus() || result == RestStatus.CREATED.getStatus()){ return 1; // } } catch (Exception e) { e.printStackTrace(); } return 0; } @Override public boolean docIsExist(String index, String id) { boolean flag = false; try { GetRequest getRequest = new GetRequest(index, id); GetResponse getResp = restClient.get(getRequest, RequestOptions.DEFAULT); flag = getResp.isExists(); } catch (IOException e) { e.printStackTrace(); }finally { return flag; } } @Override public long totalCount(String...indexes) { long countNum = 0; try { // , search 。 CountRequest countRequest = new CountRequest(indexes); CountResponse countResp = restClient.count(countRequest, RequestOptions.DEFAULT); countNum = countResp.getCount(); } catch (IOException e) { e.printStackTrace(); }finally { return countNum; } } @Override public double usedRate() { //es = cluster store(size_in_bytes) / ( + size_in_bytes) double rate = 0.0; try { Request req = new Request("get", "_cluster/stats"); RestClient client = restClient.getLowLevelClient(); Response resp = client.performRequest(req); HttpEntity entity = resp.getEntity(); if(entity != null){ InputStream content = entity.getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader(content)); String line = null; StringBuffer sb = new StringBuffer(); while((line = reader.readLine())!=null){ sb.append(line); } JSONObject jo = JSON.parseObject(sb.toString()); // ( index) long totalIndexSizes = jo.getJSONObject("indices") .getJSONObject("store") .getLongValue("size_in_bytes"); // long totalAvailableFSSizes = jo.getJSONObject("nodes") .getJSONObject("fs") .getLongValue("available_in_bytes"); System.out.println(totalIndexSizes+"==============="+totalAvailableFSSizes); rate = (double)totalIndexSizes / (totalAvailableFSSizes + totalIndexSizes); } } catch (IOException e) { e.printStackTrace(); }finally { return Double.parseDouble(String.format("%.2f",rate*100)); } } @Override public double storeSizeOfDB(SizeUnitEnum unit) { return storeSizeOfDB(null,unit); } @Override public double storeSizeOfDB(String index,SizeUnitEnum unit) { try { Request req = new Request("get", "_stats"); RestClient client = restClient.getLowLevelClient(); Response resp = client.performRequest(req); HttpEntity entity = resp.getEntity(); if(entity != null){ InputStream content = entity.getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader(content)); String line = null; StringBuffer sb = new StringBuffer(); while((line = reader.readLine())!=null){ sb.append(line); } JSONObject jo = JSON.parseObject(sb.toString()); if(StringUtils.isEmpty(index)){ // ( index) long bytes = jo.getJSONObject("_all") .getJSONObject("total") .getJSONObject("store") .getLongValue("size_in_bytes"); return FileSizeUtil.valueOf((double)bytes, unit); }else{ // if(isIndexExist(index)){ long bytes = jo.getJSONObject("indices") .getJSONObject(index) .getJSONObject("total") .getJSONObject("store") .getLongValue("size_in_bytes"); return FileSizeUtil.valueOf((double)bytes, unit); } } } } catch (Exception e) { e.printStackTrace(); } return 0.0; } @Override public double storeSizeOfTbl(String[] indices, SizeUnitEnum unit) { try { if(indices != null ){ Request req = new Request("get", "_stats"); RestClient client = restClient.getLowLevelClient(); Response resp = client.performRequest(req); HttpEntity entity = resp.getEntity(); if(entity != null){ InputStream content = entity.getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader(content)); String line = null; StringBuffer sb = new StringBuffer(); while((line = reader.readLine())!=null){ sb.append(line); } JSONObject jo = JSON.parseObject(sb.toString()); JSONObject indiceJO = jo.getJSONObject("indices"); // long bytes = 0L; for (String index : indices) { // //if(isIndexExist(index)){ } if(indiceJO.get(index) != null){ bytes += indiceJO .getJSONObject(index) .getJSONObject("total") .getJSONObject("store") .getLongValue("size_in_bytes"); } } return FileSizeUtil.valueOf((double)bytes, unit); } } } catch (Exception e) { e.printStackTrace(); } return 0.0; } @Override public int countTables() { // :elasticsearch return countIndices("elasticsearch"); } private int countIndices(String clusterName){ //ClusterStatsRequest req = new ClusterStatsRequest(); //Request req = new Request("get", "_cluster/stats"); return getTablenamesOfDB().size(); } @Override public List getTablenamesOfDB() { List nameList = new ArrayList<>(); List filteredNameList = new ArrayList<>(); try { Request req = new Request("get", "_stats"); Response resp = restClient.getLowLevelClient().performRequest(req); HttpEntity entity = resp.getEntity(); if(entity != null){ InputStream content = entity.getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader(content)); String line = null; StringBuffer sb = new StringBuffer(); while((line = reader.readLine())!=null){ sb.append(line); } JSONObject jo = JSON.parseObject(sb.toString()); // indices JSONObject indices = jo.getJSONObject("indices"); nameList.addAll(indices.keySet()); } // index for (String idx : nameList) { if(!idx.startsWith(".")){ filteredNameList.add(idx); } } } catch (Exception e) { e.printStackTrace(); } return filteredNameList; } @Override public SearchResponse queryByRandomField(String indexName, String fieldName, String fieldValue, int pageSize,int pageNum) { SearchResponse resp = null; try { /* SearchRequest search = new SearchRequest(indexName); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // sourceBuilder.from(pageSize*pageNum-pageSize); sourceBuilder.size(pageSize); sourceBuilder.sort("_id", SortOrder.ASC); setQuerySearchBuilder(sourceBuilder,indexName,fieldName,fieldValue); search.source(sourceBuilder); resp = restClient.search(search, RequestOptions.DEFAULT);*/ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); setQuerySearchBuilder(sourceBuilder,indexName,fieldName,fieldValue); resp = getAllRowsByScrollScan(sourceBuilder, indexName, pageSize, pageNum); }catch(Exception e){ e.printStackTrace(); } return resp; } private void setQuerySearchBuilder(SearchSourceBuilder sourceBuilder, String indexName, String fieldName, String fieldValue) throws Exception{ String[]numeric = {"long","integer","short","byte","double","float","half_float","scaled_float"}; List numericTypes = Arrays.asList(numeric); // String fieldType = getFieldType(indexName, fieldName); if("text".equals(fieldType) || "keyword".equals(fieldType)){ // * wildcardQuery , // :fuzzy,wildcard: text,keyword 。 //text : fuzzy 。keyword , fuzzy 。 sourceBuilder.query(QueryBuilders.wildcardQuery(fieldName,"*" + fieldValue + "*")); //sourceBuilder.query(QueryBuilders.fuzzyQuery(fieldName, fieldValue).fuzziness(Fuzziness.AUTO)); //sourceBuilder.query(QueryBuilders.matchQuery(fieldName, fieldValue).fuzziness(Fuzziness.AUTO)); }else if(numericTypes.contains(fieldType)){ if(StringUtil.isNumeric(fieldValue)){ sourceBuilder.query(QueryBuilders.rangeQuery(fieldName).gte(fieldValue)); } }else if("geo_point".equals(fieldType)){ //Geo fields do not support exact searching, use dedicated geo queries instead if(fieldValue != null){ String[] locations = fieldValue.split(","); if(locations.length == 4){ double top = StringUtil.isNumeric(locations[0].trim())?Double.valueOf(locations[0].trim()):90; double left = StringUtil.isNumeric(locations[1].trim())?Double.valueOf(locations[1].trim()): -180; double bottom = StringUtil.isNumeric(locations[2].trim())?Double.valueOf(locations[2].trim()) :-90; double right = StringUtil.isNumeric(locations[3].trim())?Double.valueOf(locations[3].trim()): 180; sourceBuilder.query(QueryBuilders.geoBoundingBoxQuery(fieldName) .setCorners(top, left, bottom, right)); } } }else if("geo_shape".equals(fieldType)){ //Geo fields do not support exact searching, use dedicated geo queries instead if(fieldValue != null){ String[] locations = fieldValue.split(","); if(locations.length == 4){ double top = StringUtil.isNumeric(locations[0].trim())?Double.valueOf(locations[0].trim()):90; double left = StringUtil.isNumeric(locations[1].trim())?Double.valueOf(locations[1].trim()): -180; double bottom = StringUtil.isNumeric(locations[2].trim())?Double.valueOf(locations[2].trim()) :-90; double right = StringUtil.isNumeric(locations[3].trim())?Double.valueOf(locations[3].trim()): 180; List coordinates = new CoordinatesBuilder().coordinate(left, top) .coordinate(right, bottom).build(); GeometryCollectionBuilder gcb = new GeometryCollectionBuilder(); gcb.coordinates(coordinates); sourceBuilder.query(QueryBuilders.geoWithinQuery(fieldName, gcb)); } } }else{ sourceBuilder.query(QueryBuilders.matchQuery(fieldName, fieldValue)); } } @Override public long totalCountOfFuzzyQuery(String indexName, String fieldName, String fieldValue) { long counter = 0; try { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // :fuzzy,wildcard setQuerySearchBuilder(sourceBuilder,indexName,fieldName,fieldValue); // , search 。 CountRequest countRequest = new CountRequest(new String[]{indexName},sourceBuilder); CountResponse countResponse = restClient.count(countRequest, RequestOptions.DEFAULT); counter = countResponse.getCount(); } catch (Exception e) { e.printStackTrace(); } return counter; } // private String getFieldType(String indice,String fieldName)throws IOException{ Map mappings = getMappingInfo(indice); Map source = mappings.get(indice).getSourceAsMap(); Object properties = source.get("properties"); if(properties instanceof LinkedHashMap){ LinkedHashMap map = (LinkedHashMap)properties; Object field = map.get(fieldName); if(field instanceof LinkedHashMap){ LinkedHashMap fieldMap = (LinkedHashMap)field; String type = fieldMap.get("type").toString(); return type; } } return null; } /** * mapping * @param indice * @return * @throws IOException */ public Map getMappingInfo(String indice) throws IOException{ GetMappingsRequest gmr = new GetMappingsRequest(); gmr.indices(indice); GetMappingsResponse resp = restClient.indices() .getMapping(gmr, RequestOptions.DEFAULT); Map mappings = resp.mappings(); return mappings; } @Override public Map getColumnNames(String indexName) { Map columnNames = new HashMap<>(); GetMappingsRequest mappingsRequest = new GetMappingsRequest().indices(indexName); try { GetMappingsResponse mappingsResponse = restClient.indices() .getMapping(mappingsRequest, RequestOptions.DEFAULT); Map mappings = mappingsResponse.mappings(); if(mappings != null){ MappingMetaData metaData = mappings.get(indexName); if(metaData != null){ Map sourceAsMap = metaData.getSourceAsMap();//properties if(sourceAsMap != null){ Collection collection = sourceAsMap.values();//Object = map Map tmp = new HashMap<>(); Iterator ite = collection.iterator(); while (ite.hasNext()){ tmp.putAll((Map)ite.next()); } Set fields = tmp.keySet(); // for (String field : fields) { Map fieldMap = (Map)tmp.get(field); columnNames.put(field, fieldMap.get("type")); } } } } } catch (IOException e) { e.printStackTrace(); } return columnNames; } /** * ID document, routing * @param index * @param id */ private String getRouting(String index, String id){ SearchResponse resp = queryByRandomField(index, "_id", id, 1, 1); if(resp != null){ SearchHits hits = resp.getHits(); Iterator ite = hits.iterator(); while(ite.hasNext()){ SearchHit hit = ite.next(); DocumentField df = hit.field("_routing"); if(df != null){ List values = df.getValues(); if(values != null){ String valStr = values.toString(); // routing return valStr.substring(1, valStr.length()-1); } } } } return null; } @Override public Map> getClusterIndexes(String clusterName) { return null; } @Override public Map> getIndexTypes(String clusterName) { return null; } }
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.GeoUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * @Description: elasticsearch    :
 * https://www.elastic.co/guide/en/elasticsearch/reference/6.0/mapping-types.html
 *         elasticsearch        
 */
public enum EsFieldType {
    /**
     *      ,        。
     * geo_point     :
     * #     "location": {"lat": 41.12,"lon": -71.34}
     * #    "location": "41.12,-71.34"
     * #geohash "location": "drm3btev3e86"
     * #    "location": [ -71.34, 41.12 ]
     */
    GEO_POINT("geo_point"){
        @Override
        public Object fieldValueTransfer(String fieldValue) {
            String rex1 = "\\{(\\s)*\"lat\"(\\s)*:(\\s)*(\\-|\\+)?\\d+(\\.\\d+)?(\\s)*,"+
                    "(\\s)*\"lon\"(\\s)*:(\\s)*(\\-|\\+)?\\d+(\\.\\d+)?(\\s)*\\}";
            String rex2 = "(\\-|\\+)?\\d+(\\.\\d+)?(\\s)*,(\\s)*(\\-|\\+)?\\d+(\\.\\d+)?";
            String rex3 = "^[a-z0-9]+$";
            String rex4 = "^\\[(\\s)*(\\-|\\+)?\\d+(\\.\\d+)?(\\s)*,(\\s)*(\\-|\\+)?\\d+(\\.\\d+)?(\\s)*\\]$";
            if(match(rex1,fieldValue)){
                //json object
               return  parseJsonToGeopoint(fieldValue);
            }else if(match(rex4,fieldValue)){
                //array
                return JSON.parseArray(fieldValue);
            }else if(match(rex2,fieldValue)){
                //string
                return fieldValue;
            }else if(match(rex3,fieldValue)){
                //geohash,          
                return GeoPoint.fromGeohash(fieldValue);
            }else{
                return null;
            }
        }
        private GeoPoint parseJsonToGeopoint(String jsonStr){
            //{"lat": 41.12,"lon": -71.34}
            JSONObject jo = JSON.parseObject(jsonStr);
            //    JSONObject   ,           。
            return new GeoPoint(jo.getDoubleValue("lat"), jo.getDoubleValue("lon"));
        }
    };

    EsFieldType(String type) {
        this.type = type;
    }

    public Object getTransferedField(String fieldValue){
        return fieldValueTransfer(fieldValue);
    }
    public boolean match(String rex,String input){
        Pattern p = Pattern.compile(rex);
        return p.matcher(input).matches();
    }
    private String type;
    public String getType() {
        return type;
    }
    protected abstract Object fieldValueTransfer(String fieldValue);
}