当前位置: 首页 > article >正文

springboot+iotdb的应用

简介:
IoTDB 是针对时间序列数据收集、存储与分析一体化的数据管理引擎。它具有体量轻、性能高、易使用的特点,完美对接 Hadoop 与 Spark 生态,适用于工业物联网应用中海量时间序列数据高速写入和复杂分析查询的需求。
文档地址:
https://iotdb.apache.org/zh/UserGuide/V0.13.x/
springboot+iotdb的应用:
我们学习新技术主要的目的是应用它,下面我有两种方式对springboot+iotdb进行应用。
第一种:
第一种方式是结合mybatis进行使用的,这个方式其实在官网上是不推荐使用的,但是如果使用mybatis习惯也是可以用的,这个方法是比较简单的。
直接上代码:
pom文件配置:

<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-jdbc</artifactId>
    <version>1.3.2</version>
</dependency>

yml文件配置

  driver-class-name: org.apache.iotdb.jdbc.IoTDBDriver
  url: jdbc:iotdb://ip:6667/
  username: root
  password: root

实体类:

@Data
public class IotDbVO {
//这个必须要有
    private Long Time;

    private String name;

    private String year;

    private String month;
    }

mapper类:

List<IotDbVO> getIot();

sql写法:

SELECT name as name, year as year, month as month from  root.airspace.log

方法二:
方法二是原生的写法:
pom配置:

<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-session</artifactId>
    <version>1.3.2</version>
</dependency>

配置类:

配置1:

@Configuration
@Slf4j
public class IoTDBSessionConfig {

    @Value("${spring.iotdb.ip}")
    private String ip;

    @Value("${spring.iotdb.port}")
    private int port;

    @Value("${spring.iotdb.user}")
    private String user;

    @Value("${spring.iotdb.password}")
    private String password;

    @Value("${spring.iotdb.fetchSize}")
    private int fetchSize;

    private static Session session;

    /**
     * 初始化
     * @return
     * @throws IoTDBConnectionException
     * @throws StatementExecutionException
     */
    @Bean
    public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
        if (session == null) {
            log.info("正在连接iotdb.......");
            session = new Session.Builder()
                    .host(ip)
                    .port(port)
                    .username(user)
                    .password(password)
                    .version(Version.V_1_0)
                    .build();
            session.open(false);
            session.setFetchSize(fetchSize);
            // 设置时区
            session.setTimeZone("+08:00");
        }
        log.info("连接iotdb成功.......");
        return session;
    }


}

配置2:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
public @interface IoTTableName {
    /**
     * 实体对应的表名
     */
    String value() default "";
}

配置3:

@Data
public class IoTDBRecord {
    /**
     * 节点路径
     */
    private String deviceId;
    /**
     * 时间戳
     */
    private long time;
    /**
     * 属性
     */
    private List<String> measurementsList;
    /**
     * 属性值
     */
    private List<Object> valuesList;
    /**
     * 数据类型
     */
    private List<String> typeList;

}

配置四:

public interface IoTDBRecordable {
    Logger logger = LoggerFactory.getLogger(IoTDBRecordable.class);
    /**
     * 数据载入方法
     * @return Record
     */
    default IoTDBRecord toRecord() {
        IoTDBRecord ioTDBRecord = new IoTDBRecord();
        //当前时间
        ioTDBRecord.setTime(System.currentTimeMillis());
        Class aClass = this.getClass();
        IoTTableName name = this.getClass().getAnnotation(IoTTableName.class);
        ioTDBRecord.setDeviceId(name.value());
        Field[] declaredFields = aClass.getDeclaredFields();
        List<String> measurements = new ArrayList<>();
        List<Object> records = new ArrayList<>();
        List<String> types = new ArrayList<>();
        try {
            for (Field field : declaredFields) {
                measurements.add(field.getName());
                String methodNamePro = field.getName().substring(0, 1).toUpperCase() + field.getName().substring(1);
                Method methodName = this.getClass().getMethod("get" + methodNamePro);
                records.add(methodName.invoke(this));
                types.add(methodName.getReturnType().getName());
            }
            ioTDBRecord.setMeasurementsList(measurements);
            ioTDBRecord.setValuesList(records);
            ioTDBRecord.setTypeList(types);
        } catch (Exception e) {
            logger.error("IoTDB实体类转换异常:{}",e.getMessage());
        }
        return ioTDBRecord;
    }
}

配置五:

@Slf4j
@Component
public class IotDBUtil {
    @Autowired
    private  Session session;
    /**
     * 节点路径
     * @param records
     * @return
     */
    private List<String> getDeviceIds(List<? extends IoTDBRecordable> records) {
        List<String> deviceIds = new ArrayList<>();
        for (IoTDBRecordable ioTDBRecordable : records) {
            IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
            String deviceId = ioTDBRecord.getDeviceId();
            deviceIds.add(deviceId);
        }
        return deviceIds;
    }

    /**
     * 时间戳
     * @param records
     * @return
     */
    private List<Long> getTimes(List<? extends IoTDBRecordable> records) {
        List<Long> times = new ArrayList<>();
        for (IoTDBRecordable ioTDBRecordable : records) {
            IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
            times.add(ioTDBRecord.getTime());
        }
        return times;
    }

    /**
     * 物理量 即:属性
     * @param records
     * @return
     */
    private List<List<String>> getMeasurementsList(List<? extends IoTDBRecordable> records) {
        List<List<String>> measurementsList = new ArrayList<>();
        for (IoTDBRecordable ioTDBRecordable : records) {
            IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
            measurementsList.add(ioTDBRecord.getMeasurementsList());
        }
        return measurementsList;
    }

