FirestoreのデータをElastcsearchへ移行してみた


FirebaseのFirestoreの検索がいまいちなので、
Elasticsearchを使うことにしてみました。
特にFirestoreのgeohashでの検索の精度がいまひとつ悪い、
というのが一番の理由です。

というわけで、FirestoreのデータをElasticsearchへインポートしてみました。

Elasticsearchの準備

今回は、Elastic Cloudサービスを利用することにしました。
無料トライアルを利用してみて、このCloudサービスから利用した方がコントロールパネルが充実してそうなのでこちらを選択することにしました。

たぶん、料金はGCP、AWSから直で使っても同等の値段になると思います。

indexの作成

testというindexを作成

mapping設定

Elasticsearchでは基本データ投入時に自動で型を設定してくれるようですが、
それだと緯度経度がgeo_pointという型にならないので予めgeo_pointを定義します。

実行するとエラーに、

400 — Bad Request

{
  "status": 400,
  "error": {
    "root_cause": [
      {
        "reason": "Root mapping definition has unsupported parameters:  [items : {properties={location={type=geo_point}}}]",
        "type": "mapper_parsing_exception"
      }
    ],
    "type": "mapper_parsing_exception",
    "reason": "Root mapping definition has unsupported parameters:  [items : {properties={location={type=geo_point}}}]"
  }
}

前もって定義しようとしてもエラーになる、他にいい方法があるかもしれないけど仮のデータを投入してからgeo_pointを定義してみる。

仮のデータ挿入

念のためデータができてるか確認

改めてmapping設定

無事に成功、念のためmapping設定の確認

仮のデータ削除

データ移行処理

1件づつ移行するととても時間がかかるのでbulkという方法でまとめてINSERTします。
Functionsでも応用できるのでnodejsを利用します。

firebase_to_es.js
var admin = require("firebase-admin");
var serviceAccount = require("../project-0000000-firebase-adminsdk-xxxx-xxxxx.json");
admin.initializeApp({
  credential: admin.credential.cert(serviceAccount),
  databaseURL: "https://project-0000000.firebaseio.com"
});
var db = admin.firestore();
const esconfig = {
    "url"       : "https://xxxxx.asia-northeast1.gcp.cloud.es.io:9243/",
    "username"  : "elastic",
    "password"  : "xxxxxxxxxxx"
};
const request = require('request-promise');

var startId = 0;

(async() => {

    do{
        items = await getItems(startId);

        if(items){
            var data = "";

            for(i in items){
                var item = items[i];
                item.location = {lat: item.location._latitude, lon: item.location._longitude};
                data += '{ "index":{ "_id" : "'+item.id+'" } }\n';
                data += JSON.stringify(item)+'\n'
            }


            let elasticsearchRequest = {
                method: "POST",
                uri: esconfig.url + "test/items/_bulk",
                auth: {
                    username: esconfig.username,
                    password: esconfig.password,
                },
                body: data,
                headers: {
                    "Content-type": "application/x-ndjson",
                },      
                json: false
            };

            await request(elasticsearchRequest).then(response => {
                //console.log('Elasticsearch response', response);
            })      
        }

    }while(items)


    console.log("finish!");


})();

async function getItems(id){
    console.log("id", id);

    var snapshot = await new Promise( (resolve, reject) => {
        db.collection('items').where('id', '>', id).orderBy('id').limit(50).get()
                .then(snapshot => {
                    if (snapshot.empty) {
                        console.log('No matching documents.');
                        resolve(null);
                        return;
                    }
                    resolve(snapshot);
                    return;
                })
                .catch(err => {
                    console.log('Error getting documents', err);
                    resolve(null);
                });
    }).catch(function(error) {

        console.log("The error is handled, continue normally");

    });

    if(snapshot===null){
        return false;
    }

    var items = [];

    snapshot.forEach(doc => {
        items.push(doc.data());
        startId = parseInt(doc.id);
    }); 

    return items;

}


移行処理の実行

$ node firebase_to_es.js

KibanaでデータをMapで確認

お値段が↓こんな感じで、リーズナブルで運用できそうです。