当前位置: 首页 > article >正文

掌握RabbitMQ:全面知识点汇总与实践指南

前言

RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。

特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。

作用:服务间异步通信;顺序消费;定时任务;请求削峰;

1、AMQP协议定义

AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个高效的、跨平台的应用层协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输

特性AMQPMQTT
适用场景大型企业级应用、金融交易、云服务物联网、移动应用、智能家居
通信模式生产者-消费者发布-订阅
消息大小较大,适合复杂的消息结构小型,适合简单的消息
QoS 级别支持,但不如 MQTT 精细详细的 QoS 级别,特别是针对 IoT 场景
性能要求对性能有一定要求,但更注重可靠性和安全性极低的带宽消耗和资源占用
安全性强调端到端的安全性支持基本的安全特性,适用于资源受限环境

2、AMQP机制

1>AMQP生产者、消费者工作机制
AMQP高级消息队列协议,基于生产者消费者模式,消息基于交换器Exchange、队列Queue、绑定Binding进行路由。

  • 生产者发送消息到Broker消息代理服务
  • 交换器接收生产者发送的消息,根据预定义规则,分发给一个或多个队列
  • 队列存储消息,直到消费者取走消息
  • 消费者,读取队列中的消息

AMQP定义了严格的消息结构,使用了类型化数据表示法描述消息内容来兼容不同的系统。

类型化数据表示法(Typed Representation of Data)是指在计算机编程语言中,数据和其相关联的类型信息一起被表示的方法。

2>AMQP消息传递方式

特性点对点模式 (P2P)发布/订阅模式 (Pub/Sub)
消息传递方式每条消息仅被一个消费者处理一条消息可以被多个消费者同时接收
队列数量单个队列每个消费者有自己的队列
生产者行为直接发送到队列发送到交换器,由交换器负责路由
消费者行为从同一队列中竞争消费各自独立消费自己的队列中的消息
适用场景任务分配、工作流管理广播通知、日志记录、事件驱动架构
扩展性受限于单个队列的吞吐量可以通过增加更多的消费者来提高整体吞吐量
复杂度较低,易于理解和实现需要考虑交换器类型、路由规则等因素,稍微复杂

在这里插入图片描述

  • 1、点对点
    生产者将消息发送到一个特定的队列中,而消费者则从该队列中获取消息
    每个消息只会被一个消费者处理,即使有多个消费者监听同一个队列。

竞争消费:多个实例尝试处理同一个消息时,可能出现重复消费或消息未及时得到处理的情况。
(1)竞争消费问题
在k8s部署多实例场景下,虽然提升了系统的吞吐量,通过调度器实现了负载均衡,多个实例从一个队列中读取消息,但是并发场景客观存在竞争消费的情况,导致重复消费消息。
(2)解决建议
合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。

// 生产者代码片段
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Task to be processed";
channel.basicPublish("", "task_queue", null, message.getBytes());

// 消费者代码片段
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    // 执行任务...
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> {});

  • 2、发布订阅
    生产者将消息发送到一个交换器(Exchange),而不是直接发送到队列。交换器根据预定义的路由规则(Binding Key)将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅,所有订阅者都能收到相同的消息副本

(1)主题分区
为不同类型的时间,创建不同的主题或分区,来减少不必要的复制。实例只订阅感兴趣的主题,降低资源开销
(2)限流
避免过载,限制单位之间内消费的最大消息

// 生产者代码片段
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_exchange", "fanout");
String message = "Info log message";
channel.basicPublish("logs_exchange", "", null, message.getBytes());

// 消费者代码片段
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs_exchange", "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    // 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

3、AMQP消息只被消费一次

  • 1、合理配置消息队列ACK机制
    大多数消息队列都提供了手动确认(ACK)的功能,允许消费者成功处理后,主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
  • 2、合理配置消息队列预取数量
    防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);
  • 3、消费者幂等性设计
    针对消息全局唯一的ID,入库,每次收到消息时先检查是否已入库
    确保同一条消息多次处理的结果是一致的,避免重复的消息执行两次结果不一致
    增加补偿机制,比如退款,退积分等概念的操作
  • 4、分布式锁
    借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理,只有获取到锁的消息可以处理,其他的放弃或等待。
  • 5、监控告警机制
    监控消息队列服务健康情况,针对可能重复消费的消息及时告警到服务负责人介入处理。
  • 6、事务性消息
    指的是消息和业务操作,一起成功或一起失败的机制。
    (1)本地事务+补偿机制
    (2)二阶段提交
    引入协调者和参与者的概念。
    客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出提交事务命令,否则全部回滚。每个参与者返回ack结果,协调者汇总执行结果,释放占用的资源。
    (3)三阶段提交
    针对二阶段提交完善事务性消息机制。
    首先客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出预执行事务命令。各参与者收到命令,执行事务,但不提交。并返回ack,等待最终命令。协调者收到全部准备好,则发出提交事务命令。

