当前位置: 首页 > article >正文

【RabbitMQ】RabbitMQ的下载安装及使用

安装RabbitMQ

下载网站:https://www.rabbitmq.com/docs/install-windows

在这里插入图片描述

点击后,会直接定位到依赖介绍位置,告诉你需要安装Erlang

在这里插入图片描述

下载Erlang

Erlang也是一种编程语言,只是比较小众,但其拥有极为出色的性能

在这里插入图片描述

这个网站是到GitHub上下载的,可能需要点魔法,也可以去Erlang官网下载(能下,但慢)

在这里插入图片描述

下载RabbitMQ

下载Erlang的同时,也顺便下载RabbitMQ吧

在这里插入图片描述

或者直接使用别人下载好的包,比如我这提供的包

安装Erlang

运行下载好的exe文件

  • 点击Next即可

在这里插入图片描述

  • 选择安装路径,点击Next继续

在这里插入图片描述

  • 点击Install安装

在这里插入图片描述

  • 安装完后,点击Close即可

在这里插入图片描述

安装RabbitMQ

运行下载好的exe文件

  • 点击Next即可

在这里插入图片描述

  • 选择安装路径,点击Install安装

在这里插入图片描述

  • 安装成功后,点击Next继续

在这里插入图片描述

  • 点击Finish完成安装

在这里插入图片描述

安装插件

找到RabbitMQ目录下的sbin目录,打开CMD控制台,输入rabbitmq-plugins.bat enable rabbitmq_management命令

在这里插入图片描述

重启RabbitMQ服务后访问http://localhost:15672

在这里插入图片描述

默认账号密码均为guest

在这里插入图片描述

整合RabbitMQ

  • 导入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.24.0</version>
</dependency>

一对一队列模型

  • 生产者发送消息
/**
 * 一对一消息队列模型:生产者
 */
