Spring Boot + InfluxDB 实现高效数据存储与查询
1️⃣ 项目介绍
本项目使用 Spring Boot + InfluxDB 2.x 来存储和查询时间序列数据,适用于 物联网(IoT)、实时监控、日志分析 等场景。
2️⃣ InfluxDB 介绍
InfluxDB 是一个高性能的时间序列数据库(TSDB),适用于存储温度、传感器数据、日志、监控指标等。
🔹 特点:
- 采用 Flux 查询语言
- 高吞吐量,支持 批量写入
- Tag(索引)+ Field(数据) 结构,提高查询效率
- 精确时间戳(支持纳秒级)
3️⃣ Spring Boot 配置 InfluxDB
在 application.yml
中配置 InfluxDB 连接:
# InfluxDB 独立配置
influxdb:
url: http://192.168.1.1xx:28086/ # InfluxDB 服务器地址
token: _7FZlXGJJcd8Ayox-F-hVBDdXb_a5SI3530x1DdFKZfQ65uOhnpQciJWHpd7ULhpAOcgj5oV2JsR-Xf0qTtAxg==
org: xxx # 组织名称
bucket: xxx # 存储桶名称
# InfluxDB 客户端日志级别
# ERROR: 仅记录错误日志
# WARN: 记录警告和错误日志
# INFO: 记录普通信息、警告和错误日志
# DEBUG: 记录调试级别的详细日志
# BODY: 记录完整的 HTTP 请求和响应主体
# TRACE: 记录极其详细的跟踪日志
# ALL: 记录所有日志级别(视客户端而定)
logLevel: BODY
4️⃣ InfluxDB 连接配置
在 InfluxDBConfig.java
中配置 InfluxDB 客户端:
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class InfluxDBConfig {
@Value("${influxdb.url}")
private String url;
@Value("${influxdb.token}")
private String token;
@Value("${influxdb.org}")
private String org;
@Value("${influxdb.bucket}")
private String bucket;
@Bean
public InfluxDBClient influxDBClient() {
return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
}
}
🔹 说明
InfluxDBClientFactory.create(url, token, org, bucket)
创建 InfluxDB 客户端@Value
读取application.yml
配置
5️⃣ Service 层:数据写入 & 查询
✅ 5.1 单条数据写入
public void writeSingleData(TemperatureDTO temperatureDTO) {
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
Point point = Point.measurement("temperature")
.addTag("location", temperatureDTO.getLocation()) // 添加标签(索引)
.addField("value", temperatureDTO.getValue()) // 添加字段(数据)
.time(Instant.now(), WritePrecision.NS); // 记录当前时间戳
writeApi.writePoint(point);
}
✅ 5.2 批量写入(异步)
public void writeBatchData(List<TemperatureDTO> temperatureDTOs) {
WriteApi writeApi = influxDBClient.makeWriteApi(); // 获取异步 API
List<Point> points = temperatureDTOs.stream()
.map(dto -> Point.measurement("temperature")
.addTag("location", dto.getLocation())
.addField("value", dto.getValue())
.time(Instant.now(), WritePrecision.NS))
.collect(Collectors.toList());
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> writeApi.writePoints(points));
future.whenComplete((result, error) -> {
if (error != null) {
System.err.println("🔥 写入失败:" + error.getMessage());
} else {
writeApi.close(); // 关闭 API 避免资源泄露
log.info("✅ 批量数据写入成功(异步)");
}
});
}
🔹 说明
- 异步写入 不会阻塞主线程,提高吞吐量
- 异常回调 捕获写入失败的信息
- 使用
writeApi.close()
避免资源泄露
✅ 5.3 查询数据
public List<TemperatureVO> queryTemperatureData() {
String query = "from(bucket: \"test\") |> range(start: -1h)";
QueryApi queryApi = influxDBClient.getQueryApi();
return queryApi.query(query)
.stream()
.flatMap(fluxTable -> fluxTable.getRecords().stream()) // 遍历 FluxTable
.map(record -> {
TemperatureVO vo = new TemperatureVO();
vo.setLocation((String) record.getValueByKey("location")); // 获取标签信息
Object valueObj = record.getValueByKey("_value");
vo.setValue(valueObj != null ? ((Number) valueObj).doubleValue() : 0.0);
vo.setTimestamp(record.getTime().toString());
return vo;
})
.collect(Collectors.toList());
}
🔹 说明
- Flux 查询 过去
1h
内的数据 - 遍历
FluxTable
提取 标签 + 字段 数据
6️⃣ Controller 层:API 设计
@RestController
@RequestMapping("/api/influxdb")
public class InfluxDBController {
@Autowired
private TestService influxDBService;
@PostMapping("/write")
public String writeData(@RequestBody TemperatureDTO temperatureDTO) {
influxDBService.writeSingleData(temperatureDTO);
return "✅ 单条数据写入成功!";
}
@PostMapping("/write-batch")
public String writeBatchData() {
List<TemperatureDTO> data = generateTestData(10000);
influxDBService.writeBatchData(data);
return "✅ 10,000 条数据成功写入!";
}
@GetMapping("/query")
public List<TemperatureVO> queryTemperatureData() {
return influxDBService.queryTemperatureData();
}
private List<TemperatureDTO> generateTestData(int count) {
List<TemperatureDTO> dataList = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < count; i++) {
TemperatureDTO dto = new TemperatureDTO();
dto.setLocation("office-" + (random.nextInt(1000) + 1));
dto.setValue(15 + (random.nextDouble() * 10));
dataList.add(dto);
}
return dataList;
}
}
🔹 说明
/write
➝ 单条写入/write-batch
➝ 生成 10,000 条数据并写入/query
➝ 查询过去 1 小时数据
7️⃣ 运行 & 测试
✅ 7.1 启动项目
mvn spring-boot:run
✅ 7.2 使用 Postman 进行测试
1️⃣ 写入单条数据
POST http://localhost:8080/api/influxdb/write
{
"location": "office-1",
"value": 22.5
}
2️⃣ 批量写入
POST http://localhost:8080/api/influxdb/write-batch
3️⃣ 查询数据
GET http://localhost:8080/api/influxdb/query