当前位置: 首页 > article >正文

RabbitMQ 发布确认高级部分

RabbitMQ 发布确认高级部分

1. 什么是发布确认(Publisher Confirms)

发布确认是 RabbitMQ 的一种机制,确保消息成功发送到服务器并持久化。在与 Spring Boot 结合时,可以增强消息的可靠性和监控能力。

2. Spring Boot 中的 RabbitMQ 配置

2.1 添加依赖

pom.xml 中添加 RabbitMQ 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 配置 RabbitMQ

application.yml 中配置 RabbitMQ 的连接信息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3. 启用发布确认

3.1 创建 RabbitTemplate Bean

在 Spring Boot 中,通过配置 RabbitTemplate 来启用发布确认:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 启用发布确认
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息发送成功,消息 ID: " + correlationData.getId());
            } else {
                System.err.println("消息发送失败,原因: " + cause);
            }
        });
        return rabbitTemplate;
    }
}

3.2 发送消息

使用 RabbitTemplate 发送消息并自动处理确认:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("my_exchange", "my_routing_key", message);
        System.out.println("发送消息: " + message);
    }
}

4. 回调函数

4.1 ConfirmCallback

setConfirmCallback 方法设置了确认回调函数。确认成功时会触发此回调,可以根据 ack 参数判断消息是否发送成功。

4.2 处理失败的消息

在确认失败时,可以实现自定义处理逻辑,如重试或记录失败信息:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        System.err.println("消息发送失败,原因: " + cause);
        // 可以实现重试逻辑
    }
});

5. 备份交换机(Dead Letter Exchange)

5.1 什么是备份交换机

备份交换机(Dead Letter Exchange, DLX)是用来处理未能成功消费的消息的机制。当消息在队列中达到最大重试次数或过期时,它会被转发到备份交换机。

5.2 配置备份交换机

在 RabbitMQ 中配置备份交换机的步骤:

  1. 定义备份交换机和队列
@Bean
public Queue myQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "dlx_exchange"); // 备份交换机
    args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 备份路由键
    return new Queue("my_queue", true, false, false, args);
}

@Bean
public TopicExchange dlxExchange() {
    return new TopicExchange("dlx_exchange");
}

@Bean
public Queue dlxQueue() {
    return new Queue("dlx_queue");
}

@Bean
public Binding dlxBinding() {
    return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key");
}

5.3 消费失败的消息

在消费者中处理从备份交换机接收到的消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DlxMessageListener {

    @RabbitListener(queues = "dlx_queue")
    public void handleDlxMessage(String message) {
        System.out.println("处理失败消息: " + message);
        // 处理逻辑,比如记录日志或重试
    }
}

6. 性能优化建议

6.1 批量确认

为了提高性能,可以考虑使用批量消息确认。发送多条消息后再确认,减少网络延迟。

public void sendBatchMessages(int count) {
    for (int i = 0; i < count; i++) {
        String message = "Batch Message " + i;
        rabbitTemplate.convertAndSend("my_exchange", "my_routing_key", message);
    }
    // 批量确认逻辑
}

6.2 调整预取计数

通过调整消费者的预取计数(prefetch count)来优化消息处理性能。设置合理的预取计数可以减少消费者的负载。

@RabbitListener(queues = "my_queue", containerFactory = "rabbitListenerContainerFactory")
public void processMessage(String message) {
    // 处理逻辑
}

6.3 连接池

使用连接池(如 C3P0HikariCP)管理 RabbitMQ 的连接,减少连接的创建和销毁开销。

6.4 监控和日志

使用监控工具(如 Spring Boot Actuator)监控 RabbitMQ 的性能和消息流动情况。记录发送和接收的消息状态,以便进行故障排查。

7. 项目地址

你可以访问以下项目地址,获取完整示例代码和更多实现细节:

GitHub 项目地址


http://www.kler.cn/news/366917.html

相关文章:

  • Educational Codeforces Round 170 C New Game
  • node集成redis (教学)
  • vscode配色主题与图标库推荐
  • AI智能爆发:从自动驾驶到智能家居,科技如何改变我们的日常?
  • 基于Python大数据的王者荣耀战队数据分析及可视化系统
  • 低代码平台如何通过AI赋能,实现更智能的业务自动化?
  • 语音交互:重塑人机对话的未来
  • 【Nas】X-Doc:jellyfin“该客户端与媒体不兼容,服务器未发送兼容的媒体格式”问题解决方案
  • 量子计算突破:下一个科技革命的风口浪尖在哪里?
  • Spring Boot 集成 PDFBox 实现PDF电子签章的简单应用
  • AI大模型开发架构设计(16)——ChatGPT Code Interpreter应用场景和技术原理动手实践
  • 【Python爬虫实战】Selenium自动化网页操作入门指南
  • 数据结构------手撕链表(一)【不带头单向非循环】
  • 掌握预测的准确性——使用 VAEneu 和 CRPS 的概率方法
  • PMP–一、二、三模–分类–11.风险管理–机会风险应对策略
  • 【C++笔记】内存管理
  • 实现简道云与企业微信的自动化数据集成
  • [C#][winform]基于yolov8的道路交通事故检测系统C#源码+onnx模型+评估指标曲线+精美GUI界面
  • Java-图书管理系统
  • LeetCode105. 从前序与中序遍历序列构造二叉树(2024秋季每日一题 49)
  • 地磁传感器(学习笔记上)
  • 微信小程序文字转语音播报案例
  • 基于Java SpringBoot和Vue社区医院诊所医疗挂号管理系统设计
  • 【超大数据】数字的拆分——int128数据类型的使用方法
  • Spring Cloud微服务:构建现代应用的新基石
  • 【笔记】软件测试09——接口测试