    /**
     * 属性值 --- 属性必须与属性值一一对应
     * @param records
     * @return
     */
    private List<List<Object>> getValuesList(List<? extends IoTDBRecordable> records) {
        List<List<Object>> valuesList = new ArrayList<>();
        for (IoTDBRecordable ioTDBRecordable : records) {
            IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
            valuesList.add(ioTDBRecord.getValuesList());
        }

        return valuesList;
    }

    /**
     * 数据类型 BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5));
     * @param records
     * @return
     */
    private List<List<TSDataType>> getTSDataType(List<? extends IoTDBRecordable> records) {
        List<List<TSDataType>> valuesList = new ArrayList<>();
        for (IoTDBRecordable ioTDBRecordable : records) {
            IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
            List<TSDataType> strList = new ArrayList<>();
            for(String str : ioTDBRecord.getTypeList()){
                strList.add(convertTypeByEntity(str));
            }
            valuesList.add(strList);
        }

        return valuesList;
    }

    /**
     * 实体数据类型转换
     * @param type 属性类型
     * @return
     */
    private TSDataType convertTypeByEntity(String type) {
        switch (type) {
            case "java.lang.Double":
                return TSDataType.DOUBLE;
            case "java.lang.Integer":
                return TSDataType.INT32;
            case "java.lang.Long":
                return TSDataType.INT64;
            case "java.lang.Boolean":
                return TSDataType.BOOLEAN;
            case "java.lang.Float":
                return TSDataType.FLOAT;
            default:
                return TSDataType.TEXT;
        }
    }

    /**
     * 批量插入
     * @param records 类集合
     */
    public void insertRecords(List<? extends IoTDBRecordable> records) {
        try {
            session.insertRecords(getDeviceIds(records), getTimes(records), getMeasurementsList(records),getTSDataType(records),
                    getValuesList(records));
        } catch (Exception e) {
            log.error("IoTDB插入异常:{}",e.getMessage());
        }
    }

    /**
     * 单个插入实体
     * @param recordEntity
     */
    public void insertRecord(IoTDBRecordable recordEntity) {
        try {
            IoTDBRecord ioTDBRecord = recordEntity.toRecord();
            List<TSDataType> strList = new ArrayList<>();
            for(String str : ioTDBRecord.getTypeList()){
                strList.add(convertTypeByEntity(str));
            }
            session.insertRecord(ioTDBRecord.getDeviceId(), ioTDBRecord.getTime(), ioTDBRecord.getMeasurementsList()
                    ,strList, ioTDBRecord.getValuesList());
        } catch (Exception e) {
            log.error("IoTDB插入异常:{}",e.getMessage());
        }
    }

    /**
     * description: 根据SQL查询
     */
    public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
        return session.executeQueryStatement(sql,30000);
    }

    /**
     * description: 删除分组 如 root.a1eaKSRpRty
     * @param  groupName:分组名称
     * @return
     */
    public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteStorageGroup(groupName);
    }

    /**
     * description: 根据Timeseries删除  如:root.a1eaKSRpRty.CA3013A303A25467.breath  (个人理解:为具体的物理量)
     */
    public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteTimeseries(timeseries);
    }
    /**
     * description: 根据Timeseries批量删除
     */
    public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteTimeseries(timeseriesList);
    }

    /**
     * description: 根据分组批量删除
     */
    public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteStorageGroups(storageGroupList);
    }

    /**
     * description: 根据路径和结束时间删除 结束时间之前的所有数据
     */
    public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(path, endTime);
    }
    /**
     * description: 根据路径集合和结束时间批量删除 结束时间之前的所有数据
     */
    public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(pathList, endTime);
    }
    /**
     * description: 根据路径集合和时间段批量删除
     */
    public void deleteDataByPathListAndTime(List<String> pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(pathList, startTime, endTime);
    }
    /**
     * description: 查询数据
     */
    public SessionDataSet executeLastDataQuery(List<String> pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        return session.executeLastDataQuery(pathList, startTime, endTime);
    }
}

yml配置:

iotdb:
    ip: ip
    port: 6667
    user: root
    password: root
    fetchSize: 10000
    maxActive: 10

使用如下:
在需要的类中引入

@Autowired
private IotDBUtil iotDBUtil;

http://www.kler.cn/a/371398.html

相关文章:

  • 【redis】ubuntu18安装redis7
  • 二进制编码 和 Base64编码
  • Redis设计与实现 学习笔记 第十二章 事件
  • MySQL安装配置教程
  • 【数据仓库】
  • uniapp圆形波浪进度效果
  • 2024护理类科技核心期刊汇总(最新版)
  • 基于uniapp微信小程序的宠物救助宠物领养系统
  • 【动植物毒性数据集】毒蛇识别 蘑菇毒性分类 人工智能 深度学习 目标检测 Python(含数据集)
  • 【算法篇】图论类(1)(笔记)
  • 【C#】编写计算机选课程序
  • 跨越地域限制:在线原型设计软件的自由与便捷
  • 标准正态分布的数据 tensorflow 实现正态分布图,python 编程,数据分析和人工智能...
  • 华为手机卸载系统应用的方法
  • Mac下载 安装MIMIC-IV 3.0数据集
  • 10个你应该立即卸载的VS Code扩展
  • 《机器学习by周志华》学习笔记-神经网络-03多层网络学习算法之误差逆传播算法
  • 排序算法中——冒泡排序和快速排序
  • 【vue】12.全局组件与局部组件的深入解析
  • 5G网络中RLC层及其切割简介
  • 【Redis】常见基本全局命令
  • GIT分布式版本控制系统基础操作