当前位置: 首页 > article >正文

互联网全景消息(8)之RabbitMQ进阶介绍

一、RabbitMQ架构

        可以看出RabbitMQ主要分为三个角色:

  • Pulisher:消息的发布者,将消息 发布到RabbitMQ中Exchange;
  • RabbitMQ服务:Exchange接收Publisher的消息,并且根据Routes策略将消息转发到Queue中;
  • Consumer:消息的消费者,监听Queue中的消息并进行消费;

        官方提供的架构图相对简洁,我们可以自己画一份相对完整一些的架构图:

        可以看得出来Publisher和Consumer都是单独和RabbitMQ服务某一个Virtual Host建立Connection的客户端,后续通过Connection可以构建多个channel,用来发布、接收消息,一个Virtual Host中可以有多个Exchange和Queue,Exchange可以同时绑定多个Queue。

        基于以上架构图再结合图形管理界面会更加清晰:

二、RabbitMQ通讯方式 

         RabbitMQ提供了很多中通讯方式,依然可以去官方查看:https://rabbitmq.com/getstarted.html

2.1 RabbitMQ提供的通讯方式

  • Hello World!:为了入门操作!

  • Work queues:一个队列被多个消费者消费

  • Publish/Subscribe:手动创建Exchange(FANOUT)

  • Routing:手动创建Exchange(DIRECT)

  • Topics:手动创建Exchange(TOPIC)

  • RPC:RPC方式

  • Publisher Confirms:保证消息可靠性

       接下来我们通过代码的方式来分别演示一下,以上场景的使用方式。

2.2 构建获取RabbitMQ的Connection工具类

         导入依赖:amqp-client,junit

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

       构建工具类:

package com.mashibing.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @description
 */
public class RabbitMQConnectionUtil {

    public static final String RABBITMQ_HOST = "192.168.11.32";

    public static final int RABBITMQ_PORT = 5672;

    public static final String RABBITMQ_USERNAME = "guest";

    public static final String RABBITMQ_PASSWORD = "guest";

    public static final String RABBITMQ_VIRTUAL_HOST = "/";

    /**
     * 构建RabbitMQ的连接对象
     * @return
     */
    public static Connection getConnection() throws Exception {
        //1. 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置RabbitMQ的连接信息
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);

        //3. 返回连接对象
        Connection connection = factory.newConnection();
        return connection;
    }

}

2.3 hello world

        生产者:

package com.mashibing.helloworld;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @description
 * @date 2022/1/24 22:54
 */
public class Publisher {

    public static final String QUEUE_NAME = "hello";

    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //4. 发布消息
        String message = "Hello World!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送成功!");
    }
}

        消费者:

package com.mashibing.helloworld;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * @description
 * @date 2022/1/24 23:02
 */
public class Consumer {

    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
        System.out.println("开始监听队列");

        System.in.read();
    }
}

2.4 Work Queues

  • 生产者:生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。

  • 消费者:让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息

package com.mashibing.workqueues;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * @description
 * @date 2022/1/25 19:52
 */
public class Consumer {

    @Test
    public void consume1() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

        //3.5 设置消息的流控
        channel.basicQos(3);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号-获取到消息:" + new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
        System.out.println("开始监听队列");

        System.in.read();
    }

    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

        channel.basicQos(3);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号-获取到消息:" + new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
        System.out.println("开始监听队列");

        System.in.read();
    }
}

2.5  Publish/Subscribe

        对于Exchange 的类型,大家可以参考上一篇博客的内容。

        生产者:自行构建Exchange并绑定指定队列(FANOUT类型)

package com.mashibing.pubsub;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @description
 * @date 2022/1/25 20:08
 */
public class Publisher {

    public static final String EXCHANGE_NAME = "pubsub";
    public static final String QUEUE_NAME1 = "pubsub-one";
    public static final String QUEUE_NAME2 = "pubsub-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"45jk6h645jk",null,"publish/subscribe!".getBytes());
        System.out.println("消息成功发送!");
    }
}

2.6 Routing

        生产者:在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue中。

 

package com.mashibing.routing;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @description
 * @date 2024/12/25 20:20
 */
public class Publisher {

    public static final String EXCHANGE_NAME = "routing";
    public static final String QUEUE_NAME1 = "routing-one";
    public static final String QUEUE_NAME2 = "routing-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林大狸子".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());
        System.out.println("消息成功发送!");


    }

}

2.7 Topic

        生产者:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式。 

package com.mashibing.topics;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @description
 * @date 2022/1/25 20:28
 */
public class Publisher {

