当前位置: 首页 > article >正文

RabbitMQ - 2 ( 21000 字 RabbitMQ 入门级教程 )

一:RabbitMQ 应用

RabbitMQ 提供了 7 种工作模式用于消息传递,而我们入门案例中的程序实际上就是其中的一种简单模式,下面我们来介绍一下这 7 种工作模式。

在这里插入图片描述

1.1 Simple (简单模式)

在这里插入图片描述

名称描述
P(Producer)生产者,负责发送消息的程序。
C(Consumer)消费者,负责接收消息的程序。
Queue消息队列,图中黄色背景部分,类似一个邮箱,用于缓存消息;生产者向队列中投递消息,消费者从队列中取出消息。
特点一个生产者(P),一个消费者(C),消息只能被消费一次,因此也称为点对点(Point-to-Point)模式,适用于只能被单个消费者处理的场景。

1.2 Work Queue (工作队列)

在这里插入图片描述

在工作队列(Work Queue)模式中,一个生产者(P)向队列中发送消息,多个消费者(如 C1、C2)会从队列中接收不同的消息,RabbitMQ 会将消息分配给各消费者,确保每条消息只被消费一次,不会重复。该模式适用于集群环境中的异步处理场景,例如 12306 的短信通知服务:当用户订票成功后,订单消息会发送到 RabbitMQ,由短信服务从队列中获取订单信息并发送通知,同时在多个短信服务之间实现任务分配。

在这里插入图片描述

1.3 Publish / Subscribe 模式

在这里插入图片描述

在订阅模式中,图中的 X 表示交换机(Exchange)。相比简单模式,订阅模式增加了交换机这一角色,消息传递过程也随之发生变化。Exchange 的作用是接收生产者发送的消息,并根据指定的路由规则,将消息分发到一个或多个队列中。在 RabbitMQ 中,生产者不会直接将消息发送到队列,而是通过交换机完成路由。RabbitMQ 提供了四种类型的交换机:fanout、direct、topic 和 headers,每种类型对应不同的路由策略。

需要注意的是,Exchange 只负责消息的转发,不具备存储消息的能力。因此,如果没有队列绑定到 Exchange,或者没有符合路由规则的队列,消息将会被丢失。为避免消息丢失,应确保交换机与至少一个队列正确绑定,消息如果发送成功会被多个消费者同时接收,每个消费者接收的消息都相同。

交换机类型描述
Fanout广播模式:将消息发送给所有绑定到交换机的队列,适用于发布/订阅模式。
Direct定向模式:将消息发送给符合指定 routing key 的队列,适用于路由模式。
Topic通配符模式:将消息发送给符合 routing pattern(路由模式)的队列,适用于主题模式。

1.4 Routing Key 和 Binding Key

名称描述
Routing Key路由键。生产者在发送消息到交换机时指定的字符串,用于指示交换机如何处理该消息。
Binding Key绑定键。通过 Binding(绑定)将交换机与队列关联起来,并指定一个绑定键,以便 RabbitMQ 知道如何正确地将消息路由到队列。

在大多数情况下,包括官方文档和 RabbitMQ Java API 中,通常会将 Binding Key 和 Routing Key 统称为 Routing Key。为了避免混淆,可以这样理解:

  1. 在绑定队列时,使用的路由键称为 Binding Key。
  2. 在发送消息时,使用的路由键称为 Routing Key。

交换机接收到消息后,会将 Routing Key 与所有绑定到该交换机的队列的 Binding Key 进行匹配。如果 Routing Key 与某个 Binding Key 匹配成功,消息将被路由到相应的队列;如果没有匹配的队列,则消息会被丢弃或返回给生产者(具体行为取决于配置)。这种机制确保消息能够被正确分发到符合条件的队列中。

在这里插入图片描述
在这里插入图片描述

1.5 Routing (路由模式)

在这里插入图片描述
路由模式是发布订阅模式的变种,它在发布订阅的基础上增加了 Routing Key 的规则。发布订阅模式会将所有消息无条件地分发给所有消费者,而路由模式则由 Exchange 根据 Routing Key 的规则对消息进行筛选,并将其分发到对应的消费者队列中。该模式适用于需要根据特定规则分发消息的场景。

