springboot+iotdb的应用
简介:
IoTDB 是针对时间序列数据收集、存储与分析一体化的数据管理引擎。它具有体量轻、性能高、易使用的特点,完美对接 Hadoop 与 Spark 生态,适用于工业物联网应用中海量时间序列数据高速写入和复杂分析查询的需求。
文档地址:
https://iotdb.apache.org/zh/UserGuide/V0.13.x/
springboot+iotdb的应用:
我们学习新技术主要的目的是应用它,下面我有两种方式对springboot+iotdb进行应用。
第一种:
第一种方式是结合mybatis进行使用的,这个方式其实在官网上是不推荐使用的,但是如果使用mybatis习惯也是可以用的,这个方法是比较简单的。
直接上代码:
pom文件配置:
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
<version>1.3.2</version>
</dependency>
yml文件配置
driver-class-name: org.apache.iotdb.jdbc.IoTDBDriver
url: jdbc:iotdb://ip:6667/
username: root
password: root
实体类:
@Data
public class IotDbVO {
//这个必须要有
private Long Time;
private String name;
private String year;
private String month;
}
mapper类:
List<IotDbVO> getIot();
sql写法:
SELECT name as name, year as year, month as month from root.airspace.log
方法二:
方法二是原生的写法:
pom配置:
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>1.3.2</version>
</dependency>
配置类:
配置1:
@Configuration
@Slf4j
public class IoTDBSessionConfig {
@Value("${spring.iotdb.ip}")
private String ip;
@Value("${spring.iotdb.port}")
private int port;
@Value("${spring.iotdb.user}")
private String user;
@Value("${spring.iotdb.password}")
private String password;
@Value("${spring.iotdb.fetchSize}")
private int fetchSize;
private static Session session;
/**
* 初始化
* @return
* @throws IoTDBConnectionException
* @throws StatementExecutionException
*/
@Bean
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
session = new Session.Builder()
.host(ip)
.port(port)
.username(user)
.password(password)
.version(Version.V_1_0)
.build();
session.open(false);
session.setFetchSize(fetchSize);
// 设置时区
session.setTimeZone("+08:00");
}
log.info("连接iotdb成功.......");
return session;
}
}
配置2:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
public @interface IoTTableName {
/**
* 实体对应的表名
*/
String value() default "";
}
配置3:
@Data
public class IoTDBRecord {
/**
* 节点路径
*/
private String deviceId;
/**
* 时间戳
*/
private long time;
/**
* 属性
*/
private List<String> measurementsList;
/**
* 属性值
*/
private List<Object> valuesList;
/**
* 数据类型
*/
private List<String> typeList;
}
配置四:
public interface IoTDBRecordable {
Logger logger = LoggerFactory.getLogger(IoTDBRecordable.class);
/**
* 数据载入方法
* @return Record
*/
default IoTDBRecord toRecord() {
IoTDBRecord ioTDBRecord = new IoTDBRecord();
//当前时间
ioTDBRecord.setTime(System.currentTimeMillis());
Class aClass = this.getClass();
IoTTableName name = this.getClass().getAnnotation(IoTTableName.class);
ioTDBRecord.setDeviceId(name.value());
Field[] declaredFields = aClass.getDeclaredFields();
List<String> measurements = new ArrayList<>();
List<Object> records = new ArrayList<>();
List<String> types = new ArrayList<>();
try {
for (Field field : declaredFields) {
measurements.add(field.getName());
String methodNamePro = field.getName().substring(0, 1).toUpperCase() + field.getName().substring(1);
Method methodName = this.getClass().getMethod("get" + methodNamePro);
records.add(methodName.invoke(this));
types.add(methodName.getReturnType().getName());
}
ioTDBRecord.setMeasurementsList(measurements);
ioTDBRecord.setValuesList(records);
ioTDBRecord.setTypeList(types);
} catch (Exception e) {
logger.error("IoTDB实体类转换异常:{}",e.getMessage());
}
return ioTDBRecord;
}
}
配置五:
@Slf4j
@Component
public class IotDBUtil {
@Autowired
private Session session;
/**
* 节点路径
* @param records
* @return
*/
private List<String> getDeviceIds(List<? extends IoTDBRecordable> records) {
List<String> deviceIds = new ArrayList<>();
for (IoTDBRecordable ioTDBRecordable : records) {
IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
String deviceId = ioTDBRecord.getDeviceId();
deviceIds.add(deviceId);
}
return deviceIds;
}
/**
* 时间戳
* @param records
* @return
*/
private List<Long> getTimes(List<? extends IoTDBRecordable> records) {
List<Long> times = new ArrayList<>();
for (IoTDBRecordable ioTDBRecordable : records) {
IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
times.add(ioTDBRecord.getTime());
}
return times;
}
/**
* 物理量 即:属性
* @param records
* @return
*/
private List<List<String>> getMeasurementsList(List<? extends IoTDBRecordable> records) {
List<List<String>> measurementsList = new ArrayList<>();
for (IoTDBRecordable ioTDBRecordable : records) {
IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
measurementsList.add(ioTDBRecord.getMeasurementsList());
}
return measurementsList;
}
/**
* 属性值 --- 属性必须与属性值一一对应
* @param records
* @return
*/
private List<List<Object>> getValuesList(List<? extends IoTDBRecordable> records) {
List<List<Object>> valuesList = new ArrayList<>();
for (IoTDBRecordable ioTDBRecordable : records) {
IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
valuesList.add(ioTDBRecord.getValuesList());
}
return valuesList;
}
/**
* 数据类型 BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5));
* @param records
* @return
*/
private List<List<TSDataType>> getTSDataType(List<? extends IoTDBRecordable> records) {
List<List<TSDataType>> valuesList = new ArrayList<>();
for (IoTDBRecordable ioTDBRecordable : records) {
IoTDBRecord ioTDBRecord = ioTDBRecordable.toRecord();
List<TSDataType> strList = new ArrayList<>();
for(String str : ioTDBRecord.getTypeList()){
strList.add(convertTypeByEntity(str));
}
valuesList.add(strList);
}
return valuesList;
}
/**
* 实体数据类型转换
* @param type 属性类型
* @return
*/
private TSDataType convertTypeByEntity(String type) {
switch (type) {
case "java.lang.Double":
return TSDataType.DOUBLE;
case "java.lang.Integer":
return TSDataType.INT32;
case "java.lang.Long":
return TSDataType.INT64;
case "java.lang.Boolean":
return TSDataType.BOOLEAN;
case "java.lang.Float":
return TSDataType.FLOAT;
default:
return TSDataType.TEXT;
}
}
/**
* 批量插入
* @param records 类集合
*/
public void insertRecords(List<? extends IoTDBRecordable> records) {
try {
session.insertRecords(getDeviceIds(records), getTimes(records), getMeasurementsList(records),getTSDataType(records),
getValuesList(records));
} catch (Exception e) {
log.error("IoTDB插入异常:{}",e.getMessage());
}
}
/**
* 单个插入实体
* @param recordEntity
*/
public void insertRecord(IoTDBRecordable recordEntity) {
try {
IoTDBRecord ioTDBRecord = recordEntity.toRecord();
List<TSDataType> strList = new ArrayList<>();
for(String str : ioTDBRecord.getTypeList()){
strList.add(convertTypeByEntity(str));
}
session.insertRecord(ioTDBRecord.getDeviceId(), ioTDBRecord.getTime(), ioTDBRecord.getMeasurementsList()
,strList, ioTDBRecord.getValuesList());
} catch (Exception e) {
log.error("IoTDB插入异常:{}",e.getMessage());
}
}
/**
* description: 根据SQL查询
*/
public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
return session.executeQueryStatement(sql,30000);
}
/**
* description: 删除分组 如 root.a1eaKSRpRty
* @param groupName:分组名称
* @return
*/
public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroup(groupName);
}
/**
* description: 根据Timeseries删除 如:root.a1eaKSRpRty.CA3013A303A25467.breath (个人理解:为具体的物理量)
*/
public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseries);
}
/**
* description: 根据Timeseries批量删除
*/
public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseriesList);
}
/**
* description: 根据分组批量删除
*/
public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroups(storageGroupList);
}
/**
* description: 根据路径和结束时间删除 结束时间之前的所有数据
*/
public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(path, endTime);
}
/**
* description: 根据路径集合和结束时间批量删除 结束时间之前的所有数据
*/
public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, endTime);
}
/**
* description: 根据路径集合和时间段批量删除
*/
public void deleteDataByPathListAndTime(List<String> pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, startTime, endTime);
}
/**
* description: 查询数据
*/
public SessionDataSet executeLastDataQuery(List<String> pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException {
return session.executeLastDataQuery(pathList, startTime, endTime);
}
}
yml配置:
iotdb:
ip: ip
port: 6667
user: root
password: root
fetchSize: 10000
maxActive: 10
使用如下:
在需要的类中引入
@Autowired
private IotDBUtil iotDBUtil;