InfluxDB使用チュートリアル:Java版InfluxDBツールクラス

11867 ワード

前言:
前述したように、LinuxとWindowsでのInfluxの使用を紹介した後、本節ではJavaでのInfluxの使用を紹介し、まずInfluxDB Java APIパッケージのツールクラスを提供し、直接使用するのに便利です.
1.InfluxDBツールクラス
まずツールクラスを差し上げ、次に使い方を紹介します.
package com.common.utils.influxdb;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

/**
 * InfluxDB        
 * 
 * @author     
 */
public class InfluxDBConnection {

	//    
	private String username;
	//   
	private String password;
	//     
	private String openurl;
	//    
	private String database;
	//     
	private String retentionPolicy;

	private InfluxDB influxDB;

	public InfluxDBConnection(String username, String password, String openurl, String database,
			String retentionPolicy) {
		this.username = username;
		this.password = password;
		this.openurl = openurl;
		this.database = database;
		this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
		influxDbBuild();
	}

	/**
	 *      
	 * 
	 * @param dbName
	 */
	@SuppressWarnings("deprecation")
	public void createDB(String dbName) {
		influxDB.createDatabase(dbName);
	}

	/**
	 *      
	 * 
	 * @param dbName
	 */
	@SuppressWarnings("deprecation")
	public void deleteDB(String dbName) {
		influxDB.deleteDatabase(dbName);
	}

	/**
	 *         
	 * 
	 * @return true   
	 */
	public boolean ping() {
		boolean isConnected = false;
		Pong pong;
		try {
			pong = influxDB.ping();
			if (pong != null) {
				isConnected = true;
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return isConnected;
	}

	/**
	 *         ,       
	 * 
	 * @return
	 */
	public InfluxDB influxDbBuild() {
		if (influxDB == null) {
			influxDB = InfluxDBFactory.connect(openurl, username, password);
		}
		try {
			// if (!influxDB.databaseExists(database)) {
			// influxDB.createDatabase(database);
			// }
		} catch (Exception e) {
			//             ,        
			// e.printStackTrace();
		} finally {
			influxDB.setRetentionPolicy(retentionPolicy);
		}
		influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
		return influxDB;
	}

	/**
	 *          
	 * 
	 * @param policyName
	 *               
	 * @param duration
	 *                
	 * @param replication
	 *                  
	 * @param isDefault
	 *                      
	 */
	public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {
		String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,
				database, duration, replication);
		if (isDefault) {
			sql = sql + " DEFAULT";
		}
		this.query(sql);
	}

	/**
	 *          
	 * 
	 * @param    :default,    :30 ,      :1
	 *                    
	 */
	public void createDefaultRetentionPolicy() {
		String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
				"default", database, "30d", 1);
		this.query(command);
	}

	/**
	 *   
	 * 
	 * @param command
	 *                
	 * @return
	 */
	public QueryResult query(String command) {
		return influxDB.query(new Query(command, database));
	}

	/**
	 *   
	 * 
	 * @param measurement
	 *             
	 * @param tags
	 *              
	 * @param fields
	 *              
	 */
	public void insert(String measurement, Map tags, Map fields, long time,
			TimeUnit timeUnit) {
		Builder builder = Point.measurement(measurement);
		builder.tag(tags);
		builder.fields(fields);
		if (0 != time) {
			builder.time(time, timeUnit);
		}
		influxDB.write(database, retentionPolicy, builder.build());
	}

	/**
	 *       
	 * 
	 * @param batchPoints
	 */
	public void batchInsert(BatchPoints batchPoints) {
		influxDB.write(batchPoints);
		// influxDB.enableGzip();
		// influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS);
		// influxDB.disableGzip();
		// influxDB.disableBatch();
	}

	/**
	 *       
	 * 
	 * @param database
	 *               
	 * @param retentionPolicy
	 *                
	 * @param consistency
	 *               
	 * @param records
	 *                  (  BatchPoints.lineProtocol()     record)
	 */
	public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
			final List records) {
		influxDB.write(database, retentionPolicy, consistency, records);
	}

	/**
	 *   
	 * 
	 * @param command
	 *                
	 * @return       
	 */
	public String deleteMeasurementData(String command) {
		QueryResult result = influxDB.query(new Query(command, database));
		return result.getError();
	}

	/**
	 *      
	 */
	public void close() {
		influxDB.close();
	}

	/**
	 *   Point
	 * 
	 * @param measurement
	 * @param time
	 * @param fields
	 * @return
	 */
	public Point pointBuilder(String measurement, long time, Map tags, Map fields) {
		Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();
		return point;
	}

}



依存Jarパッケージ:

		org.influxdb
		influxdb-java
		2.10


