PartiQLとMongoDBを接続して遊んでみた


前提条件

本記事は、PartiQLについて調べた内容について記載しています。内容に調査漏れ等の不備があるかと思いますので、その場合はEditRequestなりコメントなりでご指摘いただければ幸いです。
またこの記事は2019/11/7時点で書かれたものになります。PartiQLはまだ発表されたばかりで、これから実装がどんどん進み、記載内容がふさわしくなくなることもあると思いますので、その点はご容赦ください

PartiQLとは

PartiQLはAmazonがOpenSourceとして公開した、SQL互換言語です。特徴として、制御対象のDB形態を幅広く持てることができ、RDB以外にJSONのようにネスト化されたデータも扱えるようです。

実際の操作については、Tutorialがあるので、そちらをご参照ください。結構SQLを知っている方は直感的に操作できるかと思います。現時点では、Update/delete/Insert文は実装されていないようです。

MongoDBと接続してみた。

ここからが本題です。このPartiQLはサービスにおいて検索機能や統計処理機能を加える際に、API用のクエリ言語として独自で言語を追加する代わりにPartQLを使用(Splunkで言うSPL)するユースケースがありそうです。そこでまだアルファ版ではありますが、MongoDBと接続して実際に動かしてみました。

まずPartiQLのインストールですが、公式サイトのGetting Startを見れば、特に問題なくインストール可能です。また前提としてJREが必要になります。こちらはOpenJRE9で動作確認しています。

コマンドを実行するとREPL形式で、ユーザの入力をインタラクティブに実行する形でPartiQLを試すことができます。

$ ./bin/partiql
Welcome to the PartiQL REPL!
PartiQL> 

しかし、これ以上の実装は現段階ではされていないようで、MongoDBとのConnectorも特に用意されていません。

そこで、本記事ではPythonを使って、ユーザからのPartiQL入力クエリ、およびPymongoモジュールにてDBから取得したデータを引数として、Subprocessモジュールからpartiql実行した結果を出力するPoCの作成を実施します。

以下結果だけ知りたい方へ

以下が実際に試作したPoCです。MongoDBから情報を取って、PariQLクエリ検索掛けた結果を表示します。(特にエラーハンドリングは実施していません)

partiQLCtl.py
import subprocess
import json
import os
import pymongo

class PartiQLCtl():
    def __init__(self, address=None, port=None):
        # address, portの記載がない場合は、検索元データJSON直接入力による検索
        if (address is not None) and (port is not None):
            self.client = pymongo.MongoClient(address, port)
        else:
            self.client = None

    def load_json_data(self, json_data):
        # 検索元データJSON直接入力用の関数
        self.env_data = self.convert_json_to_partiql(json_data)
        print(self.env_data)

    def load_db_data(self, db, collection):
        # 検索元データをMongoDBから抽出
        cur = self.client[db][collection].find({})

        # DB出力結果はdata['record']以下にリストとして保存
        data = {"record": []}
        for record in cur:
            # _idのString処理
            record['_id'] = str(record['_id'])
            data["record"].append(record)

        self.env_data = self.convert_json_to_partiql(data)

    def execute(self, query):
        # クエリの実行用関数

        # -iオプション用のダミーデータ。あらかじめファイルの作成が必要。中身は"{}"の2文字の記載のみ
        ion_file_path = os.path.join(os.path.dirname(__file__), '../tmp/tmp.ion')
        # -eオプション用の検索元データ。本スクリプト内でファイルを作成。必要に応じて場所変更
        env_file_path = os.path.join(os.path.dirname(__file__), '../tmp/env.ion')
        # 実行ファイル。必要に応じて場所変更
        partiql_execute_path = os.path.join(os.path.dirname(__file__),'../dist/partiql/bin/partiql')

        with open(env_file_path, 'w') as f:
            f.write(self.env_data)

        res = subprocess.check_output([partiql_execute_path, '-q', query, '-i', ion_file_path, '-e', env_file_path])
        return res

    def convert_json_to_partiql(self, json):
        # JSONをPartiQL用のデータ形式に変更する関数。list/dict/boolean/int/str型に対応
        if type(json) is list:
            env = "<<"
            for idx, elem in enumerate(json):
                if (type(elem) is dict) or (type(elem) is list):
                    env += (self.convert_json_to_partiql(elem))
                elif type(elem) == str:
                    env += "'{}'".format(elem)
                elif elem is None:
                    env += "null"
                elif elem is True:
                    env += "true"
                elif elem is False:
                    env += "false"
                else:
                    env += str(elem)

                if idx != len(json) - 1:
                    env += ', '

            env += '>>'
        elif type(json) is dict:
            env = '{'
            for idx, elem in enumerate(json.keys()):
                if (type(json[elem]) is dict) or (type(json[elem]) is list):
                    env += "'{}': {}".format(elem, self.convert_json_to_partiql(json[elem]))
                elif type(json[elem]) == str:
                    env += "'{}': '{}'".format(elem, json[elem])
                elif json[elem] is None:
                    env += "'{}': null".format(elem)
                elif json[elem] is True:
                    env += "'{}': true".format(elem)
                elif json[elem] is False:
                    env += "'{}': false".format(elem)
                else:
                    env += "'{}': {}".format(elem, str(json[elem]))

                if idx != len(json.keys()) - 1:
                    env += ', '
            env += '}'

        return env


