当前位置: 首页 > article >正文

RabbitMQ最全教程-Part2(高阶使用)

我们先来看一下这个网约车的场景

 我们需要实现派单服务,用户发送打车订单后需要进行进行派单,如果在指定时间内没有找到司机就会收到派单超时的通知,并且能够实时查看当前排队的抢单人数

 在这幅图中,我们今天来解决一下其中的三个关键问题:打车超时,打车排队,消息推送

一、延时任务

首先什么时候需要做延时任务?

  • 生成订单30分钟未支付,则自动取消
  • 生成订单60秒后,给用户发短信
  • 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

 就像刚才的打车超时问题,如果再最大限度时间内还没有打到车,那么我们需要做一些操作,这个整个流程我们就可以称为延时任务

那如何解决延时任务呢?

最原始的方案可以对数据库进行轮询操作,当查询到某些任务的延时时间到了就触发执行

除此之外,我们可以用延时队列实现

1、延时队列

 JDK的延迟队列

该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。

 由于JDK自带的延时队列是基于内存的,所以其执行效率非常高,延迟小

缺点:

服务器重启后,数据全部消失,怕宕机
集群扩展相当麻烦
因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
代码复杂度较高

时间轮算法

 这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈,位置是在2圈之后的5上面(20 % 8 + 1)

缺点参考JDK延迟队列

使用RabbitMQ实现延时任务

 RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了dead letter,则按照这两个参数重新路由。

这个方案称为死信队列

2、死信队列

什么是死信队列?

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解

一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列

所以死信队列就是存放死信消息的队列

死信交换机

死信如何被投放到死信队列,那肯定是要先交到一个死信交换机的手中。

那死信交换机只接收哪类消息呢?

  • 消费者对消息使用了basicReject或者basicNack回复,并且requeue参数设置为false,即不再将该消息重新在消费者间进行投递
  • 消息在队列中超时,RabbitMQ可以在单个消息或者队列中设置TTL属性
  • 队列中的消息已经超过其设置的最大消息个数

死信队列的实现

在 Spring Boot 配置类中定义死信交换器、普通交换器、普通队列和死信队列,并进行绑定。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 死信交换器名称
    public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange";
    // 死信队列名称
    public static final String DEAD_LETTER_QUEUE = "dlx.queue";
    // 死信路由键
    public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key";

    // 普通交换器名称
    public static final String NORMAL_EXCHANGE = "normal.exchange";
    // 普通队列名称
    public static final String NORMAL_QUEUE = "normal.queue";
    // 普通路由键
    public static final String NORMAL_ROUTING_KEY = "normal.routing.key";

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE, true); // durable: true
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
    }

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    @Bean
    public Queue normalQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        args.put("x-message-ttl", 10000); // 消息存活时间,单位毫秒
        return new Queue(NORMAL_QUEUE, true, false, false, args); // durable: true, exclusive: false, autoDelete: false, arguments: args
    }

    @Bean
    public Binding normalBinding() {
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(NORMAL_ROUTING_KEY);
    }
}

创建一个生产者类,用于发送消息到普通队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, message);
        System.out.println("Sent message: " + message);
    }
}

创建一个消费者类,用于消费普通队列中的消息。如果消息处理失败或达到最大重试次数,消息将被路由到死信队列。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE)
    public void receiveMessage(String message) {
        try {
            // 模拟消息处理失败
            if (message.equals("fail")) {
                throw new RuntimeException("Message processing failed");
            }
            System.out.println("Received message: " + message);
        } catch (Exception e) {
            System.out.println("Failed to process message: " + message);
            throw e; // 重新抛出异常,使消息进入死信队列
        }
    }
}

基于RabbitMQ的死信队列实现延时任务

其关键点就是对于这个正常队列不要添加消费者,TTL一到,自动投递到死信队列,接下来再对死信队列中消息进行消费,实现一个延时任务的执行

二、消息可靠性保证

面试题:MQ如何保证消息不丢失?

这是一个很宽泛的问题,而不仅仅回答一个同步?异步?ACK?

我们要从生产端怎么保证消息可靠性的、broker怎么保证的、消费端怎么保证的?

生产者保证消息可靠性

1、失败通知

当生产者发送消息时,消息先到交换机。这时,交换机有两种情况,一是能正常到达queue,我们称为可路由,反过来,主要无法传到某个queue,那么就代表不可路由,这时候表示投递失败,会有一个失败通知反馈给生产者

在代码层面,需要在生产者添加监听器

yml配置

spring:
  rabbitmq:
    # 消息在未被队列收到的情况下返回
    publisher-returns: true
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.api.ReturnedMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {


    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 设置 Return 回调
        rabbitTemplate.setReturnCallback((returnedMessage) -> {
            System.out.println("Message returned: " + new String(returnedMessage.getBody()) +
                              ", replyCode: " + returnedMessage.getReplyCode() +
                              ", replyText: " + returnedMessage.getReplyText() +
                              ", exchange: " + returnedMessage.getExchange() +
                              ", routingKey: " + returnedMessage.getRoutingKey());
        });

        return rabbitTemplate;
    }

}

