InfluxDBからデータを取ってくるコンポーネントを作ってみる


前回

InfluxDBへデータを入れるInfluxDBPutコンポーネントを作成しましたので、入れたデータを取得するInfluxDBGetコンポーネントも作成したいと思います。

テスト環境

環境は前回と同じです。
contos上のInfluxDB1.7.4、データベース「asteria」を使用します。

コンポーネント

今回はInfluxDBからデータを取得するInfluxDBGetコンポーネントを作成します。

やりたいことは、

1、クエリーを実行して結果をレコードストリームとして出力する
2、データベースとMeasurementと保持ポリシーを指定して簡単にデータを取得できる

くらいです。
後は、timeで取得範囲を指定と、ページ用に開始行と取得数指定くらいでしょうか。

プロパティ

プロパティ名 説明
コネクションを使用 コネクションを使用しない場合は、URLとユーザー名とパスワードのプロパティで指定します。
コネクション名 専用コネクションを指定します。
データベース 今回は作成済の「asteria」を指定します。
クエリー 実行するクエリー文字列を指定します。
読み込み件数 取得したレコード数が格納されます。
Measurement Measurementの名前を指定します。
保持ポリシー 保持ポリシーを指定します。
time>= 指定時刻以降のデータを取得します。時刻は文字列「例:2019-03-25T05:55:55.5Z」で指定します。
time<= 指定時刻以前のデータを取得します。
読み込み開始行 対象のデータで取得する行番号を指定します。1000件のデータで100番目から100件取得するときなどに使用します。
取り出す件数 取得するデータ数を指定します。

ソース

ソースのメイン部分は以下の感じになります。

専用コネクションからorg.influxdb.InfluxDBオブジェクトを受け取ります

_influxDB = InfluxDBConnection.createInfluxDB(url, username, password);

クエリー文字列を作っている部分

String query = _query.strValue();
if (!StringUtil.isEmpty(query)) {
    //クエリープロパティを指定している場合はそのまま実行
} else {
    //クエリー文を作成
    String retentionPolicy = _retentionPolicy.strValue();
    String measurement = _measurement.strValue();
    String database = _database.strValue();
    if (StringUtil.isEmpty(database)) {
        throw new ComponentExceptionByMessageCode(this, ERROR_CODE_DATABASE);
    }
    StringBuilder queryBuilder = new StringBuilder("select * from " + database);
    if (StringUtil.isEmpty(retentionPolicy)) {
        throw new ComponentExceptionByMessageCode(this, ERROR_CODE_RETENTION_POLICY_EMPTY);
    }
    queryBuilder.append(".\"").append(retentionPolicy).append("\"");
    if (StringUtil.isEmpty(measurement)) {
        throw new ComponentExceptionByMessageCode(this, ERROR_CODE_MEASUREMENT_EMPTY);
    }
    queryBuilder.append(".\"").append(measurement).append("\"");
    String ge = _greaterThanOrEqualTo.strValue();
    String le = _lessThanOrEqualTo.strValue();
    if (!StringUtil.isEmpty(ge) || !StringUtil.isEmpty(le)) {
        queryBuilder.append(" where ");
        if (!StringUtil.isEmpty(ge)) {
            queryBuilder.append("time >= '").append(ge).append("'");
            if (!StringUtil.isEmpty(le)) {
                queryBuilder.append(" and time <= '").append(le).append("'");
            }
        } else {
            queryBuilder.append("time <= '").append(le).append("'");
        }
    }
    if (_recordFilter.booleanValue()) {
        int limit = _getCount.intValue();
        int offset = _startRow.intValue();
        if (limit > 0) {
            queryBuilder.append(" limit ").append(limit);
        }
        if (offset > 1) {
            queryBuilder.append(" offset ").append(offset);
        }
    }
    query = queryBuilder.toString();
}

クエリーを実行してレスポンスを取得する部分

private List<List<Object>> getQueryResponse(String queryString) {
    Query query = new Query(queryString);
    QueryResult result = _influxDB.query(query);
    List<Result> results = result.getResults();
    for (Result res : results) {
        List<Series> series = res.getSeries();
        if (series != null) {
            for (Series se : series) {
                _columns = se.getColumns();
                List<List<Object>> values = se.getValues();
                return values;
            }
        }
    }
    return null;
}

実行

テストデータを全件取得するフロー、はじめの5件のみ取得するフローなどで動作を確認してみました。

全件(7458レコード)取得時の結果です。
278msでした。

◆1000件目から5件取得するフローの設定例

◆指定時刻間のもののみ取得するフローの設定例

まとめ

今回はFlowServiceのログでテストしたのですが、各種センサーデータなどを格納していろいろな角度からグルーピングしたりしてデータを取得するのかなぁと使用方法を考えてみたりしました。InfluxDBは結構使いどころがありそうです。