当前位置: 首页 > article >正文

【谷粒商城之消息队列RabbitMQ】

本笔记内容为尚硅谷谷粒商城消息队列RabbitMQ部分

目录

一、概述

二、简介

三、Docker安装RabbitMQ

四、Springboot整合RabbitMQ 

1、引入spring-boot-starter-amqp

2、application.yml配置

3、测试RabbitMQ

1. AmqpAdmin-管理组件

2.RabbitTemplate-消息发送处理组件

3.RabbitListener&RabbitHandle接收消息

4.RabbitMQ消息确认机制-可靠抵达

5.可靠抵达-Ack消息确认机制


一、概述


二、简介


三、Docker安装RabbitMQ


docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

  • 4369, 25672 (Erlang发现&集群端口)
  • 5672, 5671 (AMQP端口)
  • 15672 (web管理后台端口)
  • 61613, 61614 (STOMP协议端口)
  • 1883, 8883 (MQTT协议端口) 

Networking and RabbitMQ — RabbitMQ

四、Springboot整合RabbitMQ 


1、引入spring-boot-starter-amqp

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application.yml配置

spring.rabbitmq.host=192.168.88.130
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3、测试RabbitMQ

1. AmqpAdmin-管理组件

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 1、如何创建Exchange[hello.java.exchange]、Queue、Binding
     *      1)、使用AmqpAdmin进行创建
     */
    // 创建交换机
    @Test
    void createExchange() {
        //DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        //durable:是否持久化
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("exchange:[{}]创建成功","hello-java-exchange");
    }

    // 创建队列
    @Test
    void createQueue() {
        //exclusive:是否排他的,false:允许同时有多个连接到此queue
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("queue:[{}]创建成功","hello-java-queue");
    }

    @Test
    void createBinding(){
        // 创建绑定
        // String destination【目的地】,
        // DestinationType destinationType 【目的地类型】
        // String exchange【交换机】,
        // String routingKey【路由键】,
        // Map<String, Object> arguments【自定义参数】
        // 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",null);
        amqpAdmin.declareBinding(binding);
        log.info("binding:[{}]创建成功","hello-java-binding");
    }

2.RabbitTemplate-消息发送处理组件

    @Test
    void sendMessageTests() {

        //1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
        String msg = "Hello World";
        OrderReturnReasonEntity entity = new OrderReturnReasonEntity();
        entity.setId(1L);
        entity.setCreateTime(new Date());

        //2、发送的对象类型的消息,可以是一个json
        for (int i = 0;i<10;i++) {
            if(i%2==0) {
                entity.setName("Vc" + i);
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity);
            }else {
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
            }
            log.info("消息发送完成{}" + entity);
        }
    }

使用json格式的序列化器

    //使用json格式的序列化器
    //否则使用jdk的序列化器
    @Configuration
public class MyRabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

3.RabbitListener&RabbitHandle接收消息

    1)必须使用@EnableRabbit
    2)监听方法必须放在@Component中
    3)@RabbitListener(queues={"hello-java-queue"})放在类上
         @RabbitHandler:标在方法上【作用:重载处理不同类型的数据】
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService 
    /**
     * queue:声明需要监听的所有队列
     *
     * org.springframework.amqp.core.Message
     *
     * 参考可以写一下类型
     * 1、Message message:原生消息详细信息。头+体
     * 2、T<发送的消息的类型> OrderReturnReasonEntity content
     * 3、Channel channel: 当前传输数据的通道
     *
     * Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
     * 场景:
     *      1)、订单服务启动多个;同一个消息,只能有一个客户端收到
     *      2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
     * @param message
     */
    //RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnReasonEntity content,
                               Channel channel) throws InterruptedException {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getName());
    }

    @RabbitHandler
    public void recieveMessage2(OrderEntity content) {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
    }

}

4.RabbitMQ消息确认机制-可靠抵达

注意:springboot.rabbitmq.publisher-confirm 已被弃用.

#新版使用
#开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated

#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个return
spring.rabbitmq.template.mandatory=true

示例代码: 

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

 

5.可靠抵达-Ack消息确认机制

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

示例代码: 

    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnReasonEntity content,
                               Channel channel) throws InterruptedException {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getName());
        //Channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        //签收货物,非批量模式
        try {

            if(deliveryTag%2==0){
                //收货
                channel.basicAck(deliveryTag,false);
                System.out.println("签收了货物..."+deliveryTag);
            }else {
                //退货 requeue=false 丢弃 requeue=true 发回服务器,服务器重新入队
                //deliveryTag, multiple, requeue(false不再入队)
                channel.basicNack(deliveryTag,false,false);
                System.out.println("没有签收了货物..."+deliveryTag);
            }
        } catch (Exception e) {
            //网络中断

        }
    }

结束!


http://www.kler.cn/a/17024.html

相关文章:

  • 力扣662:二叉树的最大宽度
  • 密码学的基本原理
  • Jmeter性能测试 -3数据驱动实战
  • 论文阅读《BEVFormer v2》
  • Springboot 日志处理(非常详细)
  • 事件循环 -- 资源总结(浏览器进程模型、事件循环机制、练习题)
  • nullptr和NULL的区别
  • Photoshop如何使用选区之实例演示?
  • Java中顺序表详解
  • 自动驾驶行业观察之2023上海车展-----智驾供应链(1)
  • E. Train Hard, Win Easy(数学推导 + 前缀和)
  • JAVA-实现简易图书管理系统
  • leetcode 面试题 02.04. 分割链表
  • YonLinker连接集成平台构建新一代产业互联根基
  • babysql
  • JavaWeb学习------Servlet
  • Java基本数据类型以及包装类型的常量池技术
  • Java中线程池的介绍、构造方法及优势
  • 电子工程师是怎么练成的
  • 数据结构与算法之链表: Leetcode 141. 环形链表 (Typescript版)
  • 谷粒商城二十四Sentinel限流熔断降级
  • 用于scATAC-seq有监督分类的Cellcano
  • 【LeetCode刷题记录】数组专题
  • Python小姿势 - Python面向对象
  • 《基于深度学习模型的非接触式面部视频记录技术用于心房颤动的检测》阅读笔记
  • 「Codeforces」B. Avoid Local Maximums