if __name__ == '__main__':
    pql = PartiQLCtl("192.168.1.10", 27017)
    pql.load_db_data("test", "test")
    print(pql.execute("select r.id from record r"))

またMongoDBにあらかじめ下記のデータを格納します。

> use test
switched to db test
> db.test.insert({"id": "aa", "setting": [{"config1": "hoge", "config2": "fuga"}]})
WriteResult({ "nInserted" : 1 })
> db.test.insert({"id": "bb", "setting": [{"config1": "hoge2", "config2": "fuga2"}]})

以下実行結果になります。

$ python partiQlCtl.py 
b"<<\n  {\n    'id': 'aa'\n  },\n  {\n    'id': 'bb'\n  }\n>>"

最後のpql.executeの引数(PartiQlのクエリ)を変更することによって、表示データが変更されることを確認してみてください。

以下PoC作成までの試行錯誤

partiqlをのヘルプを実行すると下記の通り、REPL以外にも、コマンドとして実行することもできるようになっているようです。

$ ./bin/partiql -h
PartiQL CLI
Command line interface for executing PartiQL queries. Can be run in an interactive (REPL) mode or non-interactive.

Examples:
To run in REPL mode simply execute the executable without any arguments:
     partiql

In non-interactive mode we use Ion as the format for input data which is bound to a global variable 
named "input_data", in the example below /logs/log.ion is bound to "input_data":
     partiql --query="SELECT * FROM input_data" --input=/logs/log.ion

The cli can output using PartiQL syntax or Ion using the --output-format option, e.g. to output binary ion:
     partiql --query="SELECT * FROM input_data" --output-format=ION_BINARY --input=/logs/log.ion

To pipe input data in via stdin:
     cat /logs/log.ion | sqlcli --query="SELECT * FROM input_data" --format=ION_BINARY > output.10n

Option                                Description                                                
------                                -----------                                                
-e, --environment <File>              initial global environment (optional)                      
-h, --help                            prints this help                                           
-i, --input <File>                    input file, requires the query option (default: stdin)     
-o, --output <File>                   output file, requires the query option (default: stdout)   
--of, --output-format <OutputFormat:  output format, requires the query option (default: PARTIQL)
  (ION_TEXT|ION_BINARY|PARTIQL)>                                                                 
-q, --query <String>                  PartiQL query, triggers non interactive mode 

以下のコマンドを実行することにより、シェル上でクエリ検索を実行できそうです。

partiql --query="SELECT * FROM input_data" --input=/logs/log.ion

ここで--inputはIONフォーマットしか使えないと記載があります。ここで知らない方のために説明すると、IONフォーマットはAmazonによって作成されたデータシリアライズフォーマットであり、JSONライクなText形式の記述のほかにバイナリ形式で記述されています。
Pythonでのモジュールも提供されていますので、こちらを使うことでJSON⇔IONの変換が可能になります。

>>> import amazon.ion.simpleion as ion
>>> 
>>> obj = ion.loads('{"id" : "aa", "setting" : [ { "config1" : "hoge", "config2" : "fuga" } ] }')
'$ion_1_0 {id:"aa",setting:[{config1:"hoge",config2:"fuga"}]}'

実際に出力結果をtest.ionとして保存し、実行すると下記の通りになります。

$ cat test.ion 
$ion_1_0 {id:"aa",setting:[{config1:"hoge",config2:"fuga"}]}
$ ./bin/partiql -q "select * from input_data" -i test.ion 
<<
  {
    'id': 'aa',
    'setting': [
      {
        'config1': 'hoge',
        'config2': 'fuga'
      }
    ]
  }
>>

しかしここで、いくつか試してみたところ、from句にネストしたデータを入力すると、出力結果が空出力されるという想定外の事象が発生しました。ここで、この手法を追求することはは断念しています。

$ ./bin/partiql -q "select * from input_data.setng" -i test.ion  
<<
  {}
>>

しかし、REPLの出力結果をPythonのSubprocess PopenとうでPIPEをつないでやり取りをするのは、あまりコードとしてきれいでないなどの理由で別の手法を検討します。

ここで入力の検索元の情報は-iオプション以外に-eオプションで投入する方法があります。(この場合は投入するデータはPartiQL独自形式のデータになるため、JSON⇔PariQL独自形式への変換が必要になります。)。
ただ、-qオプション、および-eオプションでは、標準入力からinput情報を待ち受けることになるため、実行すると、ユーザ入力待ち状態となります。
最終的にダミーとして-iオプションにダミーデータを読み込ませることで、当初の目的を達成しました。