当前位置: 首页 > article >正文

Kafka面试题----如何保证Kafka消费者在消费过程中不丢失消息

合理配置消费者参数

  • enable.auto.commit:设置为 false,关闭自动提交偏移量。自动提交偏移量存在一定的时间间隔,在这个间隔内如果消费者出现异常,可能会导致部分消息被重复消费或者丢失。关闭自动提交后,由开发者手动控制偏移量的提交,确保消息处理完成后再提交偏移量。
enable.auto.commit=false
  • auto.offset.reset:根据业务需求合理设置该参数。该参数指定当消费者没有有效的偏移量时(例如,消费者首次启动或者偏移量已过期),从何处开始消费消息。

    • earliest:从分区的起始位置开始消费消息,确保不会遗漏任何消息,但可能会导致重复消费。
    • latest:从分区的最新位置开始消费消息,会忽略分区中已有的旧消息。如果需要处理所有消息,不建议使用该值。
auto.offset.reset=earliest

正确处理消息和提交偏移量

  • 手动提交偏移量:在消息处理完成后,手动提交偏移量。可以选择同步提交(commitSync())或异步提交(commitAsync())。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualOffsetCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 处理消息
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 同步提交偏移量
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
  • 批量处理和批量提交:如果消息处理速度较慢,可以采用批量处理的方式,处理完一批消息后再统一提交偏移量。但要注意,批量处理过程中如果出现异常,可能需要回滚并重新处理这批消息。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class BatchOffsetCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // 处理消息
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    // 记录每个分区的最新偏移量
                    offsetsToCommit.put(partition, new OffsetAndMetadata(record.offset() + 1));
                }
                if (!offsetsToCommit.isEmpty()) {
                    // 同步提交偏移量
                    consumer.commitSync(offsetsToCommit);
                }
            }
        } finally {
            consumer.close();
        }
    }
}

处理消费者异常和故障恢复

  • 异常处理:在消息处理过程中,捕获并处理可能出现的异常,确保在异常情况下不会丢失消息。例如,如果处理消息时出现数据库写入失败的异常,可以进行重试或者记录日志,待问题解决后重新处理。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ExceptionHandlingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // 处理消息
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    } catch (Exception e) {
                        // 记录异常日志
                        System.err.println("Error processing message: " + e.getMessage());
                        // 可以进行重试或者其他处理
                    }
                }
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
  • 故障恢复:当消费者发生故障重启时,要确保能够从上次处理的位置继续消费消息。由于关闭了自动提交偏移量,并且手动控制偏移量的提交,消费者重启后会从上次提交的偏移量位置开始消费,从而避免消息丢失。

监控和日志记录

  • 监控消费者状态:使用 Kafka 提供的监控工具或者第三方监控系统,实时监控消费者的状态,包括消费偏移量、消费速率等。及时发现异常情况并进行处理。
  • 日志记录:详细记录消息处理过程中的关键信息,如消息的偏移量、处理结果、异常信息等。当出现问题时,可以通过日志进行排查和恢复。

http://www.kler.cn/a/560772.html

相关文章:

  • 深入理解 Kafka 主题分区机制
  • 基于 Python 和 Django 的文本情感分析系统设计与实现
  • 计算机毕业设计SpringBoot+Vue.js网上租赁系统(源码+文档+PPT+讲解)
  • 微相E316实现FM电台监听
  • DirectX12(D3D12)基础教程三 线性代数与3D世界空间
  • istio介绍补充以及使用篇
  • Python常见面试题的详解21
  • 【前端开发】能不能用Vue+Bootstrap进行项目开发?有什么需求场景需要用到的地方
  • 一周学会Flask3 Python Web开发-Jinja2模板访问对象
  • 云原生时代的分布式文件系统设计与实现
  • 如何查看PostgreSQL的版本
  • Macos ./ollama目录说明
  • overflow-x: auto 使用鼠标实现横向滚动,区分触摸板和鼠标滚动事件的方法
  • angular简易计算器
  • MybatisPlus-扩展功能-枚举处理器
  • Linux-SaltStack基础
  • 【复习】Redis
  • 1.2 redis7.0.4安装与配置开机自启动
  • cpp的stl二分查找库函数
  • 蓝桥杯备赛-精卫填海-DP