2、发送方确认

在上面失败通知时,是发送在无法找到queue,这时除了失败通知,其实还会返回Nack;

 当可路由时,那就开始投放到真正的queue,但是这个投放一定会放入成功吗?肯定会有失败(比如:队列已满,不可再投放)

怎么样在代码层面开启发送方确认ack呢?

spring:
  rabbitmq:    
    # 开启消息确认机制
    publisher-confirm-type: correlated
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ReturnedMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {


    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 设置 发送方确认 回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message sent successfully: " + correlationData.getId());
            } else {
                System.out.println("Message sending failed: " + correlationData.getId() + ", cause: " + cause);
            }
        });

        // 设置 失败通知 回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("Message returned: " + message + 
                              ", replyCode: " + replyCode + 
                              ", replyText: " + replyText + 
                              ", exchange: " + exchange + 
                              ", routingKey: " + routingKey);
        });

        return rabbitTemplate;
    }
}

Broker保证消息可靠性

假设有现在一种情况,生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到了队列中,但是在消费者还未进行消费时,mq挂掉了,那么重启mq之后消息还会存在吗?如果消息不存在,那就造成了消息的丢失,也就不能保证消息的可靠性传输了。

解决办法就是开启RabbitMQ的持久化机制

在 Spring Boot 配置类中定义交换器、队列和绑定关系,并设置队列的持久化属性。

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "my.exchange";
    public static final String QUEUE_NAME = "my.queue";
    public static final String ROUTING_KEY = "my.routing.key";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME, true, false); // durable: true, autoDelete: false
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true); // durable: true
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);
    }
}

生产者:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(TaxiBO taxiBO) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, taxiBO, m -> {
            m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
            return m;
        }, new CorrelationData(UUID.randomUUID().toString()));
        System.out.println("Sent message: " + taxiBO.getAccountId());
    }
}

消费者保证消息可靠性

消费者接收到消息,但是还未处理或者还未处理完,此时消费者进程挂掉了,比如重启或者异常断电等,此时MQ认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。

该如何避免这种情况呢?这就要用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import java.io.IOException;

@Component
public class MessageConsumer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, acknowledgeMode = "manual")
    public void receiveMessage(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        try {
            TaxiBO taxiBO = rabbitTemplate.getMessageConverter().fromMessage(message, TaxiBO.class);
            if (taxiBO != null) {
                System.out.println("Received message: " + taxiBO.getAccountId());
                // 处理消息的业务逻辑
                processMessage(taxiBO);
                // 手动确认消息
                channel.basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 处理失败,可以选择重新入队或丢弃消息
            channel.basicNack(deliveryTag, false, true);
        }
    }

    private void processMessage(TaxiBO taxiBO) {
        // 模拟业务逻辑处理
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Processed message: " + taxiBO.getAccountId());
    }
}


http://www.kler.cn/a/377264.html

相关文章:

  • JS中若干相似特性的区别
  • iClient3D for Cesium在Vue中快速实现场景卷帘
  • 【零基础保姆级教程】制作自己的数据集(二)——Labelme的安装与使用及常见的报错解决方法
  • 动态规划<四> 回文串问题(含对应LeetcodeOJ题)
  • React 前端框架简介
  • ajax中get和post的区别,datatype返回的数据类型有哪些?web开发中数据提交的几种方式,有什么区别。
  • 【Linux系列】Linux 系统中的软连接管理
  • 科学教育与少儿编程:同向同行,共育新时代科技人才
  • RabbitMQ的解耦、异步、削峰是什么?
  • java医院绩效管理系统源码,采用B/S架构,开发工具:maven、Visual Studio Code,医院绩效管理系统数据流程解析
  • 使用 Docker Compose 将数据版 LobeChat 服务端部署
  • 无人机高山景区物资吊运技术及前景分析
  • 【OJ题解】C++实现反转字符串中的每个单词
  • 拿不到kafka消息可能是什么原因?
  • 图像压缩——图像压缩的保真度准则与压缩性能参数
  • 【那些年踩过的坑-前端篇- Mac版本】npm init vite 失败,报错`CERT_HAS_EXPIRED npm ERR
  • Python自动化操作Word文档详解
  • 在VB.NET中,Try...Catch...Finally 和On Error Resume Next有什么区别
  • K8S自建企业私有云方案 单台起配 NVMe全闪存储性能
  • Maven引入记录
  • NPOI 操作详解(操作Excel)
  • 游戏开发与游戏运营:哪个更难?
  • android——渐变色
  • Get包中的依赖管理介绍
  • 【图解版】力扣第70题:爬楼梯
  • 《HelloGitHub》第 103 期