4、AMQP 消息顺序消费

  • 单实例独占队列,可以保证顺序消费,但是分布式高可用场景一般都是多实例部署,独占队列无法解决消息顺序消费的问题。
  • 为了保证顺序消费,通常建议针对预取消息数量Prefetch Count设置为1:channel.basicQos(1);
  • 可以使用分布式锁确保消息消费是同步操作,并发安全,在成功处理消息后,手动发送ack确认到消息代理。
  • 另外使用幂等性设计来避免重复消费。
  • 增加补偿机制来处理幂等性设计无法保证的场景,比如退款等操作
  • 增加监控告警到服务负责人。
  • 可以对消息根据业务类型或特定的前缀规则,将不同的消息分到不同的分区或队列中,每个队列和分区内部是遵循先进先出规则来保证顺序消费的。

5、AMQP消息可靠性

  • 事务支持
    允许一组操作作为一个整体提交或回滚。
  • ACK确认机制
    当消息成功投递后,接收方会向发送方发送 ACK 确认;如果发生错误,则发送 NACK 拒绝。
  • 持久化选项
    可以选择是否将消息保存到磁盘上,以防服务中断时丢失重要数据。

6、RabbitMQ配置ACK

1>rabbitmq.conf或rabbitmq.ini开启配置

# 启用自动恢复功能,确保在网络中断后能够自动重连
connection.cached = true
# 设置心跳检测间隔,防止长时间无通信导致连接断开
heartbeat = 60
# 启用 Publisher Confirms,允许生产者收到消息确认
publisher_confirms = on

2>消费者手动确认
声明队列,确保队列存在
设置预取计数,限制每次从队列中拉取的消息数量为 1,以避免过载
开启手动确认模式,通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认,改为手动确认
发送 ACK 确认,在成功处理完消息后,调用 channel.basicAck 方法发送确认

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class RabbitMQConsumer {
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列,确保它存在
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 设置预取计数为 1,确保每次只处理一条消息
            channel.basicQos(1);

            // 开启手动确认模式
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 模拟任务处理时间
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println(" [x] Done");
                    // 手动发送 ACK 确认
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };

            // 开始消费消息
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }
}

3>配置 Publisher Confirms和Transaction
允许生产者在发送消息后等待消息代理的确认

// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    if (!channel.waitForConfirmsOrDie(timeout)) {
        // 处理未确认的消息...
    }
} catch (Exception e) {
    // 处理异常情况...
}
Channel channel = connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    if (!channel.waitForConfirmsOrDie(timeout)) {
        // 处理未确认的消息...
    }
} catch (Exception e) {
    // 处理异常情况...
}

// 开启事务模式
channel.txSelect();
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    channel.txCommit();
} catch (Exception e) {
    channel.txRollback();
}

7、RabbitMQ配置协议

1>rabbitmq.conf
RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。
支持启用SSL认证提高安全性。
支持设置心跳保证客户端和服务端连接保持活跃。

# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default = 5672
# 确保 AMQP 插件已启用,AMQP 0-9-1 是默认启用的
enabled_plugins = [rabbitmq_amqp1_0]

# 启用 SSL/TLS 支持
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile   = /path/to/server_certificate.pem
ssl_options.keyfile    = /path/to/private_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 设置 SSL/TLS 监听端口
listeners.ssl.default = 5671

# 设置心跳间隔时间为 60 秒
heartbeat = 60

8、RabbitMQ消息持久化

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PersistentExample {
    private final static String QUEUE_NAME = "persistent_queue";
    private final static String EXCHANGE_NAME = "persistent_exchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 创建持久化的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            // 创建持久化的队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 绑定队列到交换器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");

            // 发送持久化的消息
            String message = "Persistent message!";
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 2 表示持久化
                    .build();
            channel.basicPublish(EXCHANGE_NAME, "routing_key", props, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
  • 1、持久化队列
    channel.queueDeclare("durable_queue", true, false, false, null);
  • 2、交换器持久化
    确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定,以避免消息丢失
    channel.exchangeDeclare("durable_exchange", "direct", true);
  • 3、消息持久化
    delivery_mode 参数:设置为 2 表示持久化消息;设置为 1(默认)则表示非持久化消息
    channel.basicPublish("exchange_name", "routing_key", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());

9、RabbitMQ自动重连

网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态

ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);

