当前位置: 首页 > article >正文

时序数据库:Influxdb详解

文章目录

  • 一、简介
    • 1、简介
    • 2、官网
  • 二、部署
    • 1、安装
    • 2、配置
      • (1)用户初始化
  • 三、入门(Web UI)
    • 1、加载数据
      • (1)上传数据文件
      • (2)代码接入模板
    • 2、管理存储桶
      • (1)创建桶
      • (2)修改桶
    • 3、演示:测试录入数据
    • 4、Telegraf集成
    • 5、管理抓取任务
    • 6、管理 API Token
    • 7、使用Data Explorer
      • (1)查询构造器
      • (2)FLUX 脚本编辑器
      • (3)数据预览区
      • (4)数据导出为CSV
    • 8、使用Notebook
  • 四、Flux语法
  • 五、HTTP API
  • 六、Java操作Influxdb
    • 1、导包
    • 2、POJO
    • 3、同步写入
    • 4、异步写入
    • 5、查询

一、简介

1、简介

关系型数据库也是支持时间戳的,也能够基于时间戳进行查询。但是,从我们的使用场景出发,需要注意数据库的写入性能。通常,关系型数据库会采用 B+树数据结构,在数据写入时,有可能会触发叶裂变,从而产生了对磁盘的随机读写,降低写入速度。

当前市面上的时序数据库通常都是采用 LSM Tree 的变种,顺序写磁盘来增强数据的写入能力。通常时序数据库都会保证在单点每秒数十万的写入能力

时序数据库一般用于指标监控场景。这个场景的数据有一个非常明显的特点就是冷热差别明显。通常,指标监控只会使用近期一段时间的数据。

2、官网

官网:https://www.influxdata.com/

二、部署

1、安装

wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.4.0-linux-amd64.tar.gz
# 解压
tar -zxvf influxdb2-2.4.0-linux-amd64.tar.gz

# 启动
./influxd

2、配置

(1)用户初始化

1、访问http://192.168.56.10:8086/
在这里插入图片描述
2、点击GET STARTED后,需要填写用户名、密码、组织和初始化的数据库(Bucket)
admin/admin123
在这里插入图片描述
下一步之后,点击CONFIGURE LATER

在这里插入图片描述

3、进入到初始化页面
在这里插入图片描述

三、入门(Web UI)

1、加载数据

(1)上传数据文件

在这里插入图片描述
在 Web UI 上,你可以用文件的方式上传数据,前提是文件中的数据符合 InfluxDB 支持的类型,包括 CSV、带 Flux 注释的 CSV 和 InfluxDB 行协议。
在这里插入图片描述
点开一个文件上传的页面,会有对应的示例:
在这里插入图片描述

(2)代码接入模板

在这里插入图片描述
在这里提供了一些代码相关的Demo,非常方便:

在这里插入图片描述

2、管理存储桶

可以将 InfluxDB 中的 bucket 理解为普通关系型数据库中的 database。

在这里插入图片描述

(1)创建桶

创建桶,可以选择自动删除超过多长时间的数据。
在这里插入图片描述

(2)修改桶

可以直接修改桶的数据过期时间。
修改桶的名称是敏感行为,谨慎操作!
在这里插入图片描述

3、演示:测试录入数据

在这里插入图片描述
在这里插入图片描述

4、Telegraf集成

Telegraf 是 InfluxDB 生态中的一个数据采集组件,它可以将各种时序数据自动采集到InfluxDB。现在,Telegraf 不仅仅是 InfluxDB 的数据采集组件了,很多时序数据库都支持与 Telegraf 进行协作,不少类似的时序数据收集组件选择在 Telegraf 的基础上二次开发。

这里先略过了。
在这里插入图片描述

5、管理抓取任务

抓取任务就是你给定一个 URL,InfluxDB 每隔一段时间去访问这个链接,把访问到的数据入库。

目标 URL 暴露出来的数据格式必须得是 Prometheus 数据格式

在这里插入图片描述
在这里插入图片描述
此处略了。

6、管理 API Token

token用于API的使用。
在这里插入图片描述

7、使用Data Explorer

在这里插入图片描述

(1)查询构造器

在这里插入图片描述