1.6 Topics (通配符模式)

在这里插入图片描述

主题模式(Topics)是路由模式的升级版,在 Routing Key 的基础上增加了通配符匹配功能,使消息分发更加灵活。其基本原理与路由模式相同:生产者将消息发送到交换机,交换机根据 Routing Key 将消息转发到匹配的队列中。不同之处在于 Routing Key 的匹配方式:路由模式使用完全匹配,而主题模式支持通配符匹配,类似正则表达式的方式定义 Routing Key 的模式。该模式适用于需要灵活匹配和过滤消息的场景。

1.7 RPC 通信

在这里插入图片描述

在 RPC 通信过程中不再区分生产者和消费者,而是更像传统的 RPC 远程调用。其核心机制是通过两个队列实现消息的请求和响应,从而完成一个带有回调功能的通信过程。

在这里插入图片描述

  1. 客户端将消息发送到指定的请求队列,同时在消息属性中设置 replyTo 字段,该字段用于指定回调队列,回调队列用于接收服务端的响应消息。
  2. 服务端接收请求后,处理完成并将响应消息发送到 replyTo 字段指定的回调队列。
  3. 客户端在回调队列中等待响应消息,一旦收到响应,客户端会检查消息的 correlationId 属性,以确保该消息是期望的响应结果。

1.8 Publisher Confirms (发布确认)

Publisher Confirms 模式是 RabbitMQ 提供的一种机制,用于确保消息可靠地发送到 RabbitMQ 服务器。在该模式下,生产者通过调用 channel.confirmSelect() 将通道设置为确认模式后,RabbitMQ 会为发布的每条消息分配一个唯一的序列号。生产者可以将这些序列号与消息关联起来,以便跟踪消息的状态。消息被服务器接收并处理后,RabbitMQ 会异步向生产者发送确认(ACK),包含消息的唯一序列号,表明消息已成功送达。

通过 Publisher Confirms 模式,生产者能够确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失的问题。这种模式适用于对数据安全性要求较高的场景,例如金融交易和订单处理等领域。

在这里插入图片描述

二: 7 种工作模式的简单实现

2.1 简单模式

快速入门程序采用的是简单模式,此处略过详细描述。

在这里插入图片描述

2.2 Work Queues (工作队列)

工作队列模式是简单模式的增强版,其区别在于简单模式只有一个消费者,而工作队列模式支持多个消费者同时接收消息。消费者之间存在竞争关系,每条消息只能被其中一个消费者接收和处理。

在这里插入图片描述

  1. 引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>
  1. 编写生产者代码

工作队列模式与简单模式的区别在于支持多个消费者,因此生产者和消费者的代码差异不大。相比简单模式,生产者的代码基本保持一致。为了直观地展示多个消费者之间的竞争关系,可以一次发送 10 条消息,因此需要将发送消息的逻辑改为批量发送 10 条消息。

public class WorkRabbitProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); 
        factory.setPort(15673); 
        factory.setVirtualHost("bite"); 
        factory.setUsername("study"); 
        factory.setPassword("study"); 

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 声明队列,如果队列不存在,则创建;如果存在,则使用现有队列
        channel.queueDeclare("work_queues", true, false, false, null);

        // 3. 发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "Hello World " + i;
            channel.basicPublish("", "work_queues", null, msg.getBytes());
            System.out.println("消息发送: " + msg);
        }

        // 4. 释放资源
        channel.close();
        connection.close();
    }
}
  1. 消费者代码:消费者代码与简单模式一致,只是需要复制两份到两个文件中去启动。两个消费者的代码完全相同。
public class WorkRabbitmqConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址
        factory.setPort(15673); // 端口号2
        factory.setVirtualHost("bite"); // 虚拟主机名称
        factory.setUsername("study"); // 用户名
        factory.setPassword("study"); // 密码

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 声明队列,如果没有这样的队列,则自动创建;如果存在,则不创建
        channel.queueDeclare("work_queues", true, false, false, null);

        // 3. 接收消息并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
            }
        };

        channel.basicConsume("work_queues", true, consumer);
    }
}

建议先启动两个消费者再启动生产者。如果先启动生产者,由于消息数量较少且处理速度较快,第一个启动的消费者可能会瞬间将10条消息全部消费完。因此,为了更均衡地分配消息,应该先启动两个消费者,再启动生产者。

在这里插入图片描述

2.3 Publish / Subscribe (发布 / 订阅)

在发布/订阅模型中引入了一个名为 Exchange 的角色。Exchange 有三种常见类型,每种类型代表不同的路由规则,从而对应不同的工作模式。接下来,我们将了解 Publish / Subscribe 模式的具体实现和特点。

交换机类型描述
Fanout广播模式:将消息发送给所有绑定到交换机的队列,适用于发布/订阅模式。
Direct定向模式:将消息发送给符合指定 routing key 的队列,适用于路由模式。
Topic通配符模式:将消息发送给符合 routing pattern(路由模式)的队列,适用于主题模式。

在这里插入图片描述

  1. 编写生产者代码,与前面两种模式的区别在于需要创建交换机,并将队列与交换机进行绑定。
public class FanoutRabbitProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址
        factory.setPort(15673); // 端口号
        factory.setVirtualHost("bite"); // 虚拟主机名称,默认值为 "/"
        factory.setUsername("study"); // 用户名
        factory.setPassword("study"); // 密码

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 创建交换机
        /*
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
         * boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * 参数:
         * 1. exchange: 交换机名称
         * 2. type: 交换机类型
         *    * DIRECT("direct"): 定向直连
         *    * FANOUT("fanout"): 扇形(广播), 每个队列都能收到消息
         *    * TOPIC("topic"): 通配符模式
         *    * HEADERS("headers"): 参数匹配(较少使用)
         * 3. durable: 是否持久化
         * 4. autoDelete: 是否自动删除
         * 5. internal: 是否内部使用, 一般为false
         * 6. arguments: 参数
         */
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT, true, false, false, null);

        // 3. 声明队列,如果队列不存在则自动创建;如果存在,则不创建
        channel.queueDeclare("fanout_queue1", true, false, false, null);
        channel.queueDeclare("fanout_queue2", true, false, false, null);

        // 4. 绑定队列和交换机
        /*
         * queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
         * 参数:
         * 1. queue: 队列名称
         * 2. exchange: 交换机名称
         * 3. routingKey: 路由key
         * 如果交换机类型为fanout,routingKey设置为"",表示每个消费者都能收到所有消息
         */
        channel.queueBind("fanout_queue1", "fanout_exchange", "");
        channel.queueBind("fanout_queue2", "fanout_exchange", "");

        // 5. 发送消息
        /*
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
         * 参数说明:
         * 1. exchange: 交换机名称
         * 2. routingKey: 如果交换机类型为fanout,routingKey设置为"",表示每个消费者都能收到所有消息
         */
        String msg = "hello fanout";
        channel.basicPublish("fanout_exchange", "", null, msg.getBytes());
        System.out.println("消息发送: " + msg);

        // 6. 释放资源
        channel.close();
        connection.close();
    }
}
  1. 编写消费者代码,消费者 1 代码如下,消费者 2 的代码修改一下队列名称即可,此处省略
public class FanoutRabbitmqConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址
        factory.setPort(15673); // 端口号
        factory.setVirtualHost("bite"); // 虚拟主机名称
        factory.setUsername("study"); // 用户名
        factory.setPassword("study"); // 密码

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 接收消息并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
            }
        };

        channel.basicConsume("fanout_queue1", true, consumer);
    }
}

在这里插入图片描述
在这里插入图片描述

2.4 Routing (路由模式)

在队列和交换机的绑定中,必须指定一个 BindingKey。消息发送方在向 Exchange 发送消息时,需要指定消息的 RoutingKey。Exchange 不会将消息交给所有绑定的队列,而是根据消息的 RoutingKey 进行匹配,只有当队列绑定的 BindingKey 与消息的 RoutingKey 完全一致时,队列才能接收到消息。

在这里插入图片描述

  1. 编写生产者代码
public class DirectRabbitProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址
        factory.setPort(15673); // 端口号
        factory.setVirtualHost("bite"); // 虚拟主机名称,默认值为 "/"
        factory.setUsername("study"); // 用户名
        factory.setPassword("study"); // 密码

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 创建交换机
        channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT, true, false, false, null);

        // 3. 声明队列
        channel.queueDeclare("direct_queue1", true, false, false, null);
        channel.queueDeclare("direct_queue2", true, false, false, null);

        // 4. 绑定队列和交换机
        channel.queueBind("direct_queue1", "direct_exchange", "orange"); // 队列1绑定到 RoutingKey 为 "orange"
        channel.queueBind("direct_queue2", "direct_exchange", "black");  // 队列2绑定到 RoutingKey 为 "black"
        channel.queueBind("direct_queue2", "direct_exchange", "green");  // 队列2绑定到 RoutingKey 为 "green"

        // 5. 发送消息
        String msgOrange = "hello direct, I am orange";
        channel.basicPublish("direct_exchange", "orange", null, msgOrange.getBytes());
        System.out.println("发送消息:" + msgOrange);

        String msgBlack = "hello direct, I am black";
        channel.basicPublish("direct_exchange", "black", null, msgBlack.getBytes());
        System.out.println("发送消息:" + msgBlack);

        String msgGreen = "hello direct, I am green";
        channel.basicPublish("direct_exchange", "green", null, msgGreen.getBytes());
        System.out.println("发送消息:" + msgGreen);

        // 6. 释放资源
        channel.close();
        connection.close();
    }
}
  1. 编写消费者代码

在 Routing 模式下,消费者代码与 Publish/Subscribe 模式的消费者代码相同,可以直接复制两份,分别命名为 DirectRabbitmqConsumer1 和 DirectRabbitmqConsumer2。只需修改对应的队列名称即可实现不同的消费逻辑。

public class DirectRabbitmqConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址
        factory.setPort(15673); // 端口号
        factory.setVirtualHost("bite"); // 虚拟主机名称,默认值为 "/"
        factory.setUsername("study"); // 用户名
        factory.setPassword("study"); // 密码

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 接收消息并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
            }
        };

        // 监听队列 direct_queue1
        channel.basicConsume("direct_queue1", true, consumer);
    }
}

在这里插入图片描述
在这里插入图片描述

2.5 Topics (通配符模式)

Topics 模式与 Routing 模式的主要区别在于交换机类型和匹配规则:Topics 模式使用的交换机类型为 topic,而 Routing 模式使用的交换机类型为 direct。此外,topic 类型的交换机在匹配规则上进行了扩展,支持使用通配符进行匹配,而 direct 类型的交换机要求 BindingKey 和 RoutingKey 完全匹配。

在 topic 类型的交换机中,匹配规则有以下要求:RoutingKey 是由点 (.) 分隔的一系列单词,例如 “stock.usd.nyse”、“nyse.vmw”。BindingKey 与 RoutingKey 格式相同,也是由点分隔的字符串。同时,BindingKey 支持两种特殊字符进行模糊匹配:* 表示一个单词,# 表示多个单词(0 到 N 个)。

Binding Key路由队列说明
d.a.bQ1 和 Q2同时匹配到 Q1 和 Q2
d.a.fQ1仅匹配到 Q1
c.e.fQ2仅匹配到 Q2
d.b.f丢弃或返回生产者不匹配任何队列,需设置 mandatory 参数才能返回给生产者,否则直接丢弃

在这里插入图片描述

  1. 生产者代码
public class TopicRabbitProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址,默认值为localhost
        factory.setPort(15673); // 端口号,默认值为5672
        factory.setVirtualHost("bite"); // 虚拟主机名称,默认值为 "/"
        factory.setUsername("study"); // 用户名,默认值为guest
        factory.setPassword("study"); // 密码,默认值为guest

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 创建交换机
        channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC, true, false, false, null);

        // 3. 声明队列
        channel.queueDeclare("topic_queue1", true, false, false, null);
        channel.queueDeclare("topic_queue2", true, false, false, null);

        // 4. 绑定队列和交换机
        // 队列1绑定到 "*.error",仅接收包含 "error" 的消息
        channel.queueBind("topic_queue1", "topic_exchange", "*.error");

        // 队列2绑定到 "#.info" 和 "*.error",接收包含 "info" 或 "error" 的消息
        channel.queueBind("topic_queue2", "topic_exchange", "#.info");
        channel.queueBind("topic_queue2", "topic_exchange", "*.error");

        // 5. 发送消息
        String msgError = "hello topic, I'm order.error";
        channel.basicPublish("topic_exchange", "order.error", null, msgError.getBytes());
        System.out.println("发送消息:" + msgError);

        String msgInfo = "hello topic, I'm order.pay.info";
        channel.basicPublish("topic_exchange", "order.pay.info", null, msgInfo.getBytes());
        System.out.println("发送消息:" + msgInfo);

        String msgPayError = "hello topic, I'm pay.error";
        channel.basicPublish("topic_exchange", "pay.error", null, msgPayError.getBytes());
        System.out.println("发送消息:" + msgPayError);

        // 6. 释放资源
        channel.close();
        connection.close();
    }
}
  1. 编写消费者代码

在 Topic 模式中,消费者代码与 Routing 模式的消费者代码相同,只需修改消费的队列名称即可。可以分别创建两个消费者,命名为 TopicRabbitmqConsumer1 和 TopicRabbitmqConsumer2,并将它们的队列名称分别修改为对应的队列。

public class TopicRabbitmqConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址,默认值为localhost
        factory.setPort(15673); // 端口号,默认值为5672
        factory.setVirtualHost("bite"); // 虚拟主机名称,默认值为 "/"
        factory.setUsername("study"); // 用户名,默认值为guest
        factory.setPassword("study"); // 密码,默认值为guest

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 接收消息并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
            }
        };

        // 监听队列 topic_queue1
        channel.basicConsume("topic_queue1", true, consumer);
    }
}

在这里插入图片描述

2.6 RPC 通信

RPC 是一种通过网络请求远程计算机上的服务,无需了解底层网络实现的技术,类似于 HTTP 的远程调用。使用 RabbitMQ 实现 RPC 通信时,通常通过两个队列完成一个可回调的过程。

  1. 客户端将消息发送到指定的请求队列,同时在消息属性中设置 replyTo 字段,该字段用于指定回调队列,回调队列用于接收服务端的响应消息。
  2. 服务端接收请求后,处理完成并将响应消息发送到 replyTo 字段指定的回调队列。
  3. 客户端在回调队列中等待响应消息,一旦收到响应,客户端会检查消息的 correlationId 属性,以确保该消息是期望的响应结果。

在这里插入图片描述

  1. 编写客户端代码,客户端:
步骤流程描述
1声明两个队列,其中一个为回调队列 replyQueueName,并声明当前请求的唯一标识 corrId。
2将 replyQueueName 和 corrId 配置到要发送的消息队列中,供服务端处理并回传结果。
3使用阻塞队列阻塞当前进程,监听回调队列中的消息,并将请求放入阻塞队列中等待响应。
4当阻塞队列收到消息时,主线程被唤醒,获取响应内容并打印返回结果。
public class RPCClient {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址,默认值为localhost
        factory.setPort(15673); // 端口号,默认值为5672
        factory.setVirtualHost("bite"); // 虚拟主机名称,默认值为 "/"
        factory.setUsername("study"); // 用户名,默认值为guest
        factory.setPassword("study"); // 密码,默认值为guest

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 声明请求队列
        channel.queueDeclare("rpc_request_queue", true, false, false, null);

        // 3. 唯一标识本次请求
        String corrId = UUID.randomUUID().toString(); // 生成唯一标识符

        // 4. 声明一个临时队列用于回调,并获取其名称
        String replyQueueName = channel.queueDeclare().getQueue();

        // 5. 生成发送消息的属性
        /*
         * BasicProperties:用于设置消息的附加属性
         * - correlationId: 唯一标识本次请求
         * - replyTo: 回调队列的名称
         */
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .correlationId(corrId) // 设置唯一标识符
                .replyTo(replyQueueName) // 设置回调队列名称
                .build();

        // 6. 发送消息到指定队列
        String message = "hello rpc...";
        channel.basicPublish("", "rpc_request_queue", props, message.getBytes());
        System.out.println(" [RPCClient] Sent: " + message);

        // 7. 使用阻塞队列存储回调结果
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // 8. 接收服务端的响应
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到回调消息: " + new String(body));
                // 判断响应的 correlationId 是否与请求的 corrId 一致
                if (properties.getCorrelationId().equals(corrId)) {
                    response.offer(new String(body, "UTF-8")); // 将响应消息放入阻塞队列
                }
            }
        };

        // 监听回调队列
        channel.basicConsume(replyQueueName, true, consumer);

        // 9. 从阻塞队列中获取响应结果
        String result = response.take(); // 阻塞直到接收到响应消息
        System.out.println(" [RPCClient] Result: " + result);

        // 10. 释放资源
        channel.close();
        connection.close();
    }
}
  1. 编写服务器代码:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("110.41.51.65"); // IP地址,默认值为localhost
        factory.setPort(15673); // 端口号,默认值为5672
        factory.setVirtualHost("bite"); // 虚拟主机名称,默认值为 "/"
        factory.setUsername("study"); // 用户名,默认值为guest
        factory.setPassword("study"); // 密码,默认值为guest

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 声明请求队列
        channel.queueDeclare("rpc_request_queue", true, false, false, null);

        // 设置同时最多只能处理一个消息
        channel.basicQos(1);

        System.out.println(" [RPCServer] Awaiting RPC requests");

        // 3. 创建消费者并监听请求队列
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 生成返回消息的属性
                /*
                 * AMQP.BasicProperties:消息的属性
                 * - correlationId: 对应请求的唯一标识符
                 */
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId()) // 关联请求的唯一标识
                        .build();

                // 获取请求内容
                String message = new String(body);
                System.out.println(" [RPCServer] Received request: " + message);

                // 生成响应内容
                String response = "request: " + message + ", response: 处理成功";

                // 回复消息到请求方的回调队列
                channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes());
                System.out.println(" [RPCServer] Sent response: " + response);

                // 对消息进行手动应答
                /*
                 * basicAck(long deliveryTag, boolean multiple)
                 * 参数:
                 * - deliveryTag: 当前消息的标识
                 * - multiple: 是否批量应答
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 消费请求队列中的消息
        /*
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数:
         * - queue: 队列名称
         * - autoAck: 是否自动确认
         * - callback: 消费者逻辑
         */
        channel.basicConsume("rpc_request_queue", false, consumer);
    }
}

在这里插入图片描述

2.7 Publisher Confirms (发布确认)

消息中间件都会会面临消息丢失的问题,主要分为以下三种情况:

问题类型描述解决方案
生产者问题由于应用程序故障、网络抖动等原因,生产者未能成功将消息发送到消息中间件(Broker)。采用发布确认(Publisher Confirms)机制实现。
Broker问题生产者成功将消息发送到 Broker,但由于 Broker 自身未能妥善保存消息,导致消息丢失。使用消息持久化机制。
消费者问题Broker 将消息发送到消费者后,消费者在处理消息时出错,导致 Broker 将未正确消费的消息从队列中删除,从而丢失消息。使用消息应答机制。

在这里插入图片描述

发布确认是 RabbitMQ 的七大工作模式之一。生产者将信道设置为确认模式后,该信道上发布的每条消息都会被分配一个唯一的 ID(从 1 开始)。当消息成功投递到所有匹配的队列后,RabbitMQ 会发送一个包含消息唯一 ID 的确认消息给生产者,告知消息已正确到达目标队列。如果消息和队列是持久化的,确认消息会在将消息写入磁盘后发送。确认消息的 deliveryTag 表示消息的序号,同时,Broker 可以通过设置 channel.basicAck 方法的 multiple 参数,表示该序号之前的所有消息都已成功处理。