    public static final String EXCHANGE_NAME = "topic";
    public static final String QUEUE_NAME1 = "topic-one";
    public static final String QUEUE_NAME2 = "topic-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列,
        // TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey
        // 其中有两个特殊字符:*(相当于占位符),#(相当通配符)
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog.dog.dog",null,"懒狗狗狗狗狗狗".getBytes());
        System.out.println("消息成功发送!");

    }
}

 三、SpringBoot操作RabbitMQ

3.1 SpringBoot声明信息

        导入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

        配置RabbitMQ信息

spring:
  rabbitmq:
    host: 192.168.11.32
    port: 5672
    username: guest
    password: guest
    virtual-host: /

        声明交互机与队列:

package com.mashibing.rabbitmqboot.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description
 * @date 2024/12/8 20:25
 */
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE = "boot-exchange";
    public static final String QUEUE = "boot-queue";
    public static final String ROUTING_KEY = "*.black.*";

@Bean
  public Exchange bootExchange(){
      // channel.DeclareExchange
      return ExchangeBuilder.topicExchange(EXCHANGE).build();
  }

  @Bean
  public Queue bootQueue(){
      return QueueBuilder.durable(QUEUE).build();
  }

  @Bean
  public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
      return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
  }
}

3.2 生产者

        通过引入RabbitMQTemplate来发送消息。

package com.mashibing.rabbitmqboot;

import com.mashibing.rabbitmqboot.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class PublisherTest {

    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Test
    public void publish(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
    }


    @Test
    public void publishWithProps(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProps", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setCorrelationId("123");
                return message;
            }
        });
        System.out.println("消息发送成功");
    }
}

3.3 消费者

package com.mashibing.rabbitmqboot;

import com.mashibing.rabbitmqboot.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class ConsumeListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("队列的消息为:" + msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一标识为:" + correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

四、RabbitMQ保证消息的可靠性

        上一章节中我们也大体介绍了如果保证RabbitMQ消息的可靠性,在这一章节中我们主要结合代码来说明这一点。

        总体来说需要保证RabbitMQ消息的可靠性,主要是:

  1. 保障消息一定送达到Exchange;
  2. 保障消息可以路由到Queue;
  3. 保障队列可以持久化消息;
  4. 保障消费者可以正常消费消息;

4.1 如果保障消息可以到达Exchange

        Confirm机制:可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果。

//4. 开启confirms
channel.confirmSelect();

//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息成功的发送到Exchange!");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
    }
});

4.2 保证消息可以路由到 Queue 

        Return机制:为了保证Exchange一定可以送达到Queue。

//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");
    }
});
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调

4.3 保证Queue可以持久化消息

        DeliveryMode设置消息持久化, DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。

//7. 设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties()
    .builder()
    .deliveryMode(2)
    .build();

//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());

4.4 保证消费者可以正常消费消息

        请查看上面的work queue模式。

4.5 Spring Boot实现RabbitMQ的消息可靠性

4.5.1Confirm

        编写配置文件开启Confirm机制

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 新版本
    publisher-confirms: true  # 老版本 

        在发送消息时,配置RabbitMqTemplate

@Test
public void publishWithConfirms() throws IOException {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                System.out.println("消息已经送达到交换机!!");
            }else{
                System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");
            }
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
    System.out.println("消息发送成功");

    System.in.read();
}

4.5.2 Return机制

        编写配置文件开启Return机制。

spring:
  rabbitmq:
    publisher-returns: true # 开启Return机制

        在发送消息时,配置RabbitMQTemplate

@Test
public void publishWithReturn() throws IOException {
    // 新版本用 setReturnsCallback ,老版本用setReturnCallback
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            String msg = new String(returned.getMessage().getBody());
            System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
    System.out.println("消息发送成功");

    System.in.read();
}

4.5.3 消息持久化      

@Test
public void publishWithBasicProperties() throws IOException {
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 设置消息的持久化!
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }
    });
    System.out.println("消息发送成功");
}

五、RabbitMQ死信队列&延迟交换机

5.1 什么是死信队列

         死信队列的应用:

  • 基于死信队列在队列消息已满的情况下,消息也不会丢失;
  • 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间。

5.2 实现死信队列

5.2.1  准备Exchange&Queue

@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE = "normal-exchange";
    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE = "dead-exchange";
    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";


    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();
    }

    @Bean
    public Binding normalBinding(Queue normalQueue,Exchange normalExchange){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }


    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

}

 5.2.2 实现效果

         基于消费进行reject或者nack实现死信效果

import com.mashibing.rabbitmqboot.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class DeadListener {

    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到normal队列的消息:" + msg);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}

        基于消息的生存时间:1、给消息设置生存时间。

@Test
public void publishExpire(){
    String msg = "dead letter expire";
    rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration("5000");
            return message;
        }
    });
}

        给队列设置生存时间。