10、RabbitMQ组件

组件名称说明
Producer生产者负责生成并发送消息的应用程序。
Consumer消费者负责接收并处理消息的应用程序。
Message消息承载业务数据的基本单元,包含消息体(Body)、属性(Properties)等信息。
Exchange交换机用于接收来自生产者的消息,并根据路由规则将其分发到一个或多个队列中。
Queue队列存储待处理消息的地方,消费者从中拉取消息进行处理。
Binding绑定定义了交换机和队列之间的关系,包括路由键等参数。
Virtual Host虚拟主机类似于命名空间的概念,用于隔离不同的应用环境,每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。

11、RabbitMQ核心组件交换器和路由键

交换器(Exchange)和路由键(Routing Key)是消息传递系统的核心组件,它们共同决定了消息如何从生产者传递到正确的队列。

消息提供方生产消息,根据预定规则,路由至匹配的一个或多个队列。

消息创建时设定路由键,消息发布到交换器时,通过队列路由键,把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。

若队列至少有一个消费者订阅,消息将以轮询方式发给消费者。

交换器说明应用场景
Direct精确匹配路由键只有当路由键完全匹配时,消息才会被发送到对应的队列。适用于一对一的消息分发。
Topic基于通配符模式匹配路由键适用于灵活的消息过滤和多条件匹配。
Fanout广播所有消息给所有绑定的队列适用于需要将相同消息发送给多个消费者的场景。
Headers根据消息头属性进行路由适用于复杂的消息路由需求,例如根据多个字段组合来决定消息去向。

1>Direct Exchange 精准匹配路由键交换器
根据路由键完全匹配队列,如果找不到匹配的队列,则消息会被丢弃。

  • 生产者
// 创建 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");

// 绑定队列到 Exchange,并指定 Binding Key
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "info");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "error");