在这里插入图片描述

发送方确认机制的最大优势在于其异步性,生产者可以同时发布消息并等待信道返回确认消息。当消息被确认后,生产者可以通过回调方法处理确认消息;如果 RabbitMQ 因内部错误导致消息丢失,则会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令。使用发送确认机制必须将信道设置为确认模式,这是 AMQP 0.9.1 协议的扩展功能,默认情况下未启用,生产者需通过 channel.confirmSelect() 将信道设置为确认模式,发布确认有 3 种策略,接下来我们来学习这三种策略。

2.7.1 Publishing Messages Individually (单独确认)

static void publishMessagesIndividually() throws Exception {
    try (Connection connection = createConnection()) {
        // 创建信道
        Channel channel = connection.createChannel();

        // 开启信道确认模式
        channel.confirmSelect();

        // 声明队列
        channel.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME1, true, false, true, null);

        long start = System.currentTimeMillis();

        // 循环发送消息
        for (int i = 0; i < 200; i++) {
            String body = "消息 " + i;

            // 发布消息
            channel.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME1, null, body.getBytes());

            // 等待确认消息
            channel.waitForConfirmsOrDie(5_000);
        }

        long end = System.currentTimeMillis();
        System.out.format("Published %d messages individually in %d ms%n", MESSAGE_COUNT, end - start);
    }
}

可以发现发送 200 条消息耗时较长,这是因为上述代码在每发送一条消息后都会调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认。这种方式实际上是一种串行同步等待的策略,尤其对于持久化消息,还需要等待消息被存储到磁盘后才能返回。为了解决这个问题,RabbitMQ 提供了发布确认机制的异步操作,可以在发送消息的同时异步等待消息确认,从而大幅提升效率。

2.7.2 Publishing Messages in Batches (批量确认)

每发送一批消息后,调用 channel.waitForConfirms 方法等待服务器返回确认结果。

static void publishMessagesInBatch() throws Exception {
    // 创建连接
    try (Connection connection = createConnection()) {
        // 创建信道
        Channel channel = connection.createChannel();

        channel.confirmSelect();

        // 声明队列
        channel.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME2, true, false, true, null);

        // 设置批量大小
        int batchSize = 100; // 每次确认的消息数量
        int outstandingMessageCount = 0; // 记录未确认的消息数量

        long start = System.currentTimeMillis(); // 记录开始时间

        // 循环发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            // 生成消息内容
            String body = "消息 " + i;

            // 发布消息
            channel.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME2, null, body.getBytes());
            outstandingMessageCount++; // 增加未确认消息数量

            // 批量确认消息
            if (outstandingMessageCount == batchSize) {
                channel.waitForConfirmsOrDie(5_000); // 等待确认,超时时间为 5000 毫秒
                outstandingMessageCount = 0; // 清空未确认消息数量
            }
        }

        // 处理剩余未确认的消息
        if (outstandingMessageCount > 0) {
            channel.waitForConfirmsOrDie(5_000); // 等待确认
        }

        long end = System.currentTimeMillis(); // 记录结束时间

        // 打印消息发送耗时
        System.out.format("Published %d messages in batch in %d ms%n", MESSAGE_COUNT, end - start);
    }
}

相比于单独确认策略,批量确认显著提升了确认的效率,但其缺点在于当出现 Basic.Nack 或超时时,无法明确是哪条消息出现问题,客户端需要重新发送整批消息,这可能导致明显的重复消息数量。如果消息频繁丢失,批量确认的性能可能不增反降。

2.7.3 Handling Publisher Confirms Asynchronously (异步确认)

异步确认的编程实现相对复杂。Channel 接口提供了 addConfirmListener 方法,用于添加 ConfirmListener 回调接口。ConfirmListener 接口包含两个方法:handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple),分别用于处理 RabbitMQ 发送给生产者的 ack 和 nack 消息。其中,deliveryTag 表示消息的序号,multiple 表示是否批量确认。

