InfluxDBからデータを取ってくるコンポーネントを作ってみる
前回
InfluxDBへデータを入れるInfluxDBPutコンポーネントを作成しましたので、入れたデータを取得するInfluxDBGetコンポーネントも作成したいと思います。
テスト環境
環境は前回と同じです。
contos上のInfluxDB1.7.4、データベース「asteria」を使用します。
コンポーネント
今回はInfluxDBからデータを取得するInfluxDBGetコンポーネントを作成します。
やりたいことは、
1、クエリーを実行して結果をレコードストリームとして出力する
2、データベースとMeasurementと保持ポリシーを指定して簡単にデータを取得できる
くらいです。
後は、timeで取得範囲を指定と、ページ用に開始行と取得数指定くらいでしょうか。
プロパティ
プロパティ名 | 説明 |
---|---|
コネクションを使用 | コネクションを使用しない場合は、URLとユーザー名とパスワードのプロパティで指定します。 |
コネクション名 | 専用コネクションを指定します。 |
データベース | 今回は作成済の「asteria」を指定します。 |
クエリー | 実行するクエリー文字列を指定します。 |
読み込み件数 | 取得したレコード数が格納されます。 |
Measurement | Measurementの名前を指定します。 |
保持ポリシー | 保持ポリシーを指定します。 |
time>= | 指定時刻以降のデータを取得します。時刻は文字列「例:2019-03-25T05:55:55.5Z」で指定します。 |
time<= | 指定時刻以前のデータを取得します。 |
読み込み開始行 | 対象のデータで取得する行番号を指定します。1000件のデータで100番目から100件取得するときなどに使用します。 |
取り出す件数 | 取得するデータ数を指定します。 |
ソース
ソースのメイン部分は以下の感じになります。
専用コネクションからorg.influxdb.InfluxDBオブジェクトを受け取ります
_influxDB = InfluxDBConnection.createInfluxDB(url, username, password);
クエリー文字列を作っている部分
String query = _query.strValue();
if (!StringUtil.isEmpty(query)) {
//クエリープロパティを指定している場合はそのまま実行
} else {
//クエリー文を作成
String retentionPolicy = _retentionPolicy.strValue();
String measurement = _measurement.strValue();
String database = _database.strValue();
if (StringUtil.isEmpty(database)) {
throw new ComponentExceptionByMessageCode(this, ERROR_CODE_DATABASE);
}
StringBuilder queryBuilder = new StringBuilder("select * from " + database);
if (StringUtil.isEmpty(retentionPolicy)) {
throw new ComponentExceptionByMessageCode(this, ERROR_CODE_RETENTION_POLICY_EMPTY);
}
queryBuilder.append(".\"").append(retentionPolicy).append("\"");
if (StringUtil.isEmpty(measurement)) {
throw new ComponentExceptionByMessageCode(this, ERROR_CODE_MEASUREMENT_EMPTY);
}
queryBuilder.append(".\"").append(measurement).append("\"");
String ge = _greaterThanOrEqualTo.strValue();
String le = _lessThanOrEqualTo.strValue();
if (!StringUtil.isEmpty(ge) || !StringUtil.isEmpty(le)) {
queryBuilder.append(" where ");
if (!StringUtil.isEmpty(ge)) {
queryBuilder.append("time >= '").append(ge).append("'");
if (!StringUtil.isEmpty(le)) {
queryBuilder.append(" and time <= '").append(le).append("'");
}
} else {
queryBuilder.append("time <= '").append(le).append("'");
}
}
if (_recordFilter.booleanValue()) {
int limit = _getCount.intValue();
int offset = _startRow.intValue();
if (limit > 0) {
queryBuilder.append(" limit ").append(limit);
}
if (offset > 1) {
queryBuilder.append(" offset ").append(offset);
}
}
query = queryBuilder.toString();
}
クエリーを実行してレスポンスを取得する部分
private List<List<Object>> getQueryResponse(String queryString) {
Query query = new Query(queryString);
QueryResult result = _influxDB.query(query);
List<Result> results = result.getResults();
for (Result res : results) {
List<Series> series = res.getSeries();
if (series != null) {
for (Series se : series) {
_columns = se.getColumns();
List<List<Object>> values = se.getValues();
return values;
}
}
}
return null;
}
実行
テストデータを全件取得するフロー、はじめの5件のみ取得するフローなどで動作を確認してみました。
全件(7458レコード)取得時の結果です。
278msでした。
まとめ
今回はFlowServiceのログでテストしたのですが、各種センサーデータなどを格納していろいろな角度からグルーピングしたりしてデータを取得するのかなぁと使用方法を考えてみたりしました。InfluxDBは結構使いどころがありそうです。
Author And Source
この問題について(InfluxDBからデータを取ってくるコンポーネントを作ってみる), 我々は、より多くの情報をここで見つけました https://qiita.com/ArimitsuIshii/items/fc36b8346bb5893d69fe著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .