当前位置: 首页 > article >正文

Kafka 的延迟队列、死信队列和重试队列

总结一下实现的方法:
1、延迟队列,首先kafka是没有延迟队列的,那要实现延迟队列的话,就得使用其他方法。在发送消息的时候加上时间戳,再在时间戳上面加上延迟时间。消费的时候判断一下,有没有到达延迟时间,如果没有到达的话,重新入队,或启用定时线程处理。
2、重试队列,使用@RetryableTopic注解
3、死信队列,使用@DltHandler 或 @KafkaListener监听死信队列

代码非完整代码,仅供参考

1. 添加依赖

确保你的 pom.xml 文件中包含 Spring Kafka 的依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-kafka</artifactId>
    </dependency>
</dependencies>

2. 配置 Kafka

application.properties 文件中配置 Kafka 的连接信息和消费者的基本配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false

3. 创建 Kafka 生产者

创建一个 Kafka 生产者服务,用于发送消息到指定的 Topic:

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.Date;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 发送延迟消息到指定的 Topic。
     * @param topic 目标 Topic 名称(延迟队列为delay-topic,其他为my-topic)
     * @param message 要发送的消息内容
     * @param delay 延迟时间(毫秒)
     */
    public void sendDelayedMessage(String topic, String message, long delay) {
        long timestamp = Instant.now().toEpochMilli() + delay;
        kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));
    }
}

4. 创建 Kafka 消费者

4.1 消费延迟队列的消费者
@Service
public class KafkaConsumerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "delay-topic", groupId = "delay-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        long currentTimestamp = System.currentTimeMillis();
        long messageTimestamp = record.timestamp();

        // 检查是否到达延迟时间
        if (currentTimestamp < messageTimestamp) {
            // 未到达延迟时间,重新发送到延迟队列
            long remainingDelay = messageTimestamp - currentTimestamp;
            sendDelayedMessage(record.topic(), record.value(), remainingDelay);
        } else {
            // 到达延迟时间,处理消息
            System.out.println("Processing message: " + record.value());
        }

        // 确认消息已处理
        acknowledgment.acknowledge();
    }

    private void sendDelayedMessage(String topic, String message, long delay) {
        long timestamp = System.currentTimeMillis() + delay;
        kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));
    }
}
4.2 消费重试队列,失败放入死信队列

创建一个 Kafka 消费者服务,用于监听指定的 Topic 并处理消息。使用 @KafkaListener 注解来指定监听的 Topic,并手动提交偏移量。

package com.example.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.retry.annotation.Backoff;

import java.time.Instant;

@Service
public class KafkaConsumer {

    /**
     * 监听指定的 Topic 并处理消息。
     * 使用 @RetryableTopic 注解实现重试机制,最多尝试 3 次,每次重试间隔 2 秒,最大延迟 60 秒。
     * 如果所有重试都失败,消息将发送到死信队列。
     *
     * @param record 消费的消息记录
     * @param acknowledgment 用于手动提交偏移量
     */
    @RetryableTopic(
            attempts = "3", // 最大重试次数
            backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000), // 重试间隔和最大延迟
            dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR, // 失败后发送到死信队列
            autoCreateTopics = "true" // 自动创建重试和死信队列主题
    )
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            System.out.println("Received message: " + record.value());
            // 模拟异常
            if (shouldFail()) {
                throw new RuntimeException("Simulated failure");
            }
            acknowledgment.acknowledge(); // 提交偏移量
        } catch (Exception e) {
            throw e; // 抛出异常,触发重试机制
        }
    }

    /**
     * 模拟处理失败的条件。
     * @return 是否模拟失败
     */
    private boolean shouldFail() {
        // 模拟处理失败的条件
        return true;
    }

    /**
     * @DltHandler 注解标记的方法用于处理死信队列中的消息。
     * 当消息在重试队列中多次重试失败后,会被发送到死信队列。
     * @DltHandler 注解的方法会监听死信队列,并对其中的消息进行处理。
     * @DltHandler 它与 @RetryableTopic 注解结合使用,用于处理重试失败后的死信消息。
     * 
     * 处理死信队列中的消息。
     * @param record 死信队列中的消息记录
     */
    @DltHandler
    public void dltListen(ConsumerRecord<String, String> record) {
        String topic = record.topic(); // 获取死信队列的主题名称
        System.out.println("Received message in DLT: " + record.value());
        System.out.println("Topic: " + topic); // 打印主题名称
        // 处理死信消息, 可以在这里添加对死信消息的处理逻辑
    }
}

5. 配置 Kafka 消费者工厂

在 Spring Boot 中,可以通过配置 ConcurrentKafkaListenerContainerFactory 来设置重试机制和死信队列处理策略。
@RetryableTopic 和 SeekToCurrentErrorHandler 的配置不会同时生效。Spring Kafka 会优先处理 @RetryableTopic 注解的配置,因为它是一个更高级的抽象,专门用于处理重试和死信队列的逻辑。
为了避免配置冲突,建议选择一种方式来实现重试和死信队列的逻辑。如果你选择使用 @RetryableTopic,则不需要再配置 SeekToCurrentErrorHandler,即这里就可以跳过。

package com.example.demo;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // 设置错误处理器,最多重试 3 次,失败后发送到死信队列
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
        return factory;
    }
}

6. 创建死信队列消费者

创建一个消费者来监听死信队列主题,对死信消息进行后续处理(配置了@DltHandler 可以不用 @KafkaListener)。

package com.example.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class DltConsumer {

    /**
     * 监听死信队列主题并处理消息。
     * @param record 死信队列中的消息记录
     */
    @KafkaListener(topics = "my-topic.DLT", groupId = "dlt-group")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received message in DLT: " + record.value());
        // 可以在这里添加对死信消息的处理逻辑
    }
}

6.1 @DltHandler 与 @KafkaListener 的区别和适用场景
6.1.1 @DltHandler 的特点
与重试机制紧密结合:@DltHandler 注解的方法与 @RetryableTopic 注解的重试机制紧密结合,自动处理重试失败的消息。
自动发送到死信队列:当消息在重试队列中多次重试失败后,Spring Kafka 会自动将消息发送到死信队列。
简化代码:使用 @DltHandler 注解可以简化代码,减少手动处理死信消息的逻辑。
6.1.2 @KafkaListener 的特点
通用性:@KafkaListener 注解适用于任何 Kafka 主题,包括死信队列主题。
灵活性:可以用于监听任何主题,而不仅仅是死信队列。这使得它更加灵活,可以用于多种场景。
手动处理:需要手动配置死信队列主题,并在代码中显式处理死信消息。
6.2. @DltHandler 与 @KafkaListener 总结
**使用 @DltHandler:**如果你需要与 Spring Kafka 的重试机制紧密结合,并且希望自动处理重试失败的消息,使用 @DltHandler 是一个更简洁和方便的选择。
**使用 @KafkaListener:**如果你需要监听多个主题,或者需要更灵活地处理死信消息,使用 @KafkaListener 是一个更好的选择。
注意:如果 @KafkaListener 监听了死信队列的主题(例如 my-topic.DLT),那么当消息被发送到死信队列时,@KafkaListener 会先捕获并处理这些消息。这可能导致 @DltHandler 方法无法接收到死信队列中的消息。因此,两个最好不要一块用。

7. 启动类

创建一个 Spring Boot 应用程序的启动类:

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}

总结

通过以上步骤,你可以在 Spring Boot 中实现 Kafka 的延迟队列、死信队列和重试队列。这些功能可以确保消息处理的可靠性和健壮性,避免消息丢失或重复处理。希望这些示例能帮助你更好地理解和使用 Kafka 的高级特性。


http://www.kler.cn/a/612082.html

相关文章:

  • Android设计模式之观察者模式
  • Android 项目缓存问题,某些依赖中的类会报错:Cannot resolve symbol
  • 若依专题——基础应用篇
  • Scala简介与基础
  • 远程办公新体验:用触屏手机流畅操作电脑桌面
  • SpringBoot动态配置数据源的几种实现方式
  • Spring事务与数据库事务的关系
  • 常见邮件协议
  • Oracle Database In-Memory 23ai 新特性
  • 【C++接入大模型】WinHTTP类封装:实现对话式大模型接口访问
  • 适合DBA的brew上手指南
  • (C语言)网络编程之TCP(含三次握手和四次挥手详解)
  • 适配器模式及其典型应用
  • Vue-create-vue-开发流程-项目结构-API风格-ElementPlus-入门准备工作
  • 【保姆级别教程】VMware虚拟机安装Mac OS15苹果系统附带【VMware TooLS安装】【解锁虚拟机】和【Mac OS与真机共享文件夹】手把手教程
  • 分布式共识算法解密:从Paxos到Raft的演进之路
  • 使用string和string_view(一)——C风格字符串、字符串字面量和std::string
  • 批量将 PDF 转换为 Word/PPT/Txt/Jpg图片等其它格式
  • 开发DOM更新算法
  • [python]基于yolov8实现热力图可视化支持图像视频和摄像头检测