当前位置: 首页 > article >正文

RabbitMQ工作模式2 整合springboot 和MQ高级特性

RabbitMQ工作模式

1.路由模式

创建交换机 , 连接队列 (生产者)

public class MyTestExDirect {
    @Test
    public void bbb() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号");
        connectionFactory.setPassword("密码");
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("ex_direct", BuiltinExchangeType.DIRECT,false);
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 持久化
         * boolean exclusive, 是否独占
         * boolean autoDelete,  受否自动删除
         * Map<String, Object> arguments  参数
         */
        channel.queueDeclare("mydirect1",false,false,false,null);
        channel.queueDeclare("mydirect2",false,false,false,null);
        //绑定交换机和队列   设置routingkey
        channel.queueBind("mydirect1","ex_direct","error");
        channel.queueBind("mydirect2","ex_direct","test");
        channel.queueBind("mydirect2","ex_direct","test2");
        //交换机     routingkey     根据routingkey在队列上发布消息
        channel.basicPublish("ex_direct","error",null,"路由模式测试".getBytes());
    }
}

启动测试

交换机创建成功

队列创建成功 , 与交换机连接成功

通过routingkey "error" 将消息发送到 mydirect1

创建消费者

public class ConsumerAppDirect
{
    public static void main( String[] args ) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号"); 
        connectionFactory.setPassword("密码"); 
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号); 
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("mq:-----aaa"+s);
            }
        };
        channel.basicConsume("mydirect1",true,consumer);
    }
}

开启监控

2.Topics 主题模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert 

创建交换机和生产者

public class MyTestExTopics {
    @Test
    public void ccc() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号");
        connectionFactory.setPassword("密码"); 
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("ex_topics", BuiltinExchangeType.TOPIC,false);
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 持久化
         * boolean exclusive, 是否独占
         * boolean autoDelete,  受否自动删除
         * Map<String, Object> arguments  参数
         */
        channel.queueDeclare("mytopics1",false,false,false,null);
        channel.queueDeclare("mytopics2",false,false,false,null);
        //绑定交换机和队列   设置routingkey
        channel.queueBind("mytopics1","ex_topics","test.#");
        channel.queueBind("mytopics2","ex_topics","*.aaa");
        channel.queueBind("mytopics2","ex_topics","test.*");
        //交换机     此处的routingkey应该是具体的值     根据routingkey在队列上发布消息
        channel.basicPublish("ex_topics","test.aaa",null,"TOPIC模式测试".getBytes());


    }
}

测试

发布消息成功

消费者监听参考路由模式 , 只需要修改队列就行

SpringBoot整合RabbitMQ

1.搭建项目

添加依赖

<!--2. rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

添加配置文件

2.创建工作模式(主题模式)

1)创建交换机和队列

package com.example.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicMqConfig {
    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Value("${mq.queue.name}")
    private String QUEUENAME1;
    @Value("${mq.queue.name}")
    private String QUEUENAME2;
    //创建交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    //创建队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue1 = QueueBuilder.nonDurable(QUEUENAME1).build();
        return queue1;
    }
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME1).build();
        return queue2;
    }
    //绑定交换机和队列
    @Bean("binding1")
    public Binding bindingQueueToExchange1(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();
        return binding1;
    }
    @Bean("binding2")
    public Binding bindingQueueToExchange2(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();
        return binding2;
    }
}

2)创建生产者

测试

3)创建消费者

创建配置文件

创建测试类 监听队列
package com.example.message;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerMessage {
    @RabbitListener(queues = "test_queue2")
    public void xxx(Message message){
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
    }
}

测试

MQ高级特性,消息的可靠性传递

1.确认模式

开启确认模式 修改配置

创建测试类

@SpringBootTest
public class MqTtst {
    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    void sendMsg(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (b){
                    System.out.println("发送消息成功");
                }else {
                    System.out.println("发送消息失败,原因:"+s);
                }
            }
        });
        rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
    }
}

启动测试

2.消息回退

当交换机接收到消息 , 但队列收不到消息时 , 使用回退

修改配置

测试

@Test
void sendMsgReturn(){
    //  消息回退
    rabbitTemplate.setMandatory(true);//
    rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));
    rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
}

3.Consumer Ack

三种确认方式

 自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管

(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)

手动确认:acknowledge="manual" 。可以解决业务异常的情况

(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1)自动确认

2)手动确认

修改配置 开启手动签收

3)创建测试

@Component
public class ShouDingQianShouMeaasge implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "test_queue2")
    public void onMessage(Message message, Channel channel) throws Exception {
      Thread.sleep(2000);
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(1/0);
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            System.out.println("拒绝签收");
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

启动测试

有异常拒绝签收

无异常签收成功


http://www.kler.cn/a/148529.html

相关文章:

  • 01-Ajax入门与axios使用、URL知识
  • 灰狼优化算法
  • 实现一个BLE HID鼠标
  • 如何在Python中实现一个简单的搜索引擎:从零开始的指南
  • 低代码集成多方API的简单实现
  • 信号量和线程池
  • datasets.Dataset.map方法学习笔记
  • vscode在Windows上安装插件提示错误xhr failed
  • 编程语言发展史:Ruby语言的发展和应用
  • Docker 镜像使用
  • sqlserver写入中文乱码问题
  • Java中的mysql——面试题+答案(数据库设计)——第25期
  • 机器学习的复习笔记2-回归
  • 如何获取高质量的静态住宅IP代理?常见误区与注意事项
  • C语言--每日选择题--Day28
  • 开关电源低温启动测试条件是什么?如何测试开关电源?
  • Micropython STM32F4外部中断实验
  • 【闲读 1】量子论引出对认知的思考
  • docker compose搭建渗透测试vulstudy靶场示例
  • 11.28 知识回顾(Web框架、路由控制、视图层)
  • java基础-IO
  • Jquery动画特效
  • vue项目门店官网页面, 根据视口大小自动跳转页面逻辑(pc --> mobile / mobile -->pc)
  • 【算法】七大经典排序(插入,选择,冒泡,希尔,堆,快速,归并)(含可视化算法动图,清晰易懂,零基础入门)
  • MongoDB安装教程
  • 51单片机制作数字频率计