当前位置: 首页 > article >正文

RabbitMQ基本使用以及整合Java项目

RabbitMQ安装

此步骤可以参考CSDN上其他博文,有写得很详细的,此处不做过多安装问题,主要讲述怎么使用。

项目整合

导入依赖

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.0</version>
        </dependency>

配置好yml文件

rabbitmq:
  host: 192.168.85.xxx  #自己的地址
  port: 5673   # linux 对外开放端口是5673:5672
  username: admin
  password: admin
  virtual-host: /

基本使用

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ImgApp.class)
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    public void testSendMq(){
        //队列名称
        String que="test.queue";
        //消息
        String msg="hello,test";

        rabbitTemplate.convertAndSend(que,msg);
    }
}
@Component
public class MqListener {

    @RabbitListener(queues = "test.queue")
    public void listenerQueue(String msg){
        System.out.println("消费者收到消息:"+msg);

    }
}

可以看到跨服务接受到消息

MQ的三种交换机

  1. Direct 交换机
    • Direct 交换机是最简单的一种交换机类型。它将消息路由到绑定键(Binding Key)与消息的路由键(Routing Key)完全匹配的队列中。如果绑定键与消息的路由键相匹配,消息将被路由到对应的队列中。
    • Direct 交换机通常用于一对一的消息传递模式,适合处理特定类型的消息。
  1. Fanout 交换机
    • Fanout 交换机会将接收到的消息广播到所有与之绑定的队列中,忽略消息的路由键。即 Fanout 交换机会将消息发送到所有与其绑定的队列,不管队列的绑定键是什么。
    • Fanout 交换机适用于广播消息给多个消费者的场景,每个消费者都会收到相同的消息。
  1. Topic 交换机
    • Topic 交换机根据消息的路由键和队列的绑定键之间的模式匹配规则,将消息路由到一个或多个队列中。Topic 交换机支持通配符匹配,可以根据路由键的模式进行灵活的匹配。
    • Topic 交换机适用于灵活的消息路由场景,可以根据消息的内容进行多种模式匹配,实现更精细的消息路由控制。
    • * (星号) 用来表示一个单词 (必须出现的)
      # (井号) 用来表示任意数量(零个或多个)单词

配置消息转换器

原因:

引入jackson依赖

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.5</version>
</dependency>

当映入jackson依赖后其spring的ampq中将会自动使用

在生产者和消费者是启动类上注入Bean

消费者端使用注解方式声明交换机以及队列

//基于注解声明队列以及交换机
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue"),
        exchange = @Exchange(name = "xiaomi.topic",type = ExchangeTypes.TOPIC),
        key = "test.#"
))

生产者确认机制

开启生产者确认机制后,MQ的性能下降明显,一般场景不建议开启。

保证消息可靠性

LazyQueue

即将消息持久化直接写入磁盘,而不是从放在内存中。

消费者确认机制

一般情况下为了保证消息不被丢失都是:nack

失败重试机制

当消费者出现异常后,为了避免消息不断重新入队后又重新发送,造成死循环导致性能压力飙升,所以又消息失败重试机制。

当重试耗尽后的处理方式

创建失败处理的Config

/** MQ的配置
 * @author 12547
 * @version 1.0
 * @Date 2024/3/17 16:28
 */
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true")
public class ErrorConfig {



    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue,DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

死信交换机

死信交换机如以上案例所示,用作于某些无法处理的消息的兜底方案,还可以用做延迟消息(思路上可以,但不推荐)

延迟消息

官方延迟插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

使用docker logs rabbit 查看MQ版本

找到对应版本并下载

进入 cd /root/目录下

将下载好的插件上传至该目录下

docker cp /root/rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins

执行后进入docker容器 cd plugins 查看是否安装成功

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启容器后查看管理页面在选项栏中看到则代表成功

SpringBoot整合延时消息

在注解中的@Exchange中额外加入delayed="true";

发送消息

 生产者测试代码:

//测试延时消息
    @Test
    public void testSendDelayMapMsg(){
        //队列名称

        //消息
         String msg="hello,delay";

        rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //添加延时消息属性
                message.getMessageProperties().setDelay(5000);  //设置延时时间5s
                return message;
            }
        });
        System.out.println("消息发送成功"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }

}

消费者测试代码:

