当前位置: 首页 > article >正文

SpringBoot使用Kafka如何保证消息不丢失

概述

在 Spring Boot 中使用 Kafka 时,要确保消息不丢失,主要涉及到生产者(Producer)、消费者(Consumer)以及 Kafka Broker 的配置和设计。

1. Spring Boot 与 Kafka 配置

Spring Boot 中使用 Kafka 时,可以通过 spring-kafka 来简化配置和操作。以下是如何保证消息不丢

1.1 Producer 配置

Kafka 生产者是消息的发送方,确保消息的可靠性和不丢失需要配置

配置 Kafka 生产

application.ymlapplication.properties 文件中

spring:
  kafka:
    producer:
      acks: all                  # 消息确认策略:all表示等待所有副本确认
      retries: 3                  # 发送失败时的重试次数
      batch-size: 16384           # 每批发送的消息大小
      linger-ms: 1                # 消息发送的延迟时间(单位:毫秒)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer   # Key序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer  # Value序列化器
重要配置
  1. acks=all:生产leader 副本宕机或数据丢失的
  2. retries=3
  3. linger-ms=1:生产者在发送消息时会有
  4. batch-size=16384:设定
1.2 Kafka Producer 配置实例

通过 KafkaTemplate 发送消息时,可以通过 Java 配置来确保消息的可靠性。以下是如何

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "test_topic";

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);  // 发送消息
    }
}
  • KafkaTemplate.send() 会使用 `acks=acks=all 确保消息写入 Kafka 集群时的可靠性

2. 消费者配置

消费者是 Kafka 的消息接收方。在消费过程中,确保消息的可靠性和不丢失需要使用合适的 acknowledgment(确认机制)来保证消息的消费状态。

2.1 Consumer 配置

Kafka 消费者的配置在 Spring Boot 中也可以通过 application.ymlapplication.properties 来进行配置,确保消费者能够在接收到消息后正确ack)消息。

spring:
  kafka:
    consumer:
      group-id: test-group        # 消费者所在的消费者组
      enable-auto-commit: false   # 自动提交消费位移,设置为false使用手动提交
      auto-offset-reset: earliest # 如果没有偏移量(offset),从最早的位置开始消费
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Key反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Value反序列化器
重要配置项解释:
  • enable-auto-commit=false:禁用自动提交偏移量,手动提交偏移量可以防止消息丢失。自动提交可能会导
  • auto-offset-reset=earliest:如果消费者没有消费过的偏earliest(最早的消息)开始消费,
  • group-id=test-group:消费者的组 ID,每个组
2.2 Consumer 手动提交偏移量

为了避免消息消费丢失,建议手动提交消息的偏移量。在 Spring Kafka 中,使用 Acknowledgment 来手动确

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaConsumerService {

    @KafkaListener(topics = "test_topic", groupId = "test-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        // 消费消息后手动提交偏移量
        System.out.println("Received message: " + record.value());
        
        // 手动提交偏移量
        acknowledgment.acknowledge();
    }
}
  • acknowledgment.acknowledge():手动确认

3. Kafka Broker 配置

Kafka Broker 配置是确保消息不丢失的关键部分。Kafka Broker 是 Kafka 集群的核心,它负责接收、存储、处理和分发消息。以下是关于 Kafka Broker 配置的几个重要方面:


3.1 副本数(Replication)与分区数(Partition)

Kafka 使用 副本(Replication) 来保证数据的高可用性和容错性。每个主题(Topic)通常有多个 分区(Partition),每个分区都有一个 领导副本(Leader) 和多个 跟随副本(Follower)。如果某个 Broker 或分区失败,副本机制确保数据不会丢失,系统仍能正常运行。

  • 分区数:每个主题通常会有多个分区,分区数的设置直接影响消息的吞吐量和并行度。更多的分区可以提高消息的处理速度,但也会增加系统的管理开销。

  • 副本数:每个分区应有多个副本,确保数据的冗余存储。副本数设置得越高,数据丢失的可能性越小,但会占用更多的存储资源。

关键配置:
# 每个主题的分区数(默认3分区)
num.partitions=3

# 默认副本因子
default.replication.factor=3

# 最小同步副本数
min.insync.replicas=2
  • num.partitions:设置默认的分区数(可以在创建主题时指定)。
  • default.replication.factor:设置每个主题的默认副本数,建议至少设置为 3。
  • min.insync.replicas:设置最小同步副本数,确保消息只有在至少有该数量的副本成功写入后才算成功。通常设置为 2 或更多,以保证消息的可靠性。

注意min.insync.replicas 是为了避免某些副本没有同步就确认消息,这可能导致数据丢失。


3.2 消息持久化与日志配置

Kafka 的持久化策略和日志配置确保消息在磁盘上长期存储。Kafka 将每个分区的消息存储在磁盘上的日志文件中。为了避免消息丢失和提高读写性能,需要合理配置日志存储相关参数。

关键配置:
# 消息日志的清理策略:可以选择基于时间清理或基于大小清理
log.retention.hours=168      # 消息保留 7 天
log.retention.bytes=10737418240  # 设置日志最大存储大小为 10 GB

# 消息写入磁盘的刷新频率
log.flush.interval.messages=10000   # 每 10000 条消息后刷新一次磁盘
log.flush.interval.ms=1000           # 每秒刷新一次磁盘

# 启用压缩
log.cleanup.policy=compact   # 启用日志压缩,保留最新版本的数据

# 消息写入到磁盘的时延
log.segment.bytes=1073741824  # 每个日志段的大小为 1 GB
  • log.retention.hours:设置消息保留的时间(单位:小时),默认 168 小时(即 7 天)。过期的消息会被删除。
  • log.retention.bytes:设置日志保留的最大字节数,超出大小的日志会被清理。
  • log.flush.interval.messageslog.flush.interval.ms:这些配置控制 Kafka 将消息刷新到磁盘的频率,可以根据实际需要优化,减少 I/O 操作。
  • log.cleanup.policy:设置日志清理策略。可以选择 delete(基于时间或大小清理)或 compact(日志压缩,仅保留最新版本的消息)。
  • log.segment.bytes:设置每个日志段的大小,当日志达到该大小时会创建新的日志段。

3.3 分区副本同步与数据一致性

Kafka 的分区副本同步策略决定了数据的写入和同步方式。为了确保消息的可靠性,需要配置副本同步机制,使数据被可靠地写入到多个副本。

  • acks=all:确保生产者消息写入所有副本后才确认消息写入成功。可以配置在生产者端。
  • min.insync.replicas:设置最小同步副本数,保证数据在多个副本同步后再提交。
关键配置:
# 设置同步副本数,确保写入成功后至少需要此数目的副本同步
acks=all   # 等待所有副本确认写入

# 设置最小同步副本数
min.insync.replicas=2  # 确保至少有 2 个副本同步
  • acks=all:确保消息在所有副本成功写入后才返回确认。即使部分副本未同步,也不会确认写入。
  • min.insync.replicas:确保消息写入时,至少有 min.insync.replicas 个副本同步,以避免因为单个副本失效导致的数据丢失。

3.4 Kafka Broker 高可用配置

在 Kafka 集群中,为了防止单点故障,Kafka 提供了高可用性配置。配置多个 Kafka Broker 和分布式部署是保证 Kafka 高可用性的基础。

关键配置:
# Kafka Broker 启动时的监听地址和端口
listeners=PLAINTEXT://localhost:9092

# 监听的内网地址,通常与外网地址分开配置
advertised.listeners=PLAINTEXT://broker1.example.com:9092

# Zookeeper 配置
zookeeper.connect=localhost:2181  # 设置 Zookeeper 地址,确保 Kafka 集群管理的协调和一致性
  • listeners:配置 Kafka Broker 启动时监听的地址和端口。
  • advertised.listeners:配置 Kafka Broker 向外暴露的地址和端口,消费者和生产者会连接到这个地址。
  • zookeeper.connect:设置 Zookeeper 地址,Kafka 依赖 Zookeeper 来管理集群的元数据和协调。
3.5 日志段与磁盘空间管理

Kafka 使用日志段(Log Segment)来管理消息存储,每个分区的消息都会分布在多个日志段中,磁盘空间需要合理管理以避免磁盘溢出。

关键配置:
# 设置日志段的大小(单位:字节)
log.segment.bytes=1073741824  # 每个日志段的大小为 1 GB

# 每个日志段的最大时间
log.roll.ms=604800000  # 设置为 7 天
  • log.segment.bytes:每个日志段的大小,达到该大小时会创建新的日志段。设置过大可能影响磁盘操作的效率,设置过小可能增加管理开销。
  • log.roll.ms:设置日志段的滚动周期,控制日志文件分割的时间粒度。

3.6 性能优化与磁盘 I/O 配置

Kafka 作为一个高吞吐量的分布式消息系统,对于磁盘 I/O 和网络 I/O 的要求很高。合理配置 Kafka Broker 的磁盘 I/O 能提高消息的写入和读取速度,避免系统性能瓶颈。

关键配置:
# 设置消息的最小写入延迟
log.min.cleanable.dirty.ratio=0.5  # 控制消息清理的阈值

# 设置最大允许的线程数
num.io.threads=8     # I/O 线程数量

# 设置 Kafka 内存缓存大小(单位:MB)
log.buffer.size=10485760  # 默认为 10MB
  • log.min.cleanable.dirty.ratio:设置日志清理的阈值,避免过多无效日志占用空间。
  • num.io.threads:配置 I/O 线程数。根据机器性能,适当增加线程数以提高并发处理能力。
  • log.buffer.size:Kafka 写入消息时会使用缓冲区,适当增加缓冲区大小可以提高消息的写入性能。

3.7 Kafka 集群的监控与故障诊断

为了确保 Kafka Broker 的高可用性和稳定性,需要对 Kafka 集群进行实时监控。Kafka 提供了丰富的监控指标,可以通过 JMX 进行监控,也可以结合其他工具(如 Prometheus 和 Grafana)来实现集群健康检查和报警。

  • JMX 监控:Kafka 提供了大量的 JMX 指标,可以用于监控消息的吞吐量、延迟、磁盘使用情况等。
  • 日志监控:Kafka 的日志记录了大量的运行时信息,定期检查日志有助于提前发现潜在的问题。

4. 总结

通过合理配置和设计 Kafka 消息生产和消费的架构,可以有效地确保消息不丢失,并提高系统的可靠性和容错性。以下是关键的策略总结:

  1. 生产者配置

    • 使用 acks=all 确保消息写入到所有副本。
    • 启用消息幂等性和重试机制。
    • 使用合理的 retriesdelivery.timeout.ms 设置。
  2. 消费者配置

    • 使用手动提交消费位移,避免消息丢失。
    • 使用去重策略和幂等性消费,确保消息不会重复消费。
  3. Broker 配置

    • 配置合理的副本数和分区数,确保高可用性。
    • 使用 Zookeeper 确保 Kafka 集群的协调和管理。
    • 设置合理的日志清理策略,避免存储空间耗尽。
  4. 监控与报警

    • 使用 JMX 监控 Kafka 的运行状态。
    • 结合 Spring Boot Actuator 进行健康检查和性能监控。

http://www.kler.cn/a/517032.html

相关文章:

  • qt QNetworkRequest详解
  • ubuntu调用图形化网络测试工具
  • Windows cmd常用命令
  • macOS安装Gradle环境
  • Flink运行时架构
  • ubuntu如何测试网络性能
  • Qt中的connect函数
  • mysql学习笔记-数据库的设计规范
  • 在Qt中实现点击一个界面上的按钮弹窗到另一个界面
  • Xcode各个历史版本下载地址
  • 一文速通stack和queue的理解与使用
  • 根据条件更改el-tree的字体颜色
  • 【物联网】ARM核常用指令(详解):数据传送、计算、位运算、比较、跳转、内存访问、CPSR/SPSR、流水线及伪指令
  • Linux探秘坊-------4.进度条小程序
  • 基于微信小程序的汽车保养系统设计与实现(LW+源码+讲解)
  • 关于单通道串口服务器的详细讲解
  • uniapp APP端页面触发调用webview(页面为uniapp开发的H5)里的方法
  • 速通Docker === Docker Compose
  • WebAssembly视频检测在社区创作平台的落地与实践 | 得物技术
  • 设计模式的艺术-职责链模式
  • 解决npm install总是卡在sill idealTree buildDeps不动问题
  • 【java】签名验签防篡改研究测试
  • 解决Python 在 Flask 开发模式下定时任务启动两次的问题
  • C# OpenCV机器视觉:交通标志识别
  • 【Uniapp-Vue3】下拉刷新
  • 最新-CentOS 7 基于1 Panel面板安装 JumpServer 堡垒机