当前位置: 首页 > article >正文

Spring Boot + InfluxDB 批量写入(同步、异步、重试机制)

📌 1. 项目介绍

本项目使用 Spring Boot + InfluxDB 2.x,主要介绍 批量写入数据 的三种方式:

  1. 同步写入(Blocking Write)
  2. 异步写入(Non-blocking Write)
  3. 带重试机制的写入(Handling Errors with Retry)

适用于 高并发数据写入、物联网(IoT)、实时监控 等场景。

📌 2. InfluxDB 连接配置

✅ application.yml

# InfluxDB 独立配置
influxdb:
  url: http://192.168.1.xxx:28086/  # InfluxDB 服务器地址
  token: _7FZlXGJJcd8Ayox-F-hVBDdXb_a5SI3530x1DdFKZfQ65uOhnpQciJWHpd7ULhpAOcgj5oV2JsR-Xf0qTtAxg==
  org: xxx     # 组织名称
  bucket: xxx  # 存储桶名称
  logLevel: BODY  # 记录完整的 HTTP 请求和响应日志

✅ InfluxDBConfig.java

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InfluxDBConfig {

    @Value("${influxdb.url}")
    private String url;

    @Value("${influxdb.token}")
    private String token;

    @Value("${influxdb.org}")
    private String org;

    @Value("${influxdb.bucket}")
    private String bucket;

    @Bean
    public InfluxDBClient influxDBClient() {
        return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
    }
}

🔹 说明

  • 通过 @Value("${influxdb.xxx}") 读取 application.yml 的配置。
  • InfluxDBClientFactory.create(url, token, org, bucket) 创建 InfluxDB 客户端

📌 3. Service 层:批量写入数据

TestServiceImpl.java 中,我们提供 三种批量写入方法


✅ 3.1 同步批量写入

@Override
public void writeBatchDataSync(List<TemperatureDTO> temperatureDTOs) {
    // 获取 InfluxDB 同步写入 API
    WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();

    // 构建批量数据点
    List<Point> points = temperatureDTOs.stream()
            .map(dto -> Point.measurement("temperature")
                    .addTag("location", dto.getLocation()) // 标签(索引)
                    .addField("value", dto.getValue()) // 字段(数据)
                    .time(Instant.now(), WritePrecision.NS))
            .collect(Collectors.toList());

    // **同步写入数据**
    writeApi.writePoints(points);
    System.out.println("✅ 批量数据写入成功(同步)");
}

🔹 说明

  • getWriteApiBlocking() 同步写入 API主线程会阻塞,直到数据写入完成
  • writePoints(points) 直接写入所有数据点。

🔹 适用场景

  • 小规模数据写入
  • 确保数据立即存入数据库

✅ 3.2 异步批量写入

@Override
public void writeBatchDataAsync(List<TemperatureDTO> temperatureDTOs) {
    // 获取 InfluxDB **异步写入 API**
    WriteApi writeApi = influxDBClient.makeWriteApi();

    // 构建批量数据点
    List<Point> points = temperatureDTOs.stream()
            .map(dto -> Point.measurement("temperature")
                    .addTag("location", dto.getLocation())
                    .addField("value", dto.getValue())
                    .time(Instant.now(), WritePrecision.NS))
            .collect(Collectors.toList());

    // **异步写入数据**
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> writeApi.writePoints(points));

    // **写入完成后的回调**
    future.whenComplete((result, error) -> {
        if (error != null) {
            System.err.println("🔥 异步写入失败:" + error.getMessage());
        } else {
            writeApi.close();  // **写入完成后关闭 API,避免资源泄漏**
            System.out.println("✅ 批量数据写入成功(异步)");
        }
    });
}

🔹 说明

  • 异步写入 API (makeWriteApi()),不会阻塞主线程,数据写入后台执行。
  • CompletableFuture 处理回调,写入失败会打印日志。
  • 写入完成后 writeApi.close(),避免资源泄漏。

🔹 适用场景

  • 高吞吐量、大数据量写入
  • 不需要立即确认写入结果

