当前位置: 首页 > article >正文

RabbitMQ 路由(Routing)通讯方式详解

在现代分布式系统中,消息队列(Message Queue)是实现异步通信、解耦系统组件的重要工具。RabbitMQ 作为一个广泛使用的消息代理(Message Broker),提供了多种消息传递模式,其中路由(Routing)模式是一种非常强大且灵活的通讯方式。本文将深入探讨 RabbitMQ 中的路由模式,帮助读者理解其工作原理、应用场景以及如何通过 Java 代码实现。


1. 什么是路由模式?

在 RabbitMQ 中,路由模式是基于 Direct Exchange 的一种消息传递模式。与简单的 Fanout Exchange 不同,Direct Exchange 允许消息发送者根据特定的路由键(Routing Key)将消息发送到特定的队列。这种模式提供了更细粒度的消息分发控制,使得消息可以根据业务需求被精确地路由到目标队列。

在这里插入图片描述

1.1 关键概念

  1. Direct Exchange: 直接交换机,根据消息的路由键将消息路由到绑定到该交换机的队列。
  2. Routing Key: 路由键是消息的一个属性,用于指定消息的目标队列。交换机会根据路由键将消息路由到匹配的队列。
  3. Binding: 绑定是交换机和队列之间的关联关系。在绑定过程中,可以指定一个路由键,交换机会根据这个路由键将消息路由到相应的队列。

2. 路由模式的工作原理

在路由模式中,消息的发送者和接收者通过交换机进行通信。以下是路由模式的工作流程:

  1. 生产者(Producer) 发送消息到 Direct Exchange,并指定一个路由键。
  2. Direct Exchange 根据消息的路由键,将消息路由到与之绑定的队列。
  3. 消费者(Consumer) 从队列中接收消息并进行处理。

2.1 示例场景

假设我们有一个日志系统,需要将不同级别的日志(如 infoerrorwarning)发送到不同的队列,以便不同的消费者处理。我们可以使用路由模式来实现这一需求。

  1. 定义交换机: 创建一个 Direct Exchange,命名为 logs_exchange
  2. 定义队列: 创建三个队列,分别命名为 info_queueerror_queuewarning_queue
  3. 绑定队列: 将 info_queue 绑定到 logs_exchange,并指定路由键为 info;将 error_queue 绑定到 logs_exchange,并指定路由键为 error;将 warning_queue 绑定到 logs_exchange,并指定路由键为 warning
  4. 发送消息: 生产者发送消息到 logs_exchange,并指定路由键为 infoerrorwarning
  5. 接收消息: 消费者从相应的队列中接收消息并进行处理。

3. 路由模式的 Java 实现

以下是一个使用 Java 和 RabbitMQ Java Client 库实现路由模式的示例代码。

3.1 添加依赖

首先,在 pom.xml 中添加 RabbitMQ 的依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

3.2 生产者代码

生产者负责发送消息到 Direct Exchange,并指定路由键。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class DirectProducer {
    private static final String EXCHANGE_NAME = "routing_exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明 Direct Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 定义路由键和消息
            String severity01 = "info"; // 可以是 "info", "error", "warning"
            String message01 = "This is an info message";

            // 发送消息到交换机,并指定路由键
            channel.basicPublish(EXCHANGE_NAME, severity01, null, message01.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + severity01 + "':'" + message01 + "'");

            // 定义路由键和消息
            String severity02 = "warning"; // 可以是 "info", "error", "warning"
            String message02 = "This is an info message";

            // 发送消息到交换机,并指定路由键
            channel.basicPublish(EXCHANGE_NAME, severity02, null, message02.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + severity02 + "':'" + message02 + "'");
        }
    }
}

3.3 消费者代码

消费者负责从队列中接收消息并处理。

3.3.1 消费者DirectConsumer01

  • 接受消息中包含info, error, warning的数据。
import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class DirectConsumer01 {
    private static final String EXCHANGE_NAME = "routing_exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明 Direct Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定队列到交换机,并指定路由键
        String[] severities = {"info", "error", "warning"}; // 可以只绑定部分路由键
        for (String severity : severities) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 定义消息处理回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        // 开始消费消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

3.3.2 消费者DirectConsumer02

  • 接受消息中包含error, warning的数据,但不接受消息中有info的数据。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class DirectConsumer02 {
    private static final String EXCHANGE_NAME = "routing_exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明 Direct Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定队列到交换机,并指定路由键
        String[] severities = {"error", "warning"}; // 可以只绑定部分路由键
        for (String severity : severities) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 定义消息处理回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        // 开始消费消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

4. 运行示例

  1. 启动 RabbitMQ 服务: 确保 RabbitMQ 服务已启动并运行在 192.168.200.138
  2. 运行消费者: 启动消费者程序,绑定队列到交换机,并等待消息。
  3. 运行生产者: 启动生产者程序,发送带有不同路由键的消息。

4.1 输出示例

4.1.1 生产者输出
 [x] Sent 'info':'This is an info message'
 [x] Sent 'warning':'This is an info message'
4.1.2 消费者DirectConsumer01输出
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'info':'This is an info message'
 [x] Received 'warning':'This is an info message'
4.1.3 消费者DirectConsumer02输出
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'warning':'This is an info message'

在这里插入图片描述


5. 路由模式的应用场景

  1. 日志系统: 将不同级别的日志(如 infoerrorwarning)发送到不同的队列,以便不同的消费者处理。
  2. 通知系统: 根据用户的订阅类型(如 emailsmspush)将通知发送到不同的队列。
  3. 任务分发: 根据任务的类型(如 high_prioritylow_priority)将任务分发到不同的队列。

总结

RabbitMQ 的路由模式(Routing)通过 Direct Exchange 提供了灵活的消息分发机制,使得消息可以根据路由键被精确地路由到目标队列。


http://www.kler.cn/a/451637.html

相关文章:

  • “高精度算法”思想 → 大数阶乘
  • JS中若干相似特性的区别
  • Mysql InnoDB存储引擎中聚簇索引和非聚簇索引的区别
  • 无人设备遥控器之定向天线篇
  • 帧缓存的分配
  • 蓝牙协议——音量控制
  • 金融领域研发效能的特性有哪些?拓展边界是什么?
  • 内网穿透ubuntu20 docker coplar
  • 14_HTML5 input类型 --[HTML5 API 学习之旅]
  • Centos7 安装 zip 软件失败,更换yum 源方法
  • 麒麟信安参展南京软博会,支持信创PC的新一代云桌面及全行业解决方案备受瞩目
  • 一文了解Oracle数据库如何连接(1)
  • Linux复习4——shell与文本处理
  • 基于C#实现的(WinForm)模拟操作系统文件管理系统
  • 基于STM32 USB接口的温度控制器设计
  • 基于SpringBoot在线音乐系统平台功能实现十二
  • 【华为OD-E卷-狼羊过河 100分(python、java、c++、js、c)】
  • 2002 - Can‘t connect to server on ‘192.168.1.XX‘ (36)
  • 母婴用品系统|Java|SSM|JSP|
  • Text2Reward学习笔记
  • 消息队列(二)消息队列的高可用原理
  • 面试场景题系列:设计一致性哈希系统
  • vue实现2048小游戏
  • DP83848以太网移植流程,可以TCP通信
  • element-puls封装表单验证
  • python中使用selenium执行组合快捷键ctrl+v不生效问题