InfluxDB使用チュートリアル:Java版InfluxDBツールクラス
11867 ワード
前言:
前述したように、LinuxとWindowsでのInfluxの使用を紹介した後、本節ではJavaでのInfluxの使用を紹介し、まずInfluxDB Java APIパッケージのツールクラスを提供し、直接使用するのに便利です.
1.InfluxDBツールクラス
まずツールクラスを差し上げ、次に使い方を紹介します.
依存Jarパッケージ:
二.ツールクラスを使用してデータを照会する
InfluxDBは一度に複数のSQLを検索することをサポートし、SQL間はセミコロン
データを取得する際には、空の値判断に注意して、本例では、戻りデータを先に空
InfluxDBパッケージの結果セットは少し深く、主に複数のSQLの使い捨てクエリーをサポートしているため、クエリーの速度を高めることができます.この場所はリレーショナル・データベースの使用とは異なります.
二.InfluxDBツール類を使用して、単一データを挿入する
三.InfluxDBツール類を使用して、大量にデータを書き込む二つの方式
注意:この2つの方法では、両方のデータが同じデータベースに書き込まれ、tagが同じである場合、tagが異なる場合は、異なる
方式1:BatchPointsでデータを組み立てた後、循環してデータベースを挿入する.
方式2:BatchPointsでデータを組み立て、シーケンス化した後、一度にデータベースを挿入する.
第2の方法では、1つのデータベースに属するデータを一括して書き込むことができ、書き込み速度が最も速いことをお勧めします.
まとめ:
InfluxのJavaでの読み取りと書き込み、そして大量にデータを書き込む2つの方法について、今日は皆さんに紹介しました.このツールクラスとケースが役に立つことを願っています.
前述したように、LinuxとWindowsでのInfluxの使用を紹介した後、本節ではJavaでのInfluxの使用を紹介し、まずInfluxDB Java APIパッケージのツールクラスを提供し、直接使用するのに便利です.
1.InfluxDBツールクラス
まずツールクラスを差し上げ、次に使い方を紹介します.
package com.common.utils.influxdb;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
/**
* InfluxDB
*
* @author
*/
public class InfluxDBConnection {
//
private String username;
//
private String password;
//
private String openurl;
//
private String database;
//
private String retentionPolicy;
private InfluxDB influxDB;
public InfluxDBConnection(String username, String password, String openurl, String database,
String retentionPolicy) {
this.username = username;
this.password = password;
this.openurl = openurl;
this.database = database;
this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
influxDbBuild();
}
/**
*
*
* @param dbName
*/
@SuppressWarnings("deprecation")
public void createDB(String dbName) {
influxDB.createDatabase(dbName);
}
/**
*
*
* @param dbName
*/
@SuppressWarnings("deprecation")
public void deleteDB(String dbName) {
influxDB.deleteDatabase(dbName);
}
/**
*
*
* @return true
*/
public boolean ping() {
boolean isConnected = false;
Pong pong;
try {
pong = influxDB.ping();
if (pong != null) {
isConnected = true;
}
} catch (Exception e) {
e.printStackTrace();
}
return isConnected;
}
/**
* ,
*
* @return
*/
public InfluxDB influxDbBuild() {
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(openurl, username, password);
}
try {
// if (!influxDB.databaseExists(database)) {
// influxDB.createDatabase(database);
// }
} catch (Exception e) {
// ,
// e.printStackTrace();
} finally {
influxDB.setRetentionPolicy(retentionPolicy);
}
influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
return influxDB;
}
/**
*
*
* @param policyName
*
* @param duration
*
* @param replication
*
* @param isDefault
*
*/
public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {
String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,
database, duration, replication);
if (isDefault) {
sql = sql + " DEFAULT";
}
this.query(sql);
}
/**
*
*
* @param :default, :30 , :1
*
*/
public void createDefaultRetentionPolicy() {
String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
"default", database, "30d", 1);
this.query(command);
}
/**
*
*
* @param command
*
* @return
*/
public QueryResult query(String command) {
return influxDB.query(new Query(command, database));
}
/**
*
*
* @param measurement
*
* @param tags
*
* @param fields
*
*/
public void insert(String measurement, Map tags, Map fields, long time,
TimeUnit timeUnit) {
Builder builder = Point.measurement(measurement);
builder.tag(tags);
builder.fields(fields);
if (0 != time) {
builder.time(time, timeUnit);
}
influxDB.write(database, retentionPolicy, builder.build());
}
/**
*
*
* @param batchPoints
*/
public void batchInsert(BatchPoints batchPoints) {
influxDB.write(batchPoints);
// influxDB.enableGzip();
// influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS);
// influxDB.disableGzip();
// influxDB.disableBatch();
}
/**
*
*
* @param database
*
* @param retentionPolicy
*
* @param consistency
*
* @param records
* ( BatchPoints.lineProtocol() record)
*/
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
final List records) {
influxDB.write(database, retentionPolicy, consistency, records);
}
/**
*
*
* @param command
*
* @return
*/
public String deleteMeasurementData(String command) {
QueryResult result = influxDB.query(new Query(command, database));
return result.getError();
}
/**
*
*/
public void close() {
influxDB.close();
}
/**
* Point
*
* @param measurement
* @param time
* @param fields
* @return
*/
public Point pointBuilder(String measurement, long time, Map tags, Map fields) {
Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();
return point;
}
}
依存Jarパッケージ:
org.influxdb
influxdb-java
2.10
二.ツールクラスを使用してデータを照会する
InfluxDBは一度に複数のSQLを検索することをサポートし、SQL間はセミコロン
;
で区切ることができます.次に、SQLが1つしかない場合、クエリが返す結果セットをどのように解析するかを示します.public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
QueryResult results = influxDBConnection
.query("SELECT * FROM measurement where name = ' ' order by time desc limit 1000");
//results.getResults() SQL , SQL, 。
Result oneResult = results.getResults().get(0);
if (oneResult.getSeries() != null) {
List> valueList = oneResult.getSeries().stream().map(Series::getValues)
.collect(Collectors.toList()).get(0);
if (valueList != null && valueList.size() > 0) {
for (List
データを取得する際には、空の値判断に注意して、本例では、戻りデータを先に空
oneResult.getSeries() != null
と判定し、次にoneResult.getSeries().getValues().get(0)
を呼び出して最初のSQLの戻り結果セットを取得し、valueListを巡り、各レコードのターゲットフィールド値を取り出す.InfluxDBパッケージの結果セットは少し深く、主に複数のSQLの使い捨てクエリーをサポートしているため、クエリーの速度を高めることができます.この場所はリレーショナル・データベースの使用とは異なります.
二.InfluxDBツール類を使用して、単一データを挿入する
InfluxDB , ;tags String , , 。
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
Map tags = new HashMap();
tags.put("tag1", " ");
Map fields = new HashMap();
fields.put("field1", "String ");
// ,InfluxDB ,
fields.put("field2", 3.141592657);
//
influxDBConnection.insert(" ", tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
三.InfluxDBツール類を使用して、大量にデータを書き込む二つの方式
注意:この2つの方法では、両方のデータが同じデータベースに書き込まれ、tagが同じである場合、tagが異なる場合は、異なる
BatchPoint
オブジェクトに配置する必要があります.そうしないと、データの書き込みエラーが発生します.方式1:BatchPointsでデータを組み立てた後、循環してデータベースを挿入する.
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
Map tags = new HashMap();
tags.put("tag1", " ");
Map fields1 = new HashMap();
fields1.put("field1", "abc");
// ,InfluxDB ,
fields1.put("field2", 123456);
Map fields2 = new HashMap();
fields2.put("field1", "String ");
fields2.put("field2", 3.141592657);
//
Point point1 = influxDBConnection.pointBuilder(" ", System.currentTimeMillis(), tags, fields1);
Point point2 = influxDBConnection.pointBuilder(" ", System.currentTimeMillis(), tags, fields2);
// batchPoints
BatchPoints batchPoints1 = BatchPoints.database("db-test").tag("tag1", " 1").retentionPolicy("hour")
.consistency(ConsistencyLevel.ALL).build();
BatchPoints batchPoints2 = BatchPoints.database("db-test").tag("tag2", " 2").retentionPolicy("hour")
.consistency(ConsistencyLevel.ALL).build();
batchPoints1.point(point1);
batchPoints2.point(point2);
//
influxDBConnection.batchInsert(batchPoints1);
influxDBConnection.batchInsert(batchPoints2);
}
方式2:BatchPointsでデータを組み立て、シーケンス化した後、一度にデータベースを挿入する.
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
Map tags1 = new HashMap();
tags1.put("tag1", " ");
Map tags2 = new HashMap();
tags2.put("tag2", " ");
Map fields1 = new HashMap();
fields1.put("field1", "abc");
// ,InfluxDB ,
fields1.put("field2", 123456);
Map fields2 = new HashMap();
fields2.put("field1", "String ");
fields2.put("field2", 3.141592657);
//
Point point1 = influxDBConnection.pointBuilder(" ", System.currentTimeMillis(), tags1, fields1);
Point point2 = influxDBConnection.pointBuilder(" ", System.currentTimeMillis(), tags2, fields2);
BatchPoints batchPoints1 = BatchPoints.database("db-test").tag("tag1", " 1")
.retentionPolicy("hour").consistency(ConsistencyLevel.ALL).build();
// batchPoints
batchPoints1.point(point1);
BatchPoints batchPoints2 = BatchPoints.database("db-test").tag("tag2", " 2")
.retentionPolicy("hour").consistency(ConsistencyLevel.ALL).build();
// batchPoints
batchPoints2.point(point2);
// batchPoints , ,
List records = new ArrayList();
records.add(batchPoints1.lineProtocol());
records.add(batchPoints2.lineProtocol());
//
influxDBConnection.batchInsert("db-test", "hour", ConsistencyLevel.ALL, records);
}
第2の方法では、1つのデータベースに属するデータを一括して書き込むことができ、書き込み速度が最も速いことをお勧めします.
まとめ:
InfluxのJavaでの読み取りと書き込み、そして大量にデータを書き込む2つの方法について、今日は皆さんに紹介しました.このツールクラスとケースが役に立つことを願っています.