时序数据库:Influxdb详解
文章目录
一、简介
1、简介
关系型数据库也是支持时间戳的,也能够基于时间戳进行查询。但是,从我们的使用场景出发,需要注意数据库的写入性能。通常,关系型数据库会采用 B+树数据结构,在数据写入时,有可能会触发叶裂变,从而产生了对磁盘的随机读写,降低写入速度。
当前市面上的时序数据库通常都是采用 LSM Tree 的变种,顺序写磁盘来增强数据的写入能力。通常时序数据库都会保证在单点每秒数十万的写入能力
。
时序数据库一般用于指标监控场景
。这个场景的数据有一个非常明显的特点就是冷热差别明显。通常,指标监控只会使用近期一段时间的数据。
2、官网
官网:https://www.influxdata.com/
二、部署
1、安装
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.4.0-linux-amd64.tar.gz
# 解压
tar -zxvf influxdb2-2.4.0-linux-amd64.tar.gz
# 启动
./influxd
2、配置
(1)用户初始化
1、访问http://192.168.56.10:8086/
2、点击GET STARTED后,需要填写用户名、密码、组织和初始化的数据库(Bucket)
admin/admin123
下一步之后,点击CONFIGURE LATER
3、进入到初始化页面
三、入门(Web UI)
1、加载数据
(1)上传数据文件
在 Web UI 上,你可以用文件的方式上传数据,前提是文件中的数据符合 InfluxDB 支持的类型,包括 CSV、带 Flux 注释的 CSV 和 InfluxDB 行协议。
点开一个文件上传的页面,会有对应的示例:
(2)代码接入模板
在这里提供了一些代码相关的Demo,非常方便:
2、管理存储桶
可以将 InfluxDB 中的 bucket 理解为普通关系型数据库中的 database。
(1)创建桶
创建桶,可以选择自动删除超过多长时间的数据。
(2)修改桶
可以直接修改桶的数据过期时间。
修改桶的名称是敏感行为,谨慎操作!
3、演示:测试录入数据
4、Telegraf集成
Telegraf 是 InfluxDB 生态中的一个数据采集组件,它可以将各种时序数据自动采集到InfluxDB。现在,Telegraf 不仅仅是 InfluxDB 的数据采集组件了,很多时序数据库都支持与 Telegraf 进行协作,不少类似的时序数据收集组件选择在 Telegraf 的基础上二次开发。
这里先略过了。
5、管理抓取任务
抓取任务就是你给定一个 URL,InfluxDB 每隔一段时间去访问这个链接,把访问到的数据入库。
目标 URL 暴露出来的数据格式必须得是 Prometheus 数据格式
。
此处略了。
6、管理 API Token
token用于API的使用。
7、使用Data Explorer
(1)查询构造器
(2)FLUX 脚本编辑器
可以手动将查询构造器切换为 FLUX 脚本编辑器
。
(3)数据预览区
数据预览区可以将你的数据展示出来。下图是一个效果图。
(4)数据导出为CSV
8、使用Notebook
四、Flux语法
暂略。
五、HTTP API
使用API时,需要在Header头携带Authorization
,value值为Token空格 + 你的token
# 查询目前所有的 Token 信息,包括他们拥有什么权限
http://192.168.56.10:8086/api/v2/authorizations
官方文档:https://docs.influxdata.com/influxdb/v2/api/
六、Java操作Influxdb
客户端参考:https://github.com/influxdata/influxdb-client-java
1、导包
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.5.0</version>
</dependency>
2、POJO
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import java.time.Instant;
/**
* @description: 这是一个POJO类,对应将POJO类写入InfluxDB
*/
@Measurement(name="temperature")
public class DemoPOJO {
/** 注意类上的@Measurement注解,我们既可以使用Measurement注解来指定一个POJO类的测量名称
* 但是使用@Measurement注解会将测量名称直接写死在代码里
* 当测量名称会在运行时发生改变时,我们可以使用@Column(menasurement=true)注解
* 这样会将POJO类中被注解的值作为测量名称。
* **/
//@Column(measurement = true)
//String measurementName;
/** 相当于InfluxDB行协议中的标签集,此处标签的名称将会是location **/
@Column(tag = true)
String location;
/** 相当于InfluxDB行协议中的字段集,此处字段的名称将会是value **/
@Column
Double value;
/** 相当于InfluxDB行协议中的时间戳 **/
@Column(timestamp = true)
Instant timestamp;
/** 全参构造器,方便调用者创建对象 **/
public DemoPOJO(String location, Double value, Instant timestamp) {
this.location = location;
this.value = value;
this.timestamp = timestamp;
}
}
3、同步写入
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.internal.AbstractWriteClient;
import com.influxdb.client.internal.MeasurementMapper;
import com.influxdb.client.write.Point;
import java.time.Instant;
/**
* @author: tony
* @description: 这里是使用同步方式向InfluxDB写入数据的示例代码
*
*/
public class Write1 {
/** token 操作时需要换成自己的 **/
private static char[] token = "token....".toCharArray();
/** 组织名称 **/
private static String org = "cxf";
/** InfluxDB服务提供的url **/
private static String url = "http://localhost:8086/";
/** 存储桶名称 **/
private static String bucket = "example_java";
public static void main(String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();
// 0. 使用InflxuDB行协议写入
// writeApiBlocking.writeRecord(WritePrecision.MS,"temperature,location=north value=50");
// 1. 使用Point写入
// Point point = Point.measurement("temperature")
// .addTag("location", "west")
// .addField("value", 38.0)
// .time(Instant.now(),WritePrecision.NS)
// ;
// writeApiBlocking.writePoint(point);
// 2. 使用POJO类写入
DemoPOJO demoPOJO = new DemoPOJO("east", 22.2, Instant.now());
writeApiBlocking.writeMeasurement(WritePrecision.MS,demoPOJO);
// 3. 调用close方法会关闭并释放一些比如守护线程之类的对象。
influxDBClient.close();
}
}
4、异步写入
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;
/**
* @description: 通过异步的方式向InfluxDB写入数据
*/
public class AsyncWrite {
/** token 操作时需要换成自己的 **/
private static char[] token = "token....".toCharArray();
/** 组织名称 **/
private static String org = "cxf";
/** InfluxDB服务提供的url **/
private static String url = "http://localhost:8086/";
/** 存储桶名称 **/
private static String bucket = "example_java";
public static void main(String[] args) {
// 0.创建InfluxDB的客户端
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
// 1.异步写入会创建一个守护线程,所以在makWriteApi时可以传递一些配置项,也就是WriteOptions对象
WriteOptions options = WriteOptions.builder()
.batchSize(999)
.flushInterval(10000)
.build();
// 2.使用makeWriteApi创建的
WriteApi writeApi = influxDBClient.makeWriteApi(options);
for (int i = 0; i < 999; i++) {
writeApi.writeRecord(WritePrecision.MS,"temperature,location=south value=77");
}
// 3.关闭连接,此方法会触发一次刷写,将缓冲区中剩下的数据向InfluxDB写入一次。
influxDBClient.close();
}
}
5、查询
import com.influxdb.client.BucketsApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.domain.File;
import com.influxdb.query.FluxColumn;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import java.util.List;
import java.util.Map;
/**
* @description: 这是关于从InfluxDB查询数据的代码
*/
public class Query {
/** token 操作时需要换成自己的 **/
private static char[] token = "token....".toCharArray();
/** 组织名称 **/
private static String org = "cxf";
/** InfluxDB服务提供的url **/
private static String url = "http://localhost:8086/";
public static void main(String[] args) {
// 0.创建InfluxDB客户端对象
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org);
// 1.获取查询API对象
QueryApi queryApi = influxDBClient.getQueryApi();
// 2.这个flux脚本会查询test_init存储桶中的go_goroutines测量,这个测量下只有一个序列
String flux = "from(bucket: \"test_init\")\n" +
" |> range(start: -1h)\n" +
" |> filter(fn: (r) => r[\"_measurement\"] == \"go_goroutines\")\n" +
" |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +
" |> yield(name: \"mean\")";
// 3.这个flux脚本会查询example02存储桶中的cpu测量,指定字段名称为usage_user后,
String flux2 = "from(bucket: \"example02\")\n" +
" |> range(start: -1h)\n" +
" |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\")\n" +
" |> filter(fn: (r) => r[\"_field\"] == \"usage_user\")\n" +
" |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +
" |> yield(name: \"mean\")";
// 4.使用query方法将会得到一个List<FluxTable>对象,其中每一个FluxTable都对应着一个序列
List<FluxTable> query = queryApi.query(flux);
// 5.下面这个for循环会把遍历每个序列,并将这个序列中对应的每一行数据打印出来。
for (FluxTable fluxTable : query) {
List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord record : records) {
Map<String, Object> one = record.getValues();
System.out.println(one);
}
}
// 6.下面的queryRaw方法将会得到一个字符串,字符串中是FLUX拓展的CSV格式的数据
String data = queryApi.queryRaw(flux2);
System.out.println(data);
}
}