二.ツールクラスを使用してデータを照会する
InfluxDBは一度に複数のSQLを検索することをサポートし、SQL間はセミコロン;で区切ることができます.次に、SQLが1つしかない場合、クエリが返す結果セットをどのように解析するかを示します.
public static void main(String[] args) {
		InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
		QueryResult results = influxDBConnection
				.query("SELECT * FROM measurement where name = '    '  order by time desc limit 1000");
		//results.getResults()       SQL      ,        SQL,            。
		Result oneResult = results.getResults().get(0);
		if (oneResult.getSeries() != null) {
			List> valueList = oneResult.getSeries().stream().map(Series::getValues)
					.collect(Collectors.toList()).get(0);
			if (valueList != null && valueList.size() > 0) {
				for (List value : valueList) {
					Map map = new HashMap();
					//       1  
					String field1 = value.get(0) == null ? null : value.get(0).toString();
					//       2  
					String field2 = value.get(1) == null ? null : value.get(1).toString();
					// TODO                ……
				}
			}
		}
	}

データを取得する際には、空の値判断に注意して、本例では、戻りデータを先に空oneResult.getSeries() != nullと判定し、次にoneResult.getSeries().getValues().get(0)を呼び出して最初のSQLの戻り結果セットを取得し、valueListを巡り、各レコードのターゲットフィールド値を取り出す.
InfluxDBパッケージの結果セットは少し深く、主に複数のSQLの使い捨てクエリーをサポートしているため、クエリーの速度を高めることができます.この場所はリレーショナル・データベースの使用とは異なります.
二.InfluxDBツール類を使用して、単一データを挿入する
InfluxDB     ,             ;tags      String ,      ,      。
	public static void main(String[] args) {
		InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
		Map tags = new HashMap();
		tags.put("tag1", "   ");
		Map fields = new HashMap();
		fields.put("field1", "String  ");
		//    ,InfluxDB     ,             
		fields.put("field2", 3.141592657);
		//          
		influxDBConnection.insert("  ", tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
	}

三.InfluxDBツール類を使用して、大量にデータを書き込む二つの方式
注意:この2つの方法では、両方のデータが同じデータベースに書き込まれ、tagが同じである場合、tagが異なる場合は、異なるBatchPointオブジェクトに配置する必要があります.そうしないと、データの書き込みエラーが発生します.
方式1:BatchPointsでデータを組み立てた後、循環してデータベースを挿入する.
public static void main(String[] args) {
		InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
		Map tags = new HashMap();
		tags.put("tag1", "   ");
		Map fields1 = new HashMap();
		fields1.put("field1", "abc");
		//    ,InfluxDB     ,             
		fields1.put("field2", 123456);
		Map fields2 = new HashMap();
		fields2.put("field1", "String  ");
		fields2.put("field2", 3.141592657);
		//      
		Point point1 = influxDBConnection.pointBuilder("  ", System.currentTimeMillis(), tags, fields1);
		Point point2 = influxDBConnection.pointBuilder("  ", System.currentTimeMillis(), tags, fields2);
		//         batchPoints 
		BatchPoints batchPoints1 = BatchPoints.database("db-test").tag("tag1", "   1").retentionPolicy("hour")
				.consistency(ConsistencyLevel.ALL).build();
		BatchPoints batchPoints2 = BatchPoints.database("db-test").tag("tag2", "   2").retentionPolicy("hour")
				.consistency(ConsistencyLevel.ALL).build();
		batchPoints1.point(point1);
		batchPoints2.point(point2);
		//               
		influxDBConnection.batchInsert(batchPoints1);
		influxDBConnection.batchInsert(batchPoints2);
	}

方式2:BatchPointsでデータを組み立て、シーケンス化した後、一度にデータベースを挿入する.
public static void main(String[] args) {
		InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
		Map tags1 = new HashMap();
		tags1.put("tag1", "   ");
		Map tags2 = new HashMap();
		tags2.put("tag2", "   ");
		Map fields1 = new HashMap();
		fields1.put("field1", "abc");
		//    ,InfluxDB     ,             
		fields1.put("field2", 123456);
		Map fields2 = new HashMap();
		fields2.put("field1", "String  ");
		fields2.put("field2", 3.141592657);
		//      
		Point point1 = influxDBConnection.pointBuilder("  ", System.currentTimeMillis(), tags1, fields1);
		Point point2 = influxDBConnection.pointBuilder("  ", System.currentTimeMillis(), tags2, fields2);
		BatchPoints batchPoints1 = BatchPoints.database("db-test").tag("tag1", "   1")
				.retentionPolicy("hour").consistency(ConsistencyLevel.ALL).build();
		//         batchPoints 
		batchPoints1.point(point1);
		BatchPoints batchPoints2 = BatchPoints.database("db-test").tag("tag2", "   2")
				.retentionPolicy("hour").consistency(ConsistencyLevel.ALL).build();
		//         batchPoints 
		batchPoints2.point(point2);
		//     batchPoints    ,        ,      
		List records = new ArrayList();
		records.add(batchPoints1.lineProtocol());
		records.add(batchPoints2.lineProtocol());
		//               
		influxDBConnection.batchInsert("db-test", "hour", ConsistencyLevel.ALL, records);
	}

第2の方法では、1つのデータベースに属するデータを一括して書き込むことができ、書き込み速度が最も速いことをお勧めします.
まとめ:
InfluxのJavaでの読み取りと書き込み、そして大量にデータを書き込む2つの方法について、今日は皆さんに紹介しました.このツールクラスとケースが役に立つことを願っています.