public class SingleProducer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂,用于创建到RabbitMQ服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ服务器地址
        factory.setHost("localhost");
        // 创建一个连接,用于和RabbitMQ服务器建立通信通道
        try (Connection connection = factory.newConnection();
             // 创建一个通道
             Channel channel = connection.createChannel()) {
            // 声明一个队列,队列名为hello
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 将消息发布到指定队列中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

运行后,RabbitMQ管理后台会增加一个队列

在这里插入图片描述

  • 消费者消费消息
/**
 * 一对一消息队列模型:消费者
 */
public class SingleConsumer {
   // 定义我们正在监听的队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂并配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 从工厂中获取一个新的连接
        Connection connection = factory.newConnection();
        // 创建一个新的通道
        Channel channel = connection.createChannel();
        // 声明一个队列,在该通道中声明我们要监听的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 创建一个回调函数,用于处理从队列中接收到的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 获取消息体并转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给deliverCallback进行处理,会持续阻塞
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

未运行代码前

在这里插入图片描述

运行代码后

在这里插入图片描述

此时在不中断消费者代码运行的情况下,再去运行生产者代码,会发现消费者会持续消费生产者增加的消息

在这里插入图片描述

一对多队列模型

  • 生产者发送消息
/**
 * 一对多消息队列:生产者
 */
public class MultiProducer {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // durable参数设置为 true,服务器重启后队列不丢失
         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
		// 此处使用scanner循环输入消息,模拟生产者不定时多次生产
        Scanner scanner = new Scanner(System.in); 
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            // 指定 MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
  }
}

运行代码,可以模拟发送多条数据

在这里插入图片描述

  • 消费者消费信息
/**
 * 一对多消息队列:消费者
 */
public class MultiConsumer {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
      // 设置持久化
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        try {
            System.out.println(" [x] Received '" + message + "'");
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            // 手动确认消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    // 开始消费消息,传入队列名称。关闭自动确认,投递回调和消费者取消回调
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  // 模拟消息处理,消息中每有一个“.”就让线程暂停10s,模拟复杂的耗时工程
  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

运行消费者代码

在这里插入图片描述

队列中的消息不是一次性全部接收的,而是需要等待消费者完成消费(处理完事务后)并主动确认完成后,才会继续发送下一条消息

在这里插入图片描述

这与一对多有什么关系呢?不急,我们停掉消费者代码运行先,然后让生产者进行生成消息

在这里插入图片描述

然后,直接拷贝一份消费者代码,命名为MultiConsumer2。此时运行两个消费者看效果

在这里插入图片描述

会发现,两个消费者会轮流接收队列消息,消费完(完成任务)后才会确认并接收新的队列消息,直至队列所有消息被消费完

在这里插入图片描述
下面交换机模型为初学者见解,可能存在理解错误,看看就好,我后面也会深入学习,但大概率不会再修改本文章,所以要专业的、准确的还是得看官方文档,这里会用就行,被误导了概不负责,谢谢

交换机模型

交换机是消息队列中的一个组件,有点类似于中转站,上面两个模型都是生产者创建消息队列,然后由消费者去接收指定消息队列中的消息,而交换机模型中,生产者不再创建指定的消息队列,而是创建一个交换机,由消费者去绑定交换机并创建消息队列,然后再接收生产者的消息。由直接变间接。

这就有点像网络路由一样,最初,两台电脑要互发消息,就必须各自开一个网口连接网线,三台电脑要交互就各开两个网口,随着电脑接入的越多,一台电脑上要的网口就越多,网线交错也就越复杂,这时为了更好梳理网线和减少网口,就有了集线器、交互器、路由器等,而RabbitMQ中的交换机也是同样道理,为了方便管理多个消息队列及其后续变动

在这里插入图片描述

交换机有direct, topic, headersfanout四种类型,因为headers交换机的性能较差,不太推荐使用,了解有该类型即可

Fanout交换机

fanout有扇出的意思,该类型交换机会把消息一次性扇出(发布)给所有与该交换机绑定的消息队列,适用于广播消息,如更新文章后,广播消息给所有订阅文章的用户

在这里插入图片描述

  • 生产者发送消息
/**
 * 交换机模型:生产者
 */
public class FanoutProducer {

    // 定义交换机的名称
    private static final String EXCHANGE_NAME = "exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置工厂的主机地址
        factory.setHost("localhost");
        // 创建一个连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //  声明一个交换机,交换机名称为exchange,类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 发布消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                // 获取用户输入
                String message = scanner.nextLine();
                // 将消息发送到指定的交换机上,交换机名称为exchange,路由键为空,消息属性为null,消息内容为用户输入的字符串
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

此时运行生产者代码,并输入内容,会发现队列表中没有新增队列

在这里插入图片描述

但可以再Exchanges中看到名为exchange、类型为fanout的交换机信息

在这里插入图片描述

  • 消费者消费消息
/**
 * 交换机模型:消费者
 */
public class FanoutConsumer {
  // 声明交换机的名称
  private static final String EXCHANGE_NAME = "exchange";

  public static void main(String[] argv) throws Exception {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置工厂的主机地址
    factory.setHost("localhost");
    // 创建一个连接和通道
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明一个交换机,交换机名称为exchange,类型为fanout
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    // String queueName = channel.queueDeclare().getQueue();
    // 创建队列,名称为fanout_queue,并绑定到交换机上
    String queueName = "yg1_queue";
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [员工1] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

运行消费者代码,会发现队列表中新增了yg1_queue队列

在这里插入图片描述

此时拷贝多一份消费者代码,并修改队列名为yg2_queue

在这里插入图片描述

运行后,队列表会新增yg2_queue队列

在这里插入图片描述

此时生产者发送消息后,会同时发送给两位消费者进行处理

在这里插入图片描述

需要注意的是,如果生产者先发送消息,再创建消费者,因为还没有创建存储的消息队列,所以是无法存储消息的,即消费者无法接收队列创建前的旧消息;但如果消费者已经启动过一次了(RabbitMQ中已有其消息队列),那么生产者发送消息后再启动消费者,还是能接收到消息的;

就比如你没有QQ号,那么别人发送不了消息给你,当你创建好QQ号后,无论你是否上线,别人都能发送消息给你

以一对多队列模型为例

  • 我们删除队列表中所有队列

在这里插入图片描述

  • 然后只运行生产者代码,会发现它直接创建好了一个消息队列

在这里插入图片描述

  • 此时发送消息后再启动消费者,消费者是能接收到队列消息的

在这里插入图片描述

因为消息只在消息队列中传递,交换机只是中间件。这里的生产者只创建交换机,不创建队列,队列有消费者创建

在这里插入图片描述

在这里插入图片描述

为什么要用交换机呢? (个人理解)

打个不恰当的比喻:消息队列就是Q群,此时你有n个Q群,你要给每个群都发送一个拼手气红包,让群友去争抢;你自然可以手动一个一个去发,但更好的方式是选择采用脚本(交换机),通过脚本(交换机)去给该账号下的每个群(消息队列)都去发送一个拼手气红包。这样的好处在于,后面不论是有新群还是有群被解散,你都无需理会,你只需在意是否是自己q号上的群(是否绑定在交换机上)

Direct交换机

fanout就像AOE技能,无差别的范围攻击,而Direct就像是指定性单体技能,即使有多个消息队列绑定在其上,也能根据路由键给指定消息队列发送消息,适用于指派任务,通过路由键分发任务给指定消息队列

  • 生产者发送消息
/**
 * direct交换机模型:生产者
 */
public class DirectProducer {
  // 定义交换机名称
  private static final String EXCHANGE_NAME = "direct_exchange";

  public static void main(String[] argv) throws Exception {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置连接工厂的主机地址为本地主机
    factory.setHost("localhost");
    // 建立连接并创建通道
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 使用通道声明交换机,类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            // 读取用户输入内容,并以空格分隔
            String userInput = scanner.nextLine();
            String[] parts = userInput.split(" ");
            if(parts.length < 1){
                continue;
            }
            String message = parts[0];
            // 路由键,用于确定消息被路由到哪个队列
            String severity = parts[1];
            // 发布消息到交换机
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [生产者] 发送 '" + severity + "':'" + message + "'");
        }
    }
  }
}

运行生产者代码,同样不会生成消息队列而是创建类型为direct的交换机

在这里插入图片描述

  • 消费者消费消息
/**
 * direct交换机模型:消费者
 */
public class DirectConsumer {
    // 定义我们正在监听的交换机名称
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接工厂的主机地址为本地主机
        factory.setHost("localhost");
        // 建立与 RabbitMQ 服务器的连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 声明一个 direct 类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 声明一个匿名队列,并获取队列名称
        // String queueName = channel.queueDeclare().getQueue();
        // 手动声明队列名称
        String queueName = "cy_queue";
        // 创建队列,并设置队列的持久化属性为 true
        channel.queueDeclare(queueName, true, false, false, null);
        // 绑定队列到交换机上,并指定绑定路由键为“cy”
        channel.queueBind(queueName, EXCHANGE_NAME, "cy");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 创建一个 DeliverCallback 实例来处理接收到的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [残炎] 收到了 '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        // 开始消费队列中的消息,设置自动确认消息已被消费
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

运行消费者代码,会生成指定名称的队列

在这里插入图片描述

此时,生产者发送消息并指定路由键后,对应的消费者会收到消息,而不属于它 的消息不会收到

在这里插入图片描述

同样的,拷贝一份消费者代码并启动

在这里插入图片描述

生产者每次发送给不同目标发送消息,都会精准无误地转发到指定目标

在这里插入图片描述

其实上面也同样演示了一个问题,那就是发送消息给不存在的路由键目标,也就是还没拷贝第二份消费者(fy)代码时生产者给fy发送的消息,是直接丢弃的。

direct就像是能输入具体发送目标类型的红包脚本,它允许我们自行选择要发送的目标群类型,而不是账号下的所有群,毕竟有些群可能是不相干的人拉你进的,群友与你没有任何瓜葛,你又何必给他们发呢?又或者你只想给自己所有的家族群发(只要你有标记哪些群是家族群)

direct能否给不同队列发送消息?

可以的,官网明确说了,不同消息队列允许绑定相同的路由键,而我们发送消息只关注路由键是否存在,并不在意有几个队列绑定在同一路由键上,所以我们可以将同类型的消息队列绑定在同一路由键上

在这里插入图片描述

Topic交换机

topic交换机与direct交换机类型,也是指定性,只不过它不再是单体指定,而是允许指定多个目标(注意,这里的目标指的是路由键而非具体的消息队列)。

  • 生产者发送消息
/**
 * topic交换机模型:生产者
 */
public class TopicProducer {
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂并设置连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 建立连接并创建通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 使用通道声明交换机,类型为topic
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String userInput = scanner.nextLine();
                String[] parts = userInput.split(" ");
                if (parts.length < 1) {
                    continue;
                }
                String message = parts[0];
                String routingKey = parts[1];
                // 发布消息到指定的交换机和路由键
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [生产者] 发送 '" + routingKey + "':'" + message + "'");
            }
        }
    }
}

运行生产者代码,同样不会生成消息队列而是创建类型为topic的交换机

在这里插入图片描述

  • 消费者消费消息
/**
 * topic交换机模型:消费者
 */
public class TopicConsumer {

    // 定义监听的交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明使用的交换机,并指定类型为topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 创建cy队列
        String queueName = "cy_queue";
        // 声明队列,并设置队列未持久化、非排他、非自动删除
        channel.queueDeclare(queueName, true, false, false, null);
        // 绑定队列到交换机上,并指定路由键模式为“#.cy.#”
        channel.queueBind(queueName, EXCHANGE_NAME, "#.cy.#");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [cy组] 收到 '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

同样拷贝一份消费者代码并运行

在这里插入图片描述

与direct对比,除了routing key采用格式不同外,表面上好像并没有太大的区别

在这里插入图片描述

但topic可以以组合的发送而direct并不能,如下,cy组和fy组可以以任意形式组合,并发送对应消息给他们

在这里插入图片描述

这样,我们就可以一次性指定多个具体目标去处理指定消息

值得注意的一点,topic允许一个消息队列绑定多个绑定键,然后只要匹配其中一个即可收到消息

我们再拷贝一份消费者代码,修改如下,这里我们绑定的路由键包含cy与fy

在这里插入图片描述

此时无论我们给cy发,还是fy发,cyfy组都能收到消息

在这里插入图片描述

且需要注意的是,即使我们同时给cy和fy发消息(项目3 cy.fy),cyfy组都只会收到一条消息,不同出现重复接收的情况

在这里插入图片描述

topic就像是比direct更高级的脚本(官方说的,topic是比direct更复杂),direct这个脚本只能指定一个群类型,假设我除了想给所有家族群发,还想给我创建的群发,或者我管理的群,很显然,这需要我重新设置目标类型并再一次启动脚本。

在这里插入图片描述

这自然没有任何问题,无非就是需要操作多次的问题,可事实上,可能有些家族群是由我创建的或者由我管理的,那么就会出现一个问题,脚本会在某些群里发送多次红包,这很显然不符合我给指定目标群发送一个拼手气红包的目的。topic便是能处理这个问题的脚本,我能一次性设置家族群、我是群主、我是管理员三个类型,然后一次性给我Q号下满足这些类型的群发送一个拼手气红包,且及时有些群满足多个条件,也只会发送一个。


http://www.kler.cn/a/538878.html

相关文章:

  • sklearn基础教程
  • 数据结构-基础
  • 神经网络(Neural Network)
  • Mybatis
  • 现在中国三大运营商各自使用的哪些band频段
  • 微信小程序如何使用decimal计算金额
  • FaceFusion如何设置公开链接和端口
  • ASN.1 格式与Java类转换
  • 【自然语言处理】利用Memory Layer替换Transformer中的FFN
  • 缓存实战:Redis 与本地缓存
  • 黑马React保姆级(PPT+笔记)
  • 使用 Three.js 实现热力渐变效果
  • C++线程池
  • 如何设置爬虫的延时避免频繁请求?
  • 使用rustDesk搭建私有远程桌面
  • vue+element-ui简洁完美实现ju动漫网站
  • ASP.NET Core托管服务
  • Java 线程池内部任务出异常后,如何知道是哪个线程出了异常
  • 【Python】元组
  • Deepseek访问受限?换种方式轻松使用
  • 22.3、IIS安全分析与增强
  • 【React】实现TagInput输入框,可以输入多个邮箱并校验是否合法
  • Agent论文阅读:NormEnforcement with a Soft Touch: Faster Emergence, Happier Agents
  • 阿里云服务器XShell连接启动java -jar xxx.jar后退出ssh,后端也退出,使用screen 亲测管用!
  • 【Jetson Nano安装gpu版pytroch1.7torchvision0.8.1 in python3.8 跑 Ultralytics YOLO】
  • 关于预训练后训练、LLM和视频大模型相关学习记录