当前位置: 首页 > article >正文

RabbitMQ的消息可靠性保证

文章目录

    • 1.环境搭建
        • 1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
        • 2.common-rabbitmq-starter-demo下创建一个生产者一个消费者
    • 2.生产者可靠性
        • 1.开启消息超时重试机制
        • 2.生产者开启ConfirmCallback消息确认机制
          • 1.application.yml
          • 2.TestConfigPublisher.java
          • 3.测试交换机名字写错的情况
    • 3.MQ可靠性
        • 1.使用LazyQueue和持久化队列结合的方式来做
    • 4.消费者可靠性
        • 1.消费者失败重试机制
          • 1.application.yml
          • 2.解释
        • 2.消费者消息失败处理策略
          • 1.ErrorConfiguration.java 指定错误消息发送到异常交换机
          • 2.ErrorListener.java 异常队列监听器
          • 3.ErrorMessageHandler.java 异常消息处理器
          • 4.TestConfig.java配置
          • 5.TestConfigPublisher.java 生产者
          • 6.TestConfigConsumer.java 消费者故意消费失败
          • 7.测试,消费失败则重试三次后到异常处理逻辑

1.环境搭建

1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
spring:
  rabbitmq:
    # 消费者配置
    listener:
      simple:
        prefetch: 1 # 每次获取一条消息,处理完再获取下一条
2.common-rabbitmq-starter-demo下创建一个生产者一个消费者

CleanShot 2024-12-31 at 21.59.36@2x

2.生产者可靠性

1.开启消息超时重试机制
    # 生产者消息重试配置
    template:
      retry:
        # 启用消息重试机制,默认为 false
        enabled: true
        # 初始重试间隔时间为一秒
        initial-interval: 1000ms
        # 重试最大次数,默认为 3 次
        max-attempts: 2
        # 重试的间隔倍数
        # 配置 2 的话,第一次等initial-interval也就是1s,第二次等 2s,第三次等 4s
        multiplier: 2
    connection-timeout: 500ms # 连接超时时间500ms
2.生产者开启ConfirmCallback消息确认机制
1.application.yml
    # 生产者配置
    publisher-confirm-type: correlated # 发布确认类型为异步回调(一旦配置了,就必须要有回调方法)
2.TestConfigPublisher.java
package com.sunxiansheng.publisher.pub;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Description: 测试发布者
 *
 * @Author sun
 * @Create 2024/12/31 19:05
 * @Version 1.0
 */
@RestController
@Slf4j
public class TestConfigPublisher {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
    public void send() {
        log.info("发送消息");
        // 1.创建CorrelationData对象
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        // 2.设置回调
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 基本不可能发生,因为这里的异常不是MQ问题导致的
                log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(CorrelationData.Confirm confirm) {
                // 判断是否发送成功
                if (confirm.isAck()) {
                    log.info("ConfirmCallback:消息发送成功:{}", confirm);
                } else {
                    log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());
                }
            }
        });
        rabbitTemplate.convertAndSend("fanout.exchange.tesst", "", "hello rabbitmq", cd);
    }
}
3.测试交换机名字写错的情况

CleanShot 2024-12-31 at 19.57.56@2x

3.MQ可靠性

1.使用LazyQueue和持久化队列结合的方式来做
    /**
     * 创建一个队列
     *
     * @return
     */
    @Bean
    public Queue fanoutQueueTest() {
        return QueueBuilder.durable("lazyQueue") // 持久化队列
                .lazy()               // 惰性队列
                .build();
    }

持久化队列可以保存队列的元数据,重启后自动恢复,惰性队列可以将所有的消息都持久化到磁盘,内存只保留最近的2048条消息

4.消费者可靠性

1.消费者失败重试机制
1.application.yml
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: auto # 自动确认模式(消费者确认机制)
        retry:
          enabled: true # 开启重试机制
          max-attempts: 3 # 最大重试次数
          initial-interval: 1000ms # 重试间隔时间
          multiplier: 1.0 # 重试时间间隔倍数
          stateless: false # false:有状态,true:无状态,如果是有状态的,每次重试都会发送到同一个队列
2.解释

首先开启了消费者自动确认机制,如果消息消费失败,就进行重试

2.消费者消息失败处理策略
1.ErrorConfiguration.java 指定错误消息发送到异常交换机
package com.sunxiansheng.rabbitmq.error;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: 处理失败消息的交换机和队列
 *
 * @Author sun
 * @Create 2024/12/31 19:07
 * @Version 1.0
 */
@Configuration
// 当配置文件中spring.rabbitmq.listener.simple.retry.enabled=true时,才会生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple", name = "retry.enabled", havingValue = "true")
public class ErrorConfiguration {

    /**
     * 一个error交换机
     */
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.exchange");
    }

    /**
     * 一个error队列
     */
    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    /**
     * 绑定error队列到error交换机
     */
    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
    }

    /**
     * MessageRecoverer
     */
    @Bean
    public MessageRecoverer myMessageRecoverer(RabbitTemplate rabbitTemplate) {
        // 指定错误消息发送到error.exchange交换机,routingKey为error
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }
}
2.ErrorListener.java 异常队列监听器
package com.sunxiansheng.consumer.error;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: 错误消息监听器
 *
 * @Author sun
 * @Create 2024/12/31 20:32
 * @Version 1.0
 */
@Component
@Slf4j
public class ErrorListener {

