Elasticsearchインポートjsonデータ

10691 ワード

jsonデータをesにインポート
  • 各レコードには、対応するindex宣言が必要です.jsonファイルはBulk APIの要求
  • に適合しなければならない.
  • jsonファイルの各データの前にindex情報(index/type/id...)を含める必要があります.
  • curl -H 'Content-Type: application/x-ndjson'  -s -XPOST localhost:9200/_bulk --data-binary @accounts.json
    
    {"index":{"_index":"index2","_type":"type2","_id":0}}
    {"age":10,"name":"jim"}
        {"index":{"_index":"index2","_type":"type2","_id":1}}
    {"age":16,"name":"tom"}
    
  • インポートファイルが同じindex(/index/type)である場合、urlでデフォルトindex(/index/type)を宣言できます.jsonファイルではindexとtypeを宣言する必要はありません.また、サポート部分レコードにはindexとtypeが表示されます.
  • curl -H 'Content-Type: application/x-ndjson'  -s -XPOST localhost:9200/index1/_bulk --data-binary @accounts.json #URL     index
    curl -H 'Content-Type: application/x-ndjson'  -s -XPOST localhost:9200/index1/type1/_bulk --data-binary @accounts.json #URL     index/type
    
    {"index":{}
    {"age":6,"name":"bob"}
    {"index":{"_id":"2"}}
    {"age":10,"name":"jim"}
    {"index":{"_id":"6"}}
    {"index":{"_type":"type2","_id":1}}
    {"age":16,"name":"tom"}
    {"index":{"_index":"index2","_type":"type3","_id":1}}
    {"age":20,"name":"lucy"}
    
  • は、フォーマット要件を満たすjsonを直接インポートすることができ、mappingを事前に作成する必要はなく、esはjsonデータに基づいて自動的に作成することができる.事前にmappingを宣言し、カスタム設定
  • を行うこともできる.
  • curlコマンドインポート例
  • shapefileをesにインポート
    多くの資料を調べた後、理論的には次のような方法があります.
  • GDALを使用して直接shapefileをESにインポートし、
  • に失敗しました.
  • GDAL(またはArcMap)を使用してshapefileをjsonファイルに導き、curlコマンドを使用してjsonファイルをインポートする説明:生成されたjsonファイルフォーマットはBulk APIの要求に合致せず、処理しなければ
  • にインポートできない.
  • Arcpyを使用してスクリプトを記述し、shapefileをBulkの要件を満たすjsonにエクスポートし、curlコマンドを使用して
  • をインポート(またはスクリプト実装)する.
  • ArcMapを使用してgeojsonをエクスポート(またはArcpyを使用)、python(java)を使用してjsonファイルを解析し、Bulk APIを使用してスクリプトを作成してES
  • にインポート
    ここで注意すべき問題は次のとおりです.
  • GDALまたはArcMapを使用して導出されたgeojsonフォーマットとBulk APIで要求されるデータフォーマットが異なる
  • データはGeo-shape datatypeまたはGeo-point datatypeに適合する必要がありますcurlコマンドを使用して
  • をインポートできます.
    以上、最終的に第4の方法で解決することを選択し、GDALを使用してshapefileをgeojsonファイルにエクスポートし、python elasticsearch bulk APIを使用してスクリプトを作成し、geojsonを解析してESをインポートします.
    1.shapefile to es(失敗)
    GDAL for ES Driverは、shapefileを直接ESにインポートする方法を提供しますが、インポート時にエラーが発生しました.
    ogr2ogr -f "ElasticSearch" http://localhost:9200 my_shapefile.shp
    
    #ERROR 1: HTTP error code : 405
    #ERROR 8: Could not connect to server
    #ElasticSearch driver failed to create http://localhost:9200/
    

    2.jsonファイルで転送する(jsonファイルフォーマットを処理する必要がある)
    もう1つの方法は、ogr 2 ogrツールを使用してshapefileをgeojsonファイルに変換し、geojsonファイルをESにインポートすることです.
    2.1 shapefile to geojson
    How to convert and import Arc Shapefile from Zillow into an elastic search database? Ask
    ogr2ogr -f GeoJSON  map1.geojson map.shp
    

    生成されたjsonファイルのフォーマットは以下の通りで、index情報は含まれておらず、Bulk APIは直接インポートできません.
    {
    "type": "FeatureCollection",
                                                                                    
    "features": [
    { "type": "Feature", "properties": { "ID_0": 45, "NAME_1": "Anhui" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 119.632111, 31.139344000000108 ], [ 119.644439000000148, 31.115657 ], [ 119.624672, 31.084624000000133 ] ] ] } } 
    { "type": "Feature", "properties": { "ID_0": 45, "NAME_1": "Beijing" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 117.379737, 40.226871 ], [ 117.382896, 40.208718000000147 ], [ 117.369484, 40.190997 ] ] ] } }
    ]
    }
    
    

    2.2 create index mapping
    Indexing GeoShapesは、geoフィールドと重点フィールドに対してmappingを行い、残りのフィールドがインポートされると自動的に生成されます.
    PUT /gis1
    {
      "mappings": {
        "province1": {
           "properties": {
              "location": {
                 "type": "geo_shape"
              },          
              "ID_0": {
                 "type": "text"
              },
                "NAME_1": {
                 "type": "text"
              }
           }
        }
      }
    }
    

    2.3 import json
    curl -H 'Content-Type: application/x-ndjson' -XPOST 'http://localhost:9200/gis1/province1/_bulk?pretty' --data-binary @map1.geojson
    
    #  :"type":"json_e_o_f_exception","reason":"Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@1ffa375d; line: 1, column: 1])
    at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@1ffa375d; line: 1, column: 3]"

    データを以下のフォーマットに変更してbulkコマンドを使用してインポートする必要があります.
    {"index":{"_id":"1"}}
    { "type": "Feature", "properties": { "ID_0": 45, "NAME_1": "Zhejiang" }, "geometry": { "type": "MultiPolygon", "coordinates": [ [ [ [ 121.187362, 27.86903100000012 ], [ 121.190140000000156, 27.847919 ], [ 121.156249, 27.823749000000134 ]] ] ] } }
    {"index":{"_id":"2"}}
    { "type": "Feature", "properties": { "ID_0": 46, "NAME_1": "Jiangsu" }, "geometry": { "type": "MultiPolygon", "coordinates": [ [ [ [ 121.187362, 27.86903100000012 ], [ 121.190140000000156, 27.847919 ], [ 121.156249, 27.823749000000134 ]] ] ] } }
    

    4.pythonスクリプトGDAL生成geojsonファイルのインポート
    # rasa 20180212
    #   ES python api  geojson   (map.geojson)
    
    import json
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    
    
    def set_mapping(es, index_name="content_engine", doc_type_name="en"):
        my_mapping = {
            doc_type_name: {
                "properties": {
                    "location": {
                        "type": "geo_shape"
                    },
                    "ID_0": {
                        "type": "text"
                    },
                    "NAME_1": {
                        "type": "text"
                    }
                }
            }
        }
    
        # ignore 404 and 400
        es.indices.delete(index=index_name, ignore=[400, 404])
        print("delete_index")
    
        # ignore 400 cause by IndexAlreadyExistsException when creating an index
        create_index = es.indices.create(index=index_name, ignore=400)
        mapping_index = es.indices.put_mapping(index=index_name, doc_type=doc_type_name, body=my_mapping)
        if create_index["acknowledged"] is not True or mapping_index["acknowledged"] is not True:
            print("Index creation failed...")
    
    
    def set_data(es, input_file, index_name, doc_type_name="en"):
        with open(input_file, 'r') as f:
            data = json.load(f)
    
        features = data["features"]
    
        ACTIONS = []
        i = 0
        count = 0
    
        for feature in features:
            action = {}
    
            if (feature["geometry"]["type"] == "Polygon"):  #   geometry   polygon
                action = {
                    "_index": index_name,
                    "_type": doc_type_name,
                    "_source": {
                        "ID_0": feature["properties"]["ID_0"],
                        "NAME_1": feature["properties"]["NAME_1"],
                        "location": {
                            "type": "polygon",
                            "coordinates": feature["geometry"]["coordinates"]
                        }
                    }
                }
            else:  # geometry   multipolygon
                action = {
                    "_index": index_name,
                    "_type": doc_type_name,
                    "_source": {
                        "ID_0": feature["properties"]["ID_0"],
                        "NAME_1": feature["properties"]["NAME_1"],
                        "location": {
                            "type": "multipolygon",
                            "coordinates": feature["geometry"]["coordinates"]
                        }
                    }
                }
    
            i += 1
            print("prepare insert:  %s" % feature["properties"]["NAME_1"])
            print("type:  %s" % feature["geometry"]["type"])
            ACTIONS.append(action)
            if (i == 5):
                success, _ = bulk(es, ACTIONS, index=index_name, raise_on_error=True)
                count += success
                i = 0
                ACTIONS = []
                print("insert %s lines" % count)
    
        success, _ = bulk(es, ACTIONS, index=index_name, raise_on_error=True)
        count += success
        print("insert %s lines" % count)
    
    
    if __name__ == '__main__':
        # es = Elasticsearch(hosts=["127.0.0.1:9200"], http_auth=('elastic','changeme'),timeout=5000)
        es = Elasticsearch(hosts=["127.0.0.1:9200"], timeout=5000)
        set_mapping(es, "gis6", "province")
    
        # geojson   ogr2ogr    
    
        # set_data(es, "./data/map-fujian.geojson", "gis6", "province")  # multipolygon
        # set_data(es, "./data/map-anhui.geojson", "gis6", "province") # polygon
        set_data(es, "./data/map-full.geojson", "gis6", "province")  # polygon & multipolygon
        # set_data(es, "./data/map.geojson", "gis6", "province")       # polygon
    
    

    に質問
    1.python bulkを使用してmultipolygonをインポートするエラー:Invalid Linearring found.Found a single coordinate when expecting a coordinate array
    sourcecode 638行
    理由:
  • 同じgeojsonファイルにpolygonとmultipolygonのタイプのfeature
  • が存在する
  • mappingをpolygonに設定し、multipolygonデータを挿入するとエラーが報告される:invalid number of points in Linearring(found[1]-must be>=[4]);
  • mappingをmultipolygonに設定し、polygonデータを挿入するとエラー:Invalid Linearring found.Found a single coordinate when expecting a coordinate array

  • 解決方法:featureのタイプを判断し、作成したmappingが異なり、ESは同じtypeにpolygon/multipolygonを同時に格納することをサポートする