当前位置: 首页 > article >正文

Kafka 面试题

参考:

  1. https://javabetter.cn/interview/kafka-40.html
  2. https://javaguide.cn/high-performance/message-queue/kafka-questions-01.html

Kafka 架构

在这里插入图片描述

名词概念

  1. Producer(生产者) : 产生消息的一方。

  2. Consumer(消费者) : 消费消息的一方。

  3. Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。

    1. Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。同一 Topic 下的 Partition 可以分布在不同的 Broker 上。

    2. Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition 。

      Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列

      img

zookeeper在 Kafka 中的作用?

  1. Broker 注册:在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。

  2. Topic 注册:在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。

    比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0/brokers/topics/my-topic/Partitions/1

  3. 负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition,各个 Partition 可以分布在不同的 Broker(Kafka实例)上**。对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上**。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

Kafka 中可以不要 zookeeper 吗?

答:可以,在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。

Kafka 的消息模型?知道队列模型吗?

1、早期的消息模型:队列模型。使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。存在的问题:

  • 不能将一条消息发送给多个消费者。

队列模型

2、Kafka 消息模型:发布-订阅模式。这种模式可以让一条消息发送给多个消费者。

  • 发布订阅模型(Pub-Sub) 使用主题(Topic) 作为消息通信载体,类似于广播模式
  • 发布者发布一条消息,该消息通过主题传递给所有的订阅者。(在一条消息广播之后才订阅的用户则是收不到该条消息的)

发布订阅模型

Kafka 多副本机制

Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。

  • 我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
  • 生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。

Kafka 用途

Kafka 是一个分布式流式处理平台。流平台具有三个关键功能:

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。

  2. 容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。

  3. 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库,可以实时处理数据。

    例如,一个传感器的数据被消费者读取后可以进行处理。比如:,可以进行过滤、转换、聚合、窗口操作等。在检测异常情况的场景中,可能会对传感器数据进行实时分析,比如检查温度是否超过某个阈值、压力是否突然变化等。

Kafka 优势(与 其他消息中间件对比)

Kafka 与 RocketMQ、RabbitMQ 对比。Kafka 具有哪些优势:

  1. 极致的性能:基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
  2. 生态系统兼容性无可匹敌:Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。

Kafka 为什么要把消息分区(partition)?如何分区?

答:

1、把消息分区有利于:

  • 提高吞吐量:一个Topic可以有多个Partition组成,那么就可以实现并行处理,提高吞吐量。多个消费者可同时处理不同分区的消息。

  • 增强容错性:一个分区故障不影响其他分区,且分区有副本可保证数据可用性。

  • 便于实现负载均衡。可根据消费者数量自动分配分区,确保处理均衡。

    当有多个消费者同时订阅一个主题时,Kafka 会根据消费者的数量自动分配主题的分区给这些消费者。

2、如何分区的:

  • 指明 Partition 的情况:如果生产者在发送消息时明确指定了 Partition 值,那么 Kafka 就直接将该指定的值作为消息要发送到的 Partition。

  • 有 key 但无指定 Partition 的情况:当没有明确指定 Partition 但消息有 key 时,Kafka 会计算 key 的哈希值,然后与该Topic 的分区数量取余,得到的余数就是要放入分区的ID。

    这样可以确保具有相同 key 的消息被分配到同一个 Partition,方便后续的处理和消费。

  • 既无 Partition 也无 key 的情况:在既没有指定 Partition 也没有 key 的情况下,第一次发送消息时,Kafka 会随机生成一个整数。之后每次发送消息,在这个整数上自增,然后将这个值与主题可用的 Partition 总数取余,得到消息要被发送到的 Partition。

    这种方式类似于轮询(round-robin)算法,确保消息在各个 Partition 之间相对均衡地分布。

Kafka 如何保证消息的消费顺序?

答:

具体的分析:Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。每次添加消息到 Partition(分区) 的时候都会采用尾加法,如下图所示。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。

img

