Esデータをhdfsにアクセス
5919 ワード
最近1つの需要を受け取って、esログのデータにアクセスしてhdfsに着く必要があって、分析を行って、ネット上で資料を探して、総括して以下のいくつかの方法がありますhive自身直接サポート接続es直接参照リンクhttp://lxw1234.com/archives/2015/12/585.htm このようなやり方の弊害を説明します. (a)、esクラスタは、通常、セキュリティのためにユーザ認証および証明書認証に参加することを考慮し、上記の方法では をサポートしない.(b)、hive定義テーブル構造の場合、フィールドタイプマッピングはesと一致しなければならないが、esドキュメントtypeにフィールドタイプ変更がある場合、hiveはよく認識できず、hiveは類似タイプ変換の誤り を報告する
esは、es esを操作するための2つのjava apiの公式apiアドレスを提供します.https://www.elastic.co/guide/en/elasticsearch/client/index.html (a)、transportインタフェースはTCP接続であるクラスタはユーザー認証と証明書認証を行ったため、以下のようにesに接続したが、残念ながら時間の問題で接続できず、しばらくこの問題を解決できなかった.
Exception in thread "main"NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{3HUrRF8JQGCz_TlwhQOFiA}{10.17.2.79}{10.17.2.79:9305}]](b)、restインタフェースアクセスesはhttpインタフェースであり、esクラスタはssl認証を採用しているため、先に認証を行う (1)証明書ファイルをjksファイルに合成し、es公式サイトAPIは操作KeyStore keytool-import-v-trustcacerts-file niudingfeng.crt-keystore my_keystore.jks -keypass password -storepass password (2)ユーザパスワード検証およびhttps認証
以上が認証コード(3)接続es取得データ注意:httpインタフェースはデフォルトで10個のデータを返し、より多くのデータを返す必要がある場合はfrom sizeを制定する必要があります.esバージョンの問題で、公式java high level rest clientは使用できません.最低バージョンの要求は5.6ですので、この方法はお勧めしません. 最後にPython apiを用いてPythonクエリーesを実現するには2つの方法がある (a)、search (b)、helps.scan
Exception in thread "main"NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{3HUrRF8JQGCz_TlwhQOFiA}{10.17.2.79}{10.17.2.79:9305}]]
Settings settings = Settings.builder()
.put("cluster.name", esDataToText.cluster)
.put("xpack.security.user", esDataToText.userPw)
.put("xpack.ssl.key", esDataToText.keyPath)
.put("xpack.ssl.certificate", esDataToText.crtPath)
.put("xpack.ssl.certificate_authorities", esDataToText.cacrtPath)
.put("xpack.security.transport.ssl.enabled", true)
.put("client.transport.ping_timeout", "100s")
.build();
try {
TransportClient client = new PreBuiltXPackTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esDataToText.urls), esDataToText.port));
SearchResponse response = client.prepareSearch("ndf.dlp")
.setQuery(QueryBuilders.matchAllQuery())
.execute().actionGet();
SearchHits resultHits = response.getHits();
Long result_cnt = resultHits.totalHits;
logger.info(" :" + result_cnt);
} catch (UnknownHostException e) {
e.printStackTrace();
}
//
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("bigdata", "123456qwerty"));
//ssl
SSLContextBuilder sslBuilder = null;
try {
sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
} catch (KeyStoreException e) {
e.printStackTrace();
}
以上が認証コード
RestClient restClient = RestClient.builder(new HttpHost("testelk002.niudingfeng.com", 9205, "https"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setSSLContext(sslContext).setDefaultCredentialsProvider(credentialsProvider);
}
})
.build();
Response response = null;
try {
String method = "GET";
String endpoint = "/ndf.dlp/_search";
String queryStr = "{
" +
"\t\t\"query\":{ \"range\": {
" +
" \t\t\t\t\t\"@timestamp\": {
" +
" \t\t\t\t\t\"gte\": \"2017-12-27\",
" +
" \t\t\t\t\t\"lte\": \"2017-12-28\"
" +
" \t\t\t\t\t\t\t}
" +
" \t\t\t\t\t\t}
" +
"\t\t\t\t}
" +
"}";
// String queryStr = "{\"query\":{\"match_all\":{}}}";
HttpEntity entity = new NStringEntity(queryStr, ContentType.APPLICATION_JSON);
response = restClient.performRequest(method,endpoint,Collections.emptyMap(),entity);
String res = EntityUtils.toString(response.getEntity());
String resFile = "D:\\java\\es\\res.txt";
File file = new File(resFile);
if(file.exists()){
file.delete();
}
BufferedWriter bw = new BufferedWriter(new FileWriter(resFile));
bw.write(res);
bw.close();
restClient.close();
} catch (IOException e) {
e.printStackTrace();
}
res = es.search(index='index_name',
doc_type=’type_name’, body=es_query, request_timeout=999999,params={“search_type”:”query_and_fetch”})
:search , sense , ,
, from size , ,
es_client = es.Elasticsearch(
[host],
http_auth=(user, pswd),
port=port,
use_ssl=True,
verify_certs=False,
timeout=300)
res = helpers.scan(es_client, index=index, query=query, scroll='1m',request_timeout=999999,preserve_order=False)
:scan , , ,scroll , ,