当前位置: 首页 > article >正文

RabbitMQ高可用

生产者确认

生产者确认就是:发送消息的人,要确保消息发送给了消息队列,分别是确保到了交换机,确保到了消息队列这两步。

1、在发送消息服务的application.yml中添加配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 异步回调
    publisher-returns: true
    template:
      mandatory: true

2、确保消息到交换机

package cn.zsh.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirmCallBack() {
        // 1、定义消息
        String message = "ABC";

        // 设置一个消息的唯一ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 3、confirm-ack
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("消息发送异常:" + ex.toString());
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()) {
                    // 说明到了交换机
                    System.out.println("publish-confirm:ack==消息发送成功:" + correlationData.getId());
                } else {
                    // 消息没有到交换机
                    System.out.println("publish-confirm:nack==消息发送失败:" + correlationData.getId());
                }
            }
        });

        // 4、消息发送
        rabbitTemplate.convertAndSend("191exchange","191",message,correlationData);
    }


}

3、确保消息从交换机路由到队列

创建公开CommonConfig类

package cn.zsh.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * 发送消息到交换机没有到消息队列
 */
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 1、获取RabbitTemplate(获取启动中的Bean的方式)
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 2、设置回调函数
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.error("发送消息失败,没到队列===消息:{}, 交换机:{}, 路由Key:{}, 响应CODE:{}, 相应内容:{}", message,exchange,routingKey,replyCode,replyText);
            }
        });
    }
}

消息持久化

消息持久化就是:确保消息不会在交换机或者队列中丢失。

案例:

使用SpringAMQP创建出来的交换机和队列,默认就是做了持久化的

package cn.zsh.mq.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * 创建交换机与队列
 */
@Component
public class FoundQueue {

    @Bean
    public DirectExchange qiuExchange(){
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new DirectExchange("qiu.deirect",true,false);
    }

    @Bean
    public Queue piqiuQueue(){
        // 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("piqiu.queue").build();
    }
}

消费者确认

消费者确认就是:消费者把消息从队列中获取出来,然后要消费成功,队列中的消息才能被删除掉。

方案一:消费者确认

加入这个配置以后,消费者消费失败,会直接重试或者删除,具体取决于设置的是none还是auto。

默认是none,不建议设置为auto模式因为会一直不断地尝试,这样会导致服务器压力很大。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # none:投递完立马删除  auto:失败后让你再次重试(重新投递到队列)知道成功

方案二:消费者失败重试,重试固定次数后,则删除当前消息

加入这个配置以后,消费者消费失败会重试固定的次数,然后将消息删除。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

方案三:消费者失败重试,重试固定次数后,将当前消息发送给error交换机路由给error队列

加入这个配置之后,重试固定次数后,会将这条消费失败的消息发送给error交换机,路由给error队列。

1、在消费者(消息接收者)中加入配置

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

2、创建error交换机和队列并绑定

package cn.zsh.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ErrorConfig {

    /**
     * 定义交换机
     * @return
     */
    @Bean
    public DirectExchange errorExchange2(){
        return new DirectExchange("error.direct");
    }

    /**
     * 定义队列
     * @return
     */
    @Bean
    public Queue errorQueue2(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding bindErrorQueue(DirectExchange errorExchange2,Queue errorQueue2){
        return BindingBuilder.bind(errorQueue2).to(errorExchange2).with("error");
    }
}

3、在启动类或者配置类中加入配置

package cn.zsh.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");
    }
}


http://www.kler.cn/a/401793.html

相关文章:

  • 《Python浪漫的烟花表白特效》
  • 【动手做】安装Miniconda和jupyter notebook环境实现线性回归
  • Conda 安装纯净版ComfyUI
  • 使用EventLog Analyzer日志分析工具监测 Windows Server 安全威胁
  • 【WPF】Prism学习(五)
  • 无人机航测技术算法概述!
  • ubuntu20.04的arduino+MU编辑器安装教程
  • C++代码优化(五):虚函数的开销和优化方式
  • 初始Python篇(6)—— 字符串
  • 人工智能学习——前言
  • 2024年第十四届APMCM亚太杯数学建模A题B题C题思路+代码解析汇总
  • MATLAB用到的矩阵基础知识(矩阵的乘和矩阵的逆)
  • Axure9生成的阅览页面如何自动展开左侧页面导航?
  • CSS基础也要进行模电实验
  • JSONP处理跨域请求
  • 每日一练:【动态规划算法】斐波那契数列模型之第 N 个泰波那契数(easy)
  • 【白话机器学习系列】白话 Softmax
  • 自动驾驶系统研发系列—智能驾驶新高度:解析ESS驾驶员转向辅助系统
  • C++ STL中常见的容器
  • 面向FWA市场!移远通信高性能5G-A模组RG650V-NA通过北美两大重要运营商认证