//基于注解声明队列以及交换机
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenerDelay(String msg){
    System.out.println("消息接受成功"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    System.out.println("收到延时消息:"+msg);
}

测试延时结果

触发失败消息策略

在消费者端期初由于LocalDateTime类使用错误(用了LocalTime),而Formate类是DateTime,导致无法转换,而生产者端会直接报错。生产者端改正后忘记修改消费者端的这部分代码(复制过去的),生产者测试代码能够正常运行,而消费者端这部分发现报错,但IDEA不会出现报错提示,此处应该是我的代码出现异常,所以导致消费者无法正常消费,走异常处理后进入失败重试,重试结束后走消息失败策略,进入定义的error交换机与队列。

异常后进入error.queue

延时消息经典场景

支付超时取消订单

有关延时消息最常用的一个业务就是支付超时取消,即防止用户下订后一直不支付,通过延时消息来达到取消订单的功能。

以订单30分钟超时时间为例,如果用户创建订单后进入消息队列直接设置30分钟延时消息后,那么这个消息要30分钟后才会给消费者判断,而这30分钟则一直堆积在队列中,但绝大多数情况用户支付基本都会在1分钟内完成,只有极少数情况才会出现超时,所以方案就是将30分钟的超时时间拆分为一个一个分段的延时时间1s,5s,10s,30s......这样子就避免了消息堆积,浪费性能

自定义一个延时消息体类 用于封账需要发送的消息和延迟时间

/**延迟消息实体类
 * @author 12547
 * @version 1.0
 * @Date 2024/3/17 22:02
 */
@Data
public class MultiDelayMessage<T> {

    //消息体
    private T data;
    //延时消息数组
    private List<Long> delayMils;

    public MultiDelayMessage(T data,List<Long> delayMils){
        this.data=data;
        this.delayMils=delayMils;
    }


    public static <T> MultiDelayMessage<T> of(T data, Long ...delayMils){
        return new MultiDelayMessage<>(data, CollUtil.newArrayList(delayMils));
    }

    //获取延迟队列的下一个延迟时间并返回
    public Long removeNextDelay(){
        return delayMils.remove(0);   //从延迟时间列表中移除并返回下一个延迟时间
    }

}

 

为了避免发送延迟消息时都要创建一个新消息体对象,所以手动创建一个类

自定义一个DelayPostMessage实现接口MessagePostProcessor的postProcessMessage方法,然后在方法中创建一个delay的成员变量,即通过构造函数的方式传入delay。@RequiredArgsConstructor 注解可以用于类上,用于自动生成一个包含所有被 final 修饰的成员变量

自定义DelayPostMessage:

@RequiredArgsConstructor
public class DelayPostMessage implements MessagePostProcessor {

    private final int delay;

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }
}

那么使用就改为了

rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new DelayPostMessage(msg.removeNextDelay().intValue()));-


http://www.kler.cn/a/441448.html

相关文章:

  • K8S 集群搭建和访问 Kubernetes 仪表板(Dashboard)
  • “腾讯、钉钉、飞书” 会议开源平替,免费功能强大
  • mysql直接在sql中将分组查询出来的多个属性的list,拼接成一个字符串,最后的结果只要一个大的字符串
  • 亚马逊新店铺流量怎么提升?自养号测评新趋势
  • 如何理解json和json字符串
  • MySQL训练营-慢查询诊断问题
  • linux上qt打包(二)
  • Windows环境 (Ubuntu 24.04.1 LTS ) 国内镜像,用apt-get命令安装RabbitMQ,java代码样例
  • Windows server服务器之网络安全管理(防火墙入站规则创建)
  • C# 23种设计模式(4)访问者模式(Visitor Pattern)
  • @pytest.fixture() 跟 @pytest.fixture有区别吗?
  • 机器学习实战31-基于机器学习算法对某年福州市各初中重点高中录取率进行数学分析,评估性价比较高的学校。
  • 探索 PIE 在 ESP32-P4 上的应用
  • 找出一个数组中出现次数最多的那个元素。:哈希表:JAVA
  • SQL, 将分段数不确定的字符串拆分成多列
  • Android之RecyclerView显示数据列表和网格
  • 2024-12-16 装有Ubuntu系统的移动硬盘使用windows系统对其进行格式化
  • SpringDataJpa-字段加解密存储
  • webpack打包流程及原理
  • LeetCode 283.移动零(超简单讲解)
  • 鸿蒙权限请求工具类
  • 力扣-图论-15【算法学习day.65】
  • 【PyTorch】实现在训练过程中自定义动态调整学习率
  • 测试工程师八股文04|计算机网络 和 其他
  • 【日常笔记】基本数据类型浅析 -int类型能存储哪些传感器数据
  • 减少 Flutter 应用体积的常用方法