因此,常用的 Kafka 保证消息消费的顺序,有两种方式:

  1. 1 个 Topic 只对应一个 Partition。(但是这种办法效率不高)

  2. (推荐)发送消息的时候指定 key/Partition:Kafka 中发送 1 条消息的时候,可以指定 topic, partitionkeydata(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition。

    例如,在一个电商系统中,如果以订单 ID 作为 “key”,那么所有与同一个订单相关的操作消息(如订单创建、支付、发货等)在发送时都带有相同的订单 ID 这个 “key”。根据 Kafka 的规则,这些具有相同 “key” 的消息会被分配到同一个分区。

Kafka 如何发现消息丢失?

这需要根据消息丢失的位置,具体分析:

  • 生产者端消息丢失
  • 消费者端消息丢失
  • Kafka(broker)服务之间消息同步时,消息丢失

生产者丢失消息、如何知道是否丢失?

丢失情况:生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。

得知消息丢失:通过send()方法返回值查看消息是否发送成功。

  1. 方式1:同步方式,生产者将消息发送出去之后,需要等待消息发送的结果返回。

    // 使用get()方法,是同步的方式
    SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
    if (sendResult.getRecordMetadata() != null) {
      logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
                  sult.getProducerRecord().value().toString());
    }
    
  2. 方式2:异步回调的方式。可以采用异步回调的方式,获取消息发送的结果,从而得知消息是否丢失。

    // send()方法默认的是异步方式        
    ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
            future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
                    ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
    
生产者端尽量避免消息丢失的方法

设置重试次数、重试间隔(注意重试机制 生产者、消费者都可以用)。这是一种尽可能的避免消息丢失的策略。即当消息发送失败时,可以多重试几次,增加发送成功的几率。

具体方法:

  • 为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。
  • 另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了。
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your_kafka_bootstrap_servers");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 设置重试次数和重试间隔
        RetryTemplate retryTemplate = new RetryTemplate();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(10);
        retryTemplate.setRetryPolicy(retryPolicy);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000); // 重试间隔为 1 秒
        retryTemplate.setBackOffPolicy(backOffPolicy);

        configProps.put(ProducerConfig.RETRIES_CONFIG, retryTemplate);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

消费者丢失消息的情况,如何知道丢失?

消费者端丢失消息的情况:当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。此时会有问题,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

得知消息丢失的办法:手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset

弊端:这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。

Kafka服务端消息丢失,如何避免丢失?

Kafka 为分区(Partition)引入了多副本(Replica)机制。假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失

如何避免丢失?这需要通过以下参数共同起作用:

  1. acks:生产者发送消息后要求的确认机制级别。

    1. acks=0:产者发送消息后,不需要等待任何来自服务器的响应。种设置下,生产者发送消息的速度最快,但消息可能会在网络传输等过程中丢失,因为没有任何确认机制来保证消息已经被成功接收和存储。

    2. acks=1:生产者发送消息后,只要集群中的 leader 成功接收到消息,就会向生产者发送确认响应,而不会管follower 是否接收成功。

    3. acks=all(也可以用acks=-1表示):生产者发送消息后,需要等待所有的同步副本(In-Sync Replicas,ISR)都成功接收到消息后,才会收到来自服务器的确认响应。

      也就是 该分区的 leader、和与leader数据时同步的followers 都接收消息成功。

  2. replication.factor >= 3:指定了一个主题(topic)的副本因子数量,即一个分区(partition)有多少个副本(这里设置为每个分区有一个 leader 副本和3个 follower 副本)。虽然造成了数据冗余,但是带来了数据的安全性

  3. min.insync.replicas > 1:指定了为了被认为是 “已同步” 的,一个分区(partition)中最少需要保持同步状态的副本数量。这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。

  4. unclean.leader.election.enable=false:控制是否允许非同步副本(不在 ISR 中的副本,即与leader数据时同步的followers)成为 leader。当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

Kafka 如何保证消息不重复消费?

参考:https://www.cnblogs.com/yangyongjie/p/14675119.html

先说为什么 Kafka 会重复消费消息?