// 发送消息时指定 Routing Key
channel.basicPublish("direct_logs", "info", null, "Info log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class DirectConsumer {
    private final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Binding Key
            if (argv.length < 1) {
                System.err.println("Usage: DirectConsumer [info] [warning] [error]");
                System.exit(1);
            }
            for (String severity : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

2>Fanout Exchange广播交换器
广播所有消息给所有绑定的队列

  • 生产者
// 创建 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");

// 绑定队列到 Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");

// 发送消息时不指定 Routing Key
channel.basicPublish("logs", "", null, "Broadcast log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class FanoutConsumer {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

3>Topic Exchange 通配符路由器
*:匹配一个单词;#:匹配零个或多个单词

  • 生产者
// 创建 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");

// 绑定队列到 Exchange,并指定 Binding Key 模式
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");

// 发送消息时指定符合模式的 Routing Key
channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "Quick orange rabbit".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class TopicConsumer {
    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Binding Key 模式
            if (argv.length < 1) {
                System.err.println("Usage: TopicConsumer [binding_key_pattern]");
                System.exit(1);
            }
            for (String bindingKey : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

4>Headers Exchange 根据消息头属性进行路由
不依赖路由键,当消息的 headers 完全匹配时,才会将消息发送到对应的队列。

  • 生产者
// 创建 Headers Exchange
channel.exchangeDeclare("headers_exchange", "headers");

// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, "headers_exchange", "", headers);

// 发送带有 Headers 的消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .headers(headers)
        .build();
channel.basicPublish("headers_exchange", "", props, "Message with specific headers".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class HeadersConsumer {
    private final static String EXCHANGE_NAME = "headers_exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "headers");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Headers 匹配规则
            Map<String, Object> headers = new HashMap<>();
            headers.put("user_id", "12345");
            headers.put("order_status", "pending");
            AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, EXCHANGE_NAME, "", headers);

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

12、RabbitMQ核心方法及参数说明

1>newConnection 创建连接工程并开启连接

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();

2>createChannel 创建信道
RabbitMQ 使用信道的方式来传输数据

信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接可以创建多个信道,每个信道都是独立的通信线路,可以并发地发送和接收消息。

Channel channel = connection.createChannel();

3>exchangeDeclare 交换器声明

channel.exchangeDeclare("my_exchange", "direct", true, false, null);

exchange: 交换器名称。
type: 交换器类型(如 “direct”, “fanout”, “topic”, “headers”)。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
autoDelete: 自动删除标志,true 表示当最后一个队列断开时自动删除交换器。
internal: 内部交换器标志,true 表示该交换器只能被其他交换器使用,不能直接由生产者发布消息。
arguments: 其他可选参数,例如死信交换器、过期时间等。

4>queueDeclare 队列声明

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

queue: 队列名称,为空字符串时表示创建临时队列。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
exclusive: 排他性标志,true 表示仅当前连接可用,连接关闭后自动删除。
autoDelete: 自动删除标志,true 表示当最后一个消费者断开时自动删除队列。
arguments: 其他可选参数,例如死信队列、过期时间等。

5>queueBind 队列绑定
将队列绑定到指定的交换器上,并提供路由键或匹配规则

channel.queueBind(queueName, "my_exchange", "routing_key");

queue: 队列名称。
exchange: 交换器名称。
routingKey: 路由键,对于某些类型的交换器(如 Direct 或 Topic),这个值是必须的;对于 Fanout 类型,通常为空字符串。
arguments: 可选参数,主要用于 Headers Exchange 的匹配规则

6>basicPublish 发布消息
向指定的交换器发布一条消息

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .contentType("text/plain")
        .deliveryMode(2) // 2 表示持久化
        .build();
channel.basicPublish("my_exchange", "routing_key", props, message.getBytes());

exchange: 交换器名称。
routingKey: 路由键。
props: 消息属性,包括内容类型、编码、持久化模式等。
body: 消息体,即要发送的数据。

7>basicConsume 消费消息
费来自指定队列的消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

queue: 队列名称。
autoAck: 自动确认标志,true 表示收到消息后立即确认,false 表示手动确认。
deliverCallback: 回调函数,用于处理接收到的消息。
cancelCallback: 取消回调函数,当消费者的取消通知到达时调用

8>basicAck 消息确认
手动确认模式下,当消费者成功处理完消息后,需要调用此方法来确认消息已被消费

channel.basicAck(envelope.getDeliveryTag(), false);

9>basicNack 消息丢弃
当消费者无法处理某条消息时,可以拒绝这条消息,并决定是否重新入队或者丢弃

// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);

13、RabbitMQ镜像集群模式

搭建RabbitMQ保证消息队列的高可用。
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue。
优点:高可用,单个节点挂掉,其他节点仍可用
缺点:高负载,如果某个队列消息很重,则镜像复制的实例下也会很重,性能开销大。

参考博客:消息队列中点对点与发布订阅区别
Powered by niaonao


http://www.kler.cn/a/466314.html

相关文章:

  • 2025元旦源码免费送
  • PHP Array:精通数组操作
  • 【JAVA】用于控制流程的关键字 break、continue、return 使用场景,注意事项和实例
  • 【shell编程】报错信息:Non-zero Exit Status(包含7种解决方法)
  • 音视频入门基础:MPEG2-PS专题(5)——FFmpeg源码中,解析PS流中的PES流的实现
  • 记一次k8s下容器启动失败,容器无日志问题排查
  • golang 编程规范 - 项目目录结构
  • Gitlab部署maven的方法-适配AGP7.1+
  • GitHub Actions 自动构建和部署容器到 Azure Web App
  • C# 整型、浮点型 数值范围原理分析
  • Vue中常用指令
  • antdesignvue vue3全局loading
  • JavaScript的数据类型及检测方式
  • git时常混淆的操作的笔记
  • 低代码开发深度剖析:JNPF 如何引领变革
  • 数字PWM直流调速系统设计(论文+源码)
  • Docker 环境中搭建 Redis 哨兵模式集群的步骤与问题解决
  • 常见的九种二极管
  • 代码随想录算法训练营第五十二天|KM101.孤岛的总面积|KM102.沉没孤岛|KM103.水流问题|KM104.建造最大岛屿
  • SQLite简介:轻量级数据库入门
  • 57.在 Vue 3 中使用 OpenLayers 点击选择 Feature 设置特定颜色
  • 断舍离:通往心灵自由的五级阶梯
  • JavaScript系列(4)--数值类型专题
  • js获取下拉单选框的值和显示的值
  • springboot整合Quartz实现定时任务
  • 趣味编程:心形曲线