为每个信道维护一个已发送消息的序号集合是必要的。当收到 RabbitMQ 的确认回调时,从集合中删除对应的消息。在开启确认模式后,信道上发送的每条消息都会附带一个从 1 开始递增的 deliveryTag 序号。可以利用 SortedSet 的有序性来维护这些已发送的消息序号,从而高效管理确认状态。

情况处理逻辑
收到 ack从序列中删除对应消息的序号。如果是批量确认,则表示小于等于当前序号 deliveryTag 的所有消息都已确认,需清除对应的集合。
收到 nack处理逻辑与 ack 类似,但需要结合具体业务场景,对未确认的消息进行处理,如消息重发等操作。
static void handlePublishConfirmsAsynchronously() throws Exception {
    try (Connection connection = createConnection()) {
        // 创建信道
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME3, false, false, true, null);

        // 开启信道确认模式
        channel.confirmSelect();

        // 创建有序集合,用于存储未确认的消息序号,确保消息的顺序性
        SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());

        // 添加确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            // 处理 ack 消息
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    // 批量确认:清除集合中小于等于 deliveryTag 的消息序号
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    // 单条确认:移除当前消息序号
                    confirmSet.remove(deliveryTag);
                }
            }

            // 处理 nack 消息
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.format("deliveryTag: %d, multiple: %b%n", deliveryTag, multiple);
                if (multiple) {
                    // 批量确认失败:清除集合中小于等于 deliveryTag 的消息序号
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    // 单条确认失败:移除当前消息序号
                    confirmSet.remove(deliveryTag);
                }
                // 如果处理失败,这里需要添加消息重发逻辑(具体实现可根据业务需求补充)
            }
        });

        // 循环发送消息
        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;

            // 获取下次发送消息的序号(从 1 开始递增)
            long nextPublishSeqNo = channel.getNextPublishSeqNo();

            // 发布消息
            channel.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME3, null, message.getBytes());

            // 将消息序号存入集合
            confirmSet.add(nextPublishSeqNo);
        }

        // 等待所有消息确认完毕
        while (!confirmSet.isEmpty()) {
            Thread.sleep(10); // 每隔 10 毫秒检查一次
        }

        long end = System.currentTimeMillis();
        System.out.format("Published %d messages and handled confirms asynchronously in %d ms%n", MESSAGE_COUNT, end - start);
    }
}

http://www.kler.cn/a/461219.html

相关文章:

  • linux-26 文件管理(四)install
  • 朱姆沃尔特隐身战舰:从失败到威慑
  • PostgreSQL的备份方式
  • 第二十六天 自然语言处理(NLP)词嵌入(Word2Vec、GloVe)
  • java中的基本数据类型有哪些?
  • filebeat采集应用程序日志和多行匹配
  • Android学习小记3
  • 耳切法简述
  • 矩阵的因子分解3-LU分解和LDU分解
  • WebSocket 入门详解
  • 【每日学点鸿蒙知识】Taro、native层获取文件宽度、位置变化callback、数据迁移、oh_modules说明等
  • QT--多线程
  • 深入浅出 Spring (二)| 依赖注入(DI)、自动装配
  • 课程思政元素收集系统|Java|SSM|JSP|
  • 计算机网络基础知识(7)中科大郑铨老师笔记
  • 【视觉SLAM:四、相机与图像】
  • 公交智眼 4G 录像机:开启安全运营新篇章
  • spring中常见的自动注入方式
  • 论文实现:Reactive Nonholonomic Trajectory Generation via Parametric Optimal Control
  • Vue3 简介
  • C++初步认识函数
  • @RestControllerAdvice注解
  • OneOS操作系统入门-驱动-03:I2C总线及驱动
  • java实现excel导入参考资料合集
  • Zookeeper在中间件的应用和在Spring Boot业务系统中实现分布式锁和注册中心的解决方案
  • CT 扫描显示 USB-C 电缆可能隐藏复杂的恶意硬件