@Bean
public Queue normalQueue(){
    return QueueBuilder.durable(NORMAL_QUEUE)
            .deadLetterExchange(DEAD_EXCHANGE)
            .deadLetterRoutingKey("dead.abc")
            .ttl(10000)
            .build();
}

        基于设置Queue中的消息最大长度。

@Bean
public Queue normalQueue(){
    return QueueBuilder.durable(NORMAL_QUEUE)
            .deadLetterExchange(DEAD_EXCHANGE)
            .deadLetterRoutingKey("dead.abc")
            .maxLength(1)
            .build();
}

5.3 延迟交换机

         下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9 ;

        死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

  • 构建延迟交换机
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author zjw
 * @description
 */
@Configuration
public class DelayedConfig {

    public static final String DELAYED_EXCHANGE = "delayed-exchange";
    public static final String DELAYED_QUEUE = "delayed-queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.#";

    @Bean
    public Exchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","topic");
        Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
        return exchange;
    }

    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable(DELAYED_QUEUE).build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}
  • 发送消息;

 

import com.mashibing.rabbitmqboot.config.DelayedConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class DelayedPublisherTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void publish(){
        rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(30000);
                return message;
            }
        });
    }
}

六、RabbitMQ集群

        RabbitMQ的镜像模式。

6.1 搭建RabbitMQ集群

  • 准备两台虚拟机(克隆)

  • 准备RabbitMQ的yml文件

        分别编写RabbitMQ的docker-compose文件。

version: '3.1'
services:
  rabbitmq1:
    image: rabbitmq:3.8.5-management-alpine
    container_name: rabbitmq1
    hostname: rabbitmq1
    extra_hosts:
      - "rabbitmq1:192.168.11.32"
      - "rabbitmq2:192.168.11.33"
    environment: 
      - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
    ports:
      - 5672:5672
      - 15672:15672
      - 4369:4369
      - 25672:25672

        rabbitmq2:

version: '3.1'
services:
  rabbitmq2:
    image: rabbitmq:3.8.5-management-alpine
    container_name: rabbitmq2
    hostname: rabbitmq2
    extra_hosts:
      - "rabbitmq1:192.168.11.32"
      - "rabbitmq2:192.168.11.33"
    environment: 
      - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
    ports:
      - 5672:5672
      - 15672:15672
      - 4369:4369
      - 25672:25672

         准备完毕之后,启动两台RabbitMQ。

         让RabbitMQ服务实现join操作,需要再rabbitmq2执行4个命令,让rabbitmq2加入到rabbitmq1中。

rabbitmqctl stop_app
rabbitmqctl reset 
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app

         执行成功后:

         镜像队列通过在RabbitMQ集群中的多个节点上复制队列的内容,确保即使某个节点发生故障,队列及其消息仍然可以从其他节点恢复。这种机制基于主从复制模式,其中一个节点作为领导者(Leader,或主节点Master),负责处理所有的写入操作和部分读取操作,而其余的节点作为跟随者(Followers,或副节点Slave),从领导者那里同步消息。

        设置镜像模式:在指定的RabbitMQ服务中设置好镜像策略即可。 


http://www.kler.cn/a/459524.html

相关文章:

  • 设计模式 创建型 建造者模式(Builder Pattern)与 常见技术框架应用 解析
  • flink-connector-kafka 3.4源码编译
  • pip安装paddle失败
  • 基于微信小程序的校园点餐平台的设计与实现(源码+SQL+LW+部署讲解)
  • 如何在 API 设计中做到接口幂等
  • SAKO搜索帮助增强(FB02科目搜索帮助)
  • 【机器学习】概述
  • 【C++11】类型分类、引用折叠、完美转发
  • 【数据库初阶】Linux中表的基础操作
  • 【Redis】集群配置(主从复制 哨兵搭建)
  • JPA查询部分字段的最佳实践
  • Python 中的 with open:文件操作的最佳实践
  • 发布远程组件vue2+Webpack和vue3+vite
  • 面试场景题系列:设计云盘系统
  • jmeter设置tps、响应时间监测时间间隔
  • DigitalOcean Kubernetes现已支持VPC natvie集群
  • 【深度学习】Pytorch框架介绍
  • 基于单片机的温湿度采集系统(论文+源码)
  • 整车厂如何规划构建汽车集成安全团队的软件研发能力
  • win10 indy加载ssl 出错could not load ssl library
  • k8s七层代理Ingress(基础知识)
  • C 语言: scanf 函数详解
  • web3基于zkEVM的L2扩容方案-Scroll
  • 华为OD机试真题---服务器广播
  • 活动预告 | Microsoft Azure 在线技术公开课:使用 Azure OpenAI 服务构建生成式应用
  • webrtc音频模块(五) AudioState和AudioDeviceBuffer