✅ 3.3 带重试机制的批量写入

@Override
public void writeBatchDataWithRetry(List<TemperatureDTO> temperatureDTOs) {
    // 获取 InfluxDB **异步写入 API**
    WriteApi writeApi = influxDBClient.makeWriteApi();

    // 构建批量数据点
    List<Point> points = temperatureDTOs.stream()
            .map(dto -> Point.measurement("temperature")
                    .addTag("location", dto.getLocation())
                    .addField("value", dto.getValue())
                    .time(Instant.now(), WritePrecision.NS))
            .collect(Collectors.toList());

    // **设置最大重试次数**
    int maxRetries = 3;
    int attempt = 0;
    boolean success = false;

    while (attempt < maxRetries && !success) {
        try {
            writeApi.writePoints(points); // **尝试写入数据**
            success = true; // **写入成功,退出循环**
            System.out.println("✅ 批量数据写入成功(重试机制)");
        } catch (Exception e) {
            attempt++;
            System.err.println("⚠️ 第 " + attempt + " 次写入失败:" + e.getMessage());

            if (attempt == maxRetries) {
                System.err.println("❌ 达到最大重试次数,写入失败");
            }

            try {
                Thread.sleep(1000); // **延迟 1 秒后重试**
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    writeApi.close(); // **确保最终关闭 API**
}

🔹 说明

  • 最大重试次数 maxRetries = 3,写入失败会自动重试。
  • Thread.sleep(1000) 延迟 1 秒,防止频繁重试导致服务器压力过大。
  • 重试成功后 success = true 退出循环

🔹 适用场景

  • 网络不稳定(断网、超时)
  • 避免短暂的数据库异常导致数据丢失

📌 4. 测试接口(Controller 层)

 

java

复制编辑

@RestController
@RequestMapping("/api/influxdb")
public class InfluxDBController {

    @Autowired
    private TestService influxDBService;

    @PostMapping("/write-batch-sync")
    public String writeBatchSync(@RequestBody List<TemperatureDTO> temperatureDTOs) {
        influxDBService.writeBatchDataSync(temperatureDTOs);
        return "✅ 批量数据写入成功(同步)";
    }

    @PostMapping("/write-batch-async")
    public String writeBatchAsync(@RequestBody List<TemperatureDTO> temperatureDTOs) {
        influxDBService.writeBatchDataAsync(temperatureDTOs);
        return "✅ 批量数据写入成功(异步)";
    }

    @PostMapping("/write-batch-retry")
    public String writeBatchWithRetry(@RequestBody List<TemperatureDTO> temperatureDTOs) {
        influxDBService.writeBatchDataWithRetry(temperatureDTOs);
        return "✅ 批量数据写入成功(重试机制)";
    }
}

📌 5. 总结

同步写入:适合小规模数据
异步写入:适合高吞吐量、大规模数据
重试机制:适合网络不稳定场景

🚀 这样,你的批量写入功能更加健壮了! 🎯


http://www.kler.cn/a/587726.html

相关文章:

  • AI自动文献综述——python先把知网的文献转excel
  • EB-Cable许可管理中的数据安全与隐私保护
  • UE材质RadialGradientExponential
  • WebSocket 使用教程
  • 前端无限滚动内容自动回收技术详解:原理、实现与优化
  • 在 VMware 中安装 Ubuntu 的超详细实战分享
  • Postman用JSON格式数据发送POST请求及注意事项
  • LeetCode 2226. Maximum Candies Allocated to K Children(2025/3/14 每日一题)
  • 【MySQL】数据库简要介绍和简单应用
  • 分享一个sql统计的客户需求
  • Vue2+Vant2 项目初学
  • 故障诊断——neo4j入门
  • centos7通过yum安装redis
  • golang算法回溯
  • spring boot 发送邮件验证码
  • 解锁健康密码:拥抱养生,重塑生活
  • python笔记2
  • Ubuntu 18,04 LTS 通过APT安装mips64el的交叉编译器。
  • TCP/IP四层网络模型
  • 玩转github