(2)FLUX 脚本编辑器

可以手动将查询构造器切换为 FLUX 脚本编辑器
在这里插入图片描述

(3)数据预览区

数据预览区可以将你的数据展示出来。下图是一个效果图。
在这里插入图片描述
在这里插入图片描述

(4)数据导出为CSV

在这里插入图片描述

8、使用Notebook

在这里插入图片描述
在这里插入图片描述

四、Flux语法

暂略。

五、HTTP API

使用API时,需要在Header头携带Authorization,value值为Token空格 + 你的token
在这里插入图片描述

# 查询目前所有的 Token 信息,包括他们拥有什么权限
http://192.168.56.10:8086/api/v2/authorizations

官方文档:https://docs.influxdata.com/influxdb/v2/api/

六、Java操作Influxdb

客户端参考:https://github.com/influxdata/influxdb-client-java

1、导包

<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>6.5.0</version>
</dependency>

2、POJO


import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;

import java.time.Instant;

/**
 * @description: 这是一个POJO类,对应将POJO类写入InfluxDB
 */
@Measurement(name="temperature")
public class DemoPOJO {

    /** 注意类上的@Measurement注解,我们既可以使用Measurement注解来指定一个POJO类的测量名称
     * 但是使用@Measurement注解会将测量名称直接写死在代码里
     * 当测量名称会在运行时发生改变时,我们可以使用@Column(menasurement=true)注解
     * 这样会将POJO类中被注解的值作为测量名称。
     * **/
    //@Column(measurement = true)
    //String measurementName;

    /** 相当于InfluxDB行协议中的标签集,此处标签的名称将会是location **/
    @Column(tag = true)
    String location;

    /** 相当于InfluxDB行协议中的字段集,此处字段的名称将会是value **/
    @Column
    Double value;

    /** 相当于InfluxDB行协议中的时间戳 **/
    @Column(timestamp = true)
    Instant timestamp;

    /** 全参构造器,方便调用者创建对象 **/
    public DemoPOJO(String location, Double value, Instant timestamp) {
        this.location = location;
        this.value = value;
        this.timestamp = timestamp;
    }
}

3、同步写入

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.internal.AbstractWriteClient;
import com.influxdb.client.internal.MeasurementMapper;
import com.influxdb.client.write.Point;

import java.time.Instant;

/**
 * @author: tony
 * @description: 这里是使用同步方式向InfluxDB写入数据的示例代码
 *
 */
public class Write1 {

    /** token 操作时需要换成自己的 **/
    private static char[] token = "token....".toCharArray();

    /** 组织名称 **/
    private static String org = "cxf";

    /** InfluxDB服务提供的url **/
    private static String url = "http://localhost:8086/";

    /** 存储桶名称 **/
    private static String bucket = "example_java";

    public static void main(String[] args) {

        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
        WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();

        // 0. 使用InflxuDB行协议写入
//        writeApiBlocking.writeRecord(WritePrecision.MS,"temperature,location=north value=50");

        // 1. 使用Point写入
//        Point point = Point.measurement("temperature")
//                .addTag("location", "west")
//                .addField("value", 38.0)
//                .time(Instant.now(),WritePrecision.NS)
//                ;
//        writeApiBlocking.writePoint(point);

        // 2. 使用POJO类写入
        DemoPOJO demoPOJO = new DemoPOJO("east", 22.2, Instant.now());
        writeApiBlocking.writeMeasurement(WritePrecision.MS,demoPOJO);

        // 3. 调用close方法会关闭并释放一些比如守护线程之类的对象。
        influxDBClient.close();
    }
}

4、异步写入

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;

/**
 * @description: 通过异步的方式向InfluxDB写入数据
 */
public class AsyncWrite {

    /** token 操作时需要换成自己的 **/
    private static char[] token = "token....".toCharArray();

    /** 组织名称 **/
    private static String org = "cxf";

    /** InfluxDB服务提供的url **/
    private static String url = "http://localhost:8086/";

    /** 存储桶名称 **/
    private static String bucket = "example_java";