    @RabbitListener(queues = "error.queue")
    public void errorListener(Message message) {
        // 解析错误信息
        ErrorMessageHandler.handleErrorMessage("error.queue", message);
    }
}
3.ErrorMessageHandler.java 异常消息处理器
package com.sunxiansheng.consumer.error;

import com.rabbitmq.client.LongString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Description: 错误消息处理器
 *
 * @Author sun
 * @Create 2024/12/31 20:32
 * @Version 1.0
 */
@Slf4j
public class ErrorMessageHandler {

    public static void handleErrorMessage(String listenerName, Message message) {
        // 获取消息属性
        MessageProperties messageProperties = message.getMessageProperties();
        String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> headers = messageProperties.getHeaders();

        // 从消息头部获取异常信息
        String exceptionMessage = (String) headers.get("x-exception-message");
        String originalExchange = (String) headers.get("x-original-exchange");
        String originalRoutingKey = (String) headers.get("x-original-routingKey");

        // 处理LongString类型的异常堆栈跟踪信息
        String exceptionStackTrace = null;
        if (headers.containsKey("x-exception-stacktrace")) {
            Object stacktraceObject = headers.get("x-exception-stacktrace");
            if (stacktraceObject instanceof LongString) {
                exceptionStackTrace = stacktraceObject.toString();
            }
        }

        // 格式化输出所有信息,并在前后添加分割线
        log.error("\n-------------------------------\n" +
                        "MQ错误监听队列: {}\n" +
                        "原始交换机: {}\n" +
                        "原始路由键: {}\n" +
                        "原始信息: {}\n" +
                        "异常信息: {}\n" +
                        "异常堆栈: {}\n" +
                        "-------------------------------",
                listenerName, originalExchange, originalRoutingKey, messageBody, exceptionMessage, exceptionStackTrace);
    }
}
4.TestConfig.java配置
package com.sunxiansheng.publisher.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: 测试配置类
 *
 * @Author sun
 * @Create 2024/12/31 19:00
 * @Version 1.0
 */
@Configuration
public class TestConfig {

    /**
     * 创建一个fanout类型的交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange.test");
    }

    /**
     * 创建一个队列
     *
     * @return
     */
    @Bean
    public Queue fanoutQueueTest() {
        return QueueBuilder.durable("lazyQueue") // 持久化队列
                .lazy()               // 惰性队列
                .build();
    }

    /**
     * 交换机和队列绑定
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());
    }
}
5.TestConfigPublisher.java 生产者
package com.sunxiansheng.publisher.pub;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Description: 测试发布者
 *
 * @Author sun
 * @Create 2024/12/31 19:05
 * @Version 1.0
 */
@RestController
@Slf4j
public class TestConfigPublisher {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
    public void send() {
        log.info("发送消息");
        // 1.创建CorrelationData对象
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        // 2.设置回调
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 基本不可能发生,因为这里的异常不是MQ问题导致的
                log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(CorrelationData.Confirm confirm) {
                // 判断是否发送成功
                if (confirm.isAck()) {
                    log.info("ConfirmCallback:消息发送成功:{}", confirm);
                } else {
                    log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());
                }
            }
        });
        rabbitTemplate.convertAndSend("fanout.exchange.test", "", "hello rabbitmq", cd);
    }
}
6.TestConfigConsumer.java 消费者故意消费失败
package com.sunxiansheng.consumer.con;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: 测试消费者
 *
 * @Author sun
 * @Create 2024/12/31 19:03
 * @Version 1.0
 */
@Component
@Slf4j
public class TestConfigConsumer {

    @RabbitListener(queues = "fanout.queue.test")
    public void receive(String message) {
        log.info("接收到的消息:{}", message);
        int i = 1 / 0;
    }
}
7.测试,消费失败则重试三次后到异常处理逻辑

CleanShot 2024-12-31 at 22.07.15@2x


http://www.kler.cn/a/514874.html

相关文章:

  • Codeforces Round 1000 (Div. 2)(前三题)
  • linux网络 | 传输层TCP | 认识tcp报头字段与分离
  • 深入剖析 Java 的本地方法接口(JNI)
  • c++ 与 Matlab 程序的数据比对
  • Element使用表单重置如果不使用prop,重置无法生效
  • Linux Bash 中使用重定向运算符的 5 种方法
  • 网络(一)
  • C语言程序环境与预处理—从源文件到执行程序,这里面有怎么的工序?绝对0基础!
  • 【 MySQL 学习4】排序
  • Kafka 源码分析(一) 日志段
  • java中的String类、StringBuffer类、StringBuilder类的详细讲解(包含相互之间的比较)
  • BUG解决:安装问题transformer_engine+pytorch
  • 基于springboot+vue的高校社团管理系统的设计与实现
  • docker ubuntu:20.04构建c++ grpc环境
  • es的date类型字段按照原生格式进行分组聚合
  • QILSTE H13-320B2W高亮白光LED灯珠 发光二极管LED
  • 如何使用CRM数据分析和洞察来支持业务决策和市场营销?
  • 开源鸿蒙开发者社区记录
  • 深入了解 Java split() 方法:分割字符串的利器
  • AI时代的网络安全:传统技术的落寞与新机遇
  • Kubernetes入门学习
  • Spring Boot 事件驱动:构建灵活可扩展的应用
  • PostgreSQL 初中级认证可以一起学吗?
  • Servlet快速入门
  • Spring Boot MyBatis Plus 版本兼容问题(记录)
  • HOW - 基于master的a分支和基于a的b分支合流问题