答:主要有如下情况

  1. 消费者宕机、重启等。导致消息已经消费但是没有提交offset。

  2. 消费者在处理耗时的任务,处理时间超过Kafka服务器设置的max.poll.interval.ms时间,Kafka 会误认为这个消费者死掉,会触发再均衡(rebalance)。(在重平衡过程中,其他消费者可能会被分配到之前由该问题消费者处理的分区,而由于偏移量未正确处理,可能会导致重复消费之前已经处理过的消息)

    max.poll.interval.ms参数用于控制消费者在两次调用poll()方法之间的最大时间间隔。超过这个间隔时间这个消费者就会被认为挂掉了。

解决办法:

  1. 幂等校验:消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。【推荐】

    比如,消费者在本地存储(如数据库、缓存等)中检查(比如通过订单ID)是否已经处理过具有相同订单号的消息,处理过就不会再次处理了。

  2. 关闭自动提交 offset,改为手动提交 offset。但这种方式可能会有两个问题:

    1. 如果是处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
    2. 如果是拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底

Kafka 的重试机制?

Kafka 的重试机制

Kafka 的重试机制既可以针对生产者,也可以针对消费者。

  • 当生产者发送消息到 Kafka 集群时,如果出现网络问题、Broker 故障等情况导致消息发送失败,生产者可以配置重试机制来自动重新发送消息,以提高消息发送的成功率。
  • 消费者在消费消息时,如果处理消息的过程中出现异常,也可以通过一定的方式进行重试,以确保消息能够被正确处理。

重试机制默认最多重试10次,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。当然也可以自定义重试次数、重试间隔。

重试失败后,如何告警?

答:自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandlerhandleRemaining 函数,加上自定义的告警等操作。

@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {

    public DelErrorHandler(FixedBackOff backOff) {
        super(null,backOff);
    }

    @Override
    public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        super.handleRemaining(thrownException, records, consumer, container);
        log.info("重试多次失败");
        // 自定义操作
    }
}

重试失败后的数据如何再次处理?-- 死信队列

当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?

答:当 Kafka 中的数据重试失败后,可以使用**死信队列(Dead Letter Queue,DLQ)**来进行再次处理。

  • 死信队列(Dead Letter Queue,DLQ):未被正常消费的消息 会被放入死信队列中,在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
  • 在实际使用过程中,可以使用@DltHandler 注解处理。

消费者消息消费时,失败之后会怎么样?

在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。

因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。


http://www.kler.cn/a/321181.html

相关文章:

  • 远程服务器没办法显示图片
  • Linux shell脚本用于常见图片png、jpg、jpeg、webp、tiff格式批量转PDF文件
  • 工作编码案例--UDP多播 和 本地套接字bind
  • 怎么把多个PDF合并到一起-免费实用PDF编辑处理工具分享
  • 论文解读 | EMNLP2024 一种用于大语言模型版本更新的学习率路径切换训练范式
  • 面试经典 150 题——数组/字符串(一)
  • 一个证明-待验证
  • 平衡、软技能与持续学习
  • pdf编辑转换器怎么用?分享9个pdf编辑、转换方法(纯干货)
  • 基于深度学习的药品三期OCR字符识别
  • 生成式语言模型底层技术面试
  • 修改Docker默认存储路径,解决系统盘占用90%+问题(修改docker root dir)
  • 【笔记】数据结构|链表算法总结|快慢指针场景和解决方案|链表归并算法和插入算法|2012 42
  • 共享单车轨迹数据分析:以厦门市共享单车数据为例(八)
  • 爬虫过程 | 蜘蛛程序爬取数据流程(初学者适用)
  • P335_0334韩顺平Java_零钱通介绍
  • 华为NAT ALG技术的实现
  • AttributeError: ‘Sequential‘ object has no attribute ‘predict_classes‘如何解决
  • 【Python报错已解决】ModuleNotFoundError: No module named ‘psutil’
  • Android——运行时动态申请权限
  • [Redis][Hash]详细讲解
  • 828华为云征文 | 在华为云X实例上部署微服务架构的文物大数据管理平台的实践
  • linux命令:显示已安装在linux内核的模块的详细信息的工具modinfo详解
  • 物理学基础精解【7】
  • Docker 容器技术:颠覆传统,重塑软件世界的新势力
  • 【RAG研究1】导言-我打算如何对RAG进行全面且深入的研究