Spring Boot + InfluxDB 批量写入(同步、异步、重试机制)
📌 1. 项目介绍
本项目使用 Spring Boot + InfluxDB 2.x,主要介绍 批量写入数据 的三种方式:
- 同步写入(Blocking Write)
- 异步写入(Non-blocking Write)
- 带重试机制的写入(Handling Errors with Retry)
适用于 高并发数据写入、物联网(IoT)、实时监控 等场景。
📌 2. InfluxDB 连接配置
✅ application.yml
# InfluxDB 独立配置
influxdb:
url: http://192.168.1.xxx:28086/ # InfluxDB 服务器地址
token: _7FZlXGJJcd8Ayox-F-hVBDdXb_a5SI3530x1DdFKZfQ65uOhnpQciJWHpd7ULhpAOcgj5oV2JsR-Xf0qTtAxg==
org: xxx # 组织名称
bucket: xxx # 存储桶名称
logLevel: BODY # 记录完整的 HTTP 请求和响应日志
✅ InfluxDBConfig.java
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class InfluxDBConfig {
@Value("${influxdb.url}")
private String url;
@Value("${influxdb.token}")
private String token;
@Value("${influxdb.org}")
private String org;
@Value("${influxdb.bucket}")
private String bucket;
@Bean
public InfluxDBClient influxDBClient() {
return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
}
}
🔹 说明
- 通过
@Value("${influxdb.xxx}")
读取application.yml
的配置。 InfluxDBClientFactory.create(url, token, org, bucket)
创建 InfluxDB 客户端。
📌 3. Service 层:批量写入数据
在 TestServiceImpl.java
中,我们提供 三种批量写入方法。
✅ 3.1 同步批量写入
@Override
public void writeBatchDataSync(List<TemperatureDTO> temperatureDTOs) {
// 获取 InfluxDB 同步写入 API
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
// 构建批量数据点
List<Point> points = temperatureDTOs.stream()
.map(dto -> Point.measurement("temperature")
.addTag("location", dto.getLocation()) // 标签(索引)
.addField("value", dto.getValue()) // 字段(数据)
.time(Instant.now(), WritePrecision.NS))
.collect(Collectors.toList());
// **同步写入数据**
writeApi.writePoints(points);
System.out.println("✅ 批量数据写入成功(同步)");
}
🔹 说明
getWriteApiBlocking()
同步写入 API,主线程会阻塞,直到数据写入完成。writePoints(points)
直接写入所有数据点。
🔹 适用场景
- 小规模数据写入
- 确保数据立即存入数据库
✅ 3.2 异步批量写入
@Override
public void writeBatchDataAsync(List<TemperatureDTO> temperatureDTOs) {
// 获取 InfluxDB **异步写入 API**
WriteApi writeApi = influxDBClient.makeWriteApi();
// 构建批量数据点
List<Point> points = temperatureDTOs.stream()
.map(dto -> Point.measurement("temperature")
.addTag("location", dto.getLocation())
.addField("value", dto.getValue())
.time(Instant.now(), WritePrecision.NS))
.collect(Collectors.toList());
// **异步写入数据**
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> writeApi.writePoints(points));
// **写入完成后的回调**
future.whenComplete((result, error) -> {
if (error != null) {
System.err.println("🔥 异步写入失败:" + error.getMessage());
} else {
writeApi.close(); // **写入完成后关闭 API,避免资源泄漏**
System.out.println("✅ 批量数据写入成功(异步)");
}
});
}
🔹 说明
- 异步写入 API (
makeWriteApi()
),不会阻塞主线程,数据写入后台执行。 CompletableFuture
处理回调,写入失败会打印日志。- 写入完成后
writeApi.close()
,避免资源泄漏。
🔹 适用场景
- 高吞吐量、大数据量写入
- 不需要立即确认写入结果
✅ 3.3 带重试机制的批量写入
@Override
public void writeBatchDataWithRetry(List<TemperatureDTO> temperatureDTOs) {
// 获取 InfluxDB **异步写入 API**
WriteApi writeApi = influxDBClient.makeWriteApi();
// 构建批量数据点
List<Point> points = temperatureDTOs.stream()
.map(dto -> Point.measurement("temperature")
.addTag("location", dto.getLocation())
.addField("value", dto.getValue())
.time(Instant.now(), WritePrecision.NS))
.collect(Collectors.toList());
// **设置最大重试次数**
int maxRetries = 3;
int attempt = 0;
boolean success = false;
while (attempt < maxRetries && !success) {
try {
writeApi.writePoints(points); // **尝试写入数据**
success = true; // **写入成功,退出循环**
System.out.println("✅ 批量数据写入成功(重试机制)");
} catch (Exception e) {
attempt++;
System.err.println("⚠️ 第 " + attempt + " 次写入失败:" + e.getMessage());
if (attempt == maxRetries) {
System.err.println("❌ 达到最大重试次数,写入失败");
}
try {
Thread.sleep(1000); // **延迟 1 秒后重试**
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
writeApi.close(); // **确保最终关闭 API**
}
🔹 说明
- 最大重试次数
maxRetries = 3
,写入失败会自动重试。 Thread.sleep(1000)
延迟 1 秒,防止频繁重试导致服务器压力过大。- 重试成功后
success = true
退出循环。
🔹 适用场景
- 网络不稳定(断网、超时)
- 避免短暂的数据库异常导致数据丢失
📌 4. 测试接口(Controller 层)
java
复制编辑
@RestController
@RequestMapping("/api/influxdb")
public class InfluxDBController {
@Autowired
private TestService influxDBService;
@PostMapping("/write-batch-sync")
public String writeBatchSync(@RequestBody List<TemperatureDTO> temperatureDTOs) {
influxDBService.writeBatchDataSync(temperatureDTOs);
return "✅ 批量数据写入成功(同步)";
}
@PostMapping("/write-batch-async")
public String writeBatchAsync(@RequestBody List<TemperatureDTO> temperatureDTOs) {
influxDBService.writeBatchDataAsync(temperatureDTOs);
return "✅ 批量数据写入成功(异步)";
}
@PostMapping("/write-batch-retry")
public String writeBatchWithRetry(@RequestBody List<TemperatureDTO> temperatureDTOs) {
influxDBService.writeBatchDataWithRetry(temperatureDTOs);
return "✅ 批量数据写入成功(重试机制)";
}
}
📌 5. 总结
✅ 同步写入:适合小规模数据
✅ 异步写入:适合高吞吐量、大规模数据
✅ 重试机制:适合网络不稳定场景
🚀 这样,你的批量写入功能更加健壮了! 🎯