Esデータをhdfsにアクセス

5919 ワード

最近1つの需要を受け取って、esログのデータにアクセスしてhdfsに着く必要があって、分析を行って、ネット上で資料を探して、総括して以下のいくつかの方法があります
  • hive自身直接サポート接続es直接参照リンクhttp://lxw1234.com/archives/2015/12/585.htm このようなやり方の弊害を説明します.
  • (a)、esクラスタは、通常、セキュリティのためにユーザ認証および証明書認証に参加することを考慮し、上記の方法では
  • をサポートしない.
  • (b)、hive定義テーブル構造の場合、フィールドタイプマッピングはesと一致しなければならないが、esドキュメントtypeにフィールドタイプ変更がある場合、hiveはよく認識できず、hiveは類似タイプ変換の誤り
  • を報告する
  • esは、es esを操作するための2つのjava apiの公式apiアドレスを提供します.https://www.elastic.co/guide/en/elasticsearch/client/index.html
  • (a)、transportインタフェースはTCP接続であるクラスタはユーザー認証と証明書認証を行ったため、以下のようにesに接続したが、残念ながら時間の問題で接続できず、しばらくこの問題を解決できなかった.

  • Exception in thread "main"NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{3HUrRF8JQGCz_TlwhQOFiA}{10.17.2.79}{10.17.2.79:9305}]]
    Settings settings = Settings.builder()
                    .put("cluster.name", esDataToText.cluster)
                    .put("xpack.security.user", esDataToText.userPw)
                    .put("xpack.ssl.key", esDataToText.keyPath)
                    .put("xpack.ssl.certificate", esDataToText.crtPath)
                    .put("xpack.ssl.certificate_authorities", esDataToText.cacrtPath)
                    .put("xpack.security.transport.ssl.enabled", true)
                    .put("client.transport.ping_timeout", "100s")
                    .build();
            try {
                TransportClient client = new PreBuiltXPackTransportClient(settings)
                        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esDataToText.urls), esDataToText.port));
                SearchResponse response = client.prepareSearch("ndf.dlp")
                        .setQuery(QueryBuilders.matchAllQuery())
                        .execute().actionGet();
                SearchHits resultHits = response.getHits();
                Long result_cnt = resultHits.totalHits;
                logger.info("    :" + result_cnt);
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
    
  • (b)、restインタフェースアクセスesはhttpインタフェースであり、esクラスタはssl認証を採用しているため、先に認証を行う
  • (1)証明書ファイルをjksファイルに合成し、es公式サイトAPIは操作KeyStore keytool-import-v-trustcacerts-file niudingfeng.crt-keystore my_keystore.jks -keypass password -storepass password
  • (2)ユーザパスワード検証およびhttps認証
  •         //      
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials("bigdata", "123456qwerty"));
    
            //ssl    
            SSLContextBuilder sslBuilder = null;
            try {
                sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
            } catch (KeyStoreException e) {
                e.printStackTrace();
            }
    

    以上が認証コード
  • (3)接続es取得データ注意:httpインタフェースはデフォルトで10個のデータを返し、より多くのデータを返す必要がある場合はfrom sizeを制定する必要があります.esバージョンの問題で、公式java high level rest clientは使用できません.最低バージョンの要求は5.6ですので、この方法はお勧めしません.
  • RestClient restClient = RestClient.builder(new HttpHost("testelk002.niudingfeng.com", 9205, "https"))
                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            return httpClientBuilder.setSSLContext(sslContext).setDefaultCredentialsProvider(credentialsProvider);
                        }
                    })
                    .build();
            Response response = null;
    
    
            try {
                String method = "GET";
                String endpoint = "/ndf.dlp/_search";
                String queryStr = "{
    " + "\t\t\"query\":{ \"range\": {
    " + " \t\t\t\t\t\"@timestamp\": {
    " + " \t\t\t\t\t\"gte\": \"2017-12-27\",
    " + " \t\t\t\t\t\"lte\": \"2017-12-28\"
    " + " \t\t\t\t\t\t\t}
    " + " \t\t\t\t\t\t}
    " + "\t\t\t\t}
    " + "}"; // String queryStr = "{\"query\":{\"match_all\":{}}}"; HttpEntity entity = new NStringEntity(queryStr, ContentType.APPLICATION_JSON); response = restClient.performRequest(method,endpoint,Collections.emptyMap(),entity); String res = EntityUtils.toString(response.getEntity()); String resFile = "D:\\java\\es\\res.txt"; File file = new File(resFile); if(file.exists()){ file.delete(); } BufferedWriter bw = new BufferedWriter(new FileWriter(resFile)); bw.write(res); bw.close(); restClient.close(); } catch (IOException e) { e.printStackTrace(); }
  • 最後にPython apiを用いてPythonクエリーesを実現するには2つの方法がある
  • (a)、search
  • res = es.search(index='index_name', 
    doc_type=’type_name’, body=es_query, request_timeout=999999,params={“search_type”:”query_and_fetch”}) 
       :search             ,  sense          ,     ,
          ,   from size  ,     ,     
    
  • (b)、helps.scan
  • es_client = es.Elasticsearch(
        [host],
        http_auth=(user, pswd),
        port=port,
        use_ssl=True,
        verify_certs=False,
        timeout=300)
    res = helpers.scan(es_client, index=index, query=query, scroll='1m',request_timeout=999999,preserve_order=False)
      :scan             ,      ,            ,scroll       ,      ,