    public static void main(String[] args) {
        // 0.创建InfluxDB的客户端
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
        // 1.异步写入会创建一个守护线程,所以在makWriteApi时可以传递一些配置项,也就是WriteOptions对象
        WriteOptions options = WriteOptions.builder()
                .batchSize(999)
                .flushInterval(10000)
                .build();
        // 2.使用makeWriteApi创建的
        WriteApi writeApi = influxDBClient.makeWriteApi(options);
        for (int i = 0; i < 999; i++) {
            writeApi.writeRecord(WritePrecision.MS,"temperature,location=south value=77");
        }

        // 3.关闭连接,此方法会触发一次刷写,将缓冲区中剩下的数据向InfluxDB写入一次。
        influxDBClient.close();
    }
}

5、查询

import com.influxdb.client.BucketsApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.domain.File;
import com.influxdb.query.FluxColumn;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;

import java.util.List;
import java.util.Map;

/**
 * @description: 这是关于从InfluxDB查询数据的代码
 */
public class Query {

    /** token 操作时需要换成自己的 **/
    private static char[] token = "token....".toCharArray();

    /** 组织名称 **/
    private static String org = "cxf";

    /** InfluxDB服务提供的url **/
    private static String url = "http://localhost:8086/";

    public static void main(String[] args) {

        // 0.创建InfluxDB客户端对象
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org);

        // 1.获取查询API对象
        QueryApi queryApi = influxDBClient.getQueryApi();

        // 2.这个flux脚本会查询test_init存储桶中的go_goroutines测量,这个测量下只有一个序列
        String flux = "from(bucket: \"test_init\")\n" +
                "  |> range(start: -1h)\n" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"go_goroutines\")\n" +
                "  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +
                "  |> yield(name: \"mean\")";

        // 3.这个flux脚本会查询example02存储桶中的cpu测量,指定字段名称为usage_user后,
        String flux2 = "from(bucket: \"example02\")\n" +
                "  |> range(start: -1h)\n" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\")\n" +
                "  |> filter(fn: (r) => r[\"_field\"] == \"usage_user\")\n" +
                "  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +
                "  |> yield(name: \"mean\")";

        // 4.使用query方法将会得到一个List<FluxTable>对象,其中每一个FluxTable都对应着一个序列
        List<FluxTable> query = queryApi.query(flux);

        // 5.下面这个for循环会把遍历每个序列,并将这个序列中对应的每一行数据打印出来。
        for (FluxTable fluxTable : query) {
            List<FluxRecord> records = fluxTable.getRecords();
            for (FluxRecord record : records) {
                Map<String, Object> one = record.getValues();
                System.out.println(one);
            }
        }

        // 6.下面的queryRaw方法将会得到一个字符串,字符串中是FLUX拓展的CSV格式的数据
        String data = queryApi.queryRaw(flux2);
        System.out.println(data);

    }
}


http://www.kler.cn/a/536337.html

相关文章:

  • Kubernetes是什么?为什么它是云原生的基石
  • leetcode_78子集
  • 在 MySQL 8 中配置主从同步(主从复制)是一个常见的需求,用于实现数据的冗余备份、读写分离等。
  • 02-合并两个有序数组
  • iOS 老项目适配 #Preview 预览功能
  • JavaScript前后端交互-AJAX/fetch
  • 英特尔至强服务器CPU销量创14年新低,AMD取得进展
  • Rust 的内存管理机制
  • 树莓集团深耕海南,为自贸港建设注入数字新活力
  • 智慧工厂可视化推动制造升级
  • shell脚本控制——处理信号
  • 【2024华为OD-E卷-100分-木板】(题目+思路+JavaC++Python解析)
  • fs 文件系统模块
  • 使用PyCharm创建项目以及如何注释代码
  • 【Elasticsearch】指标聚合概述
  • 每日Attention学习22——Inverted Residual RWKV
  • spring-ioc-基础知识
  • leetcode_双指针 557. 反转字符串中的单词 III
  • 上传文件报错:the request was rejected because no multipart boundary was found
  • Linux-查看开放端口
  • java---->策略模式
  • Intellij IDEA如何查看当前文件的类
  • HTML之form表单学习
  • go结构体详解
  • 03-移除元素
  • leetcode:1897. 重新分配字符使所有字符串都相等(python3解法)