InfluxDB写入测试
早几年测试时序库时,采集数据到kafka,然后用不同数据进行存储验证。Influxdb是花时间比较多的,它的数据建模方法、读写方法都需要使用特殊的API。时间久了自己也经常忘记,把当时的测试关键代码记录下来,也方便日后查找。
代码基于java编写。
1、接口数据定义,clientid+tag组合必须唯一
public class KafkaInfo{
//客户端id
public String clientid;
//测点名称
public String tag ;
//数据时戳
public String ts ;
//测点数据类型
public String vt;
//测点值
public String value;
//本次数据更新次数
public long updatacount;
//测点说明
public String desc;
}
2、数据写入
static Logger logger = Logger.getLogger(influxdbApplicationTest.class);
// You can generate a Token from the "Tokens Tab" in the UI
static String token = "web界面创建的token";
static String bucket = "数据分库名";
static String org = "初始化时创建的org";
public static void main(String[] args) {
logger.info("-------start ,ApplicationTest--------");
InfluxDBClient client = InfluxDBClientFactory.create("http://10.126.12.113:8086", token.toCharArray());
//
// Write data
//
try (WriteApi writeApi = client.getWriteApi()) {
//
// Write by Data Point
//
for ( int i = 0;i < 10; i ++ ) {
Point point = Point.measurement("Line123")
.addTag("tag", "Tags001")
.addTag("L1", "real")
.addField("value", 20*i)
.addField("update", 7000+i)
.time(Instant.now().toEpochMilli(), WritePrecision.MS);
//writeApi.writePoint(bucket, org, point);
}
}
//
// Query data
//
String query = String.format("from(bucket: \"%s\") |> range(start: -1h) |> filter(fn: (r) =>r._measurement==\"Line123\"", bucket);
QueryApi queryApi = client.getQueryApi();
List<FluxTable> tables = queryApi.query(query,org);
for (FluxTable fluxTable : tables) {
List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord fluxRecord : records) {
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getMeasurement()+","+ fluxRecord.getValueByKey("tag") +","+ fluxRecord.getValueByKey("_value"));
}
}
client.close();
logger.info("-------finish ,ApplicationTest--------");
}