当前位置: 首页 > article >正文

默契之舞 之 生产者消费者模式(RabbitMQ)

Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~
🌱🌱个人主页:奋斗的明志
🌱🌱所属专栏:RabbitMQ

📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。

在这里插入图片描述

生产者消费者

  • 一、RabbitMQ 快速入门
    • 1、创建一个空的项目
    • 2、引入依赖
  • 二、编写生产者代码
  • 三、编写消费者代码
  • 四、常见报错异常类型
    • 1、关闭顺序调换
    • 2、队列不存在
    • 3、端口号错误
    • 4、检查账号密码是否错误
    • 5、检查虚拟机
  • 五、小结

一、RabbitMQ 快速入门

1、创建一个空的项目

在这里插入图片描述


在这里插入图片描述

2、引入依赖

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

在这里插入图片描述

二、编写生产者代码

建立连接需要信息

  1. ip
  2. 端口号
  3. 账号
  4. 密码
  5. 虚拟主机

在这里插入图片描述


在这里插入图片描述


生产者和消费者创建的 channel 并不是同⼀个

//2.开启信道
Channel channel = connection.createChannel();

//3.生产者需要声明一个交换机
//rabbitmq 在创建的时候就有默认的交换机
//使用内置的交换机

在这里插入图片描述

  • 声明队列
  • 例如: 如果有⼀个名为 “hello” 的队列, 生产者可以直接发送消息到 “hello” 队列, 而消费者可以从
    “hello” 队列中接收消息, 而不需要关心交换机的存在. 这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是⼀对⼀的.

在这里插入图片描述

 //4.声明队列
 /**
  * AMQP.Queue.DeclareOk queueDeclare
  * (String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
  * throws IOException;
  *
  * 参数说明:
  * var1 :队列名称(queue)
  * var2 :可持久化 (durable)
  * var3:是否独占(exclusive)
  * var4:是否自动删除(autoDelete)
  * var5: 参数
 */
channel.queueDeclare("hello",true,false,false,null);
  • 生产者完整代码
package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class ProducerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置ip 此时 rabbitmq 在云服务器上,我们通过本地去运行的,需要公网IP,需要提前开放端口号
        connectionFactory.setHost("123.57.16.61");
        //设置端口号 默认的端口号 5672
        connectionFactory.setPort(5672);
        //设置账号
        connectionFactory.setUsername("study");
        //设置密码
        connectionFactory.setPassword("study");
        //设置虚拟机
        connectionFactory.setVirtualHost("bite");
        //在这个工厂里面拿到一个连接
        Connection connection = connectionFactory.newConnection();

        //2.开启信道
        Channel channel = connection.createChannel();

        //3.生产者需要声明一个交换机
        //rabbitmq 在创建的时候就有默认的交换机
        //使用内置的交换机


        //4.声明队列
        /**
         * AMQP.Queue.DeclareOk queueDeclare
         * (String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
         * throws IOException;
         *
         * 参数说明:
         * var1 :队列名称(queue)
         * var2 :可持久化 (durable)
         * var3:是否独占(exclusive)
         * var4:是否自动删除(autoDelete)
         * var5: 参数
         */
        channel.queueDeclare("hello",true,false,false,null);

        //交换机与队列之间的绑定关系
        //内置交换机都有自己的绑定关系

        //5.发送消息
        //依然用 channel 进行发送
        /**
         * void basicPublish
         * (String var1, String var2, AMQP.BasicProperties var3, byte[] var4)
         * throws IOException;
         * 参数说明:
         * var1:交换机的名称 (exchane)
         * var2:内置交换机(routingKey)和队列名称保持一致,
         * var3:属性配置
         * var4:消息
         */
        String mag = "hello rabbitmq~";
        channel.basicPublish("","hello",null,mag.getBytes());

        //6.资源的释放
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

  • 观察界面(发送信息成功)

在这里插入图片描述

如果在代码中注掉资源释放的代码, 在Connections和Channels也可以看到相关信息

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

三、编写消费者代码

消费者代码和⽣产者前3步都是⼀样的, 第4步改为消费当前队列

  1. 创建连接
  2. 创建Channel
  3. 声明⼀个队列Queue
  4. 消费消息
  5. 释放资源

消费当前队列

basicConsume:

/*
2 basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue: 队列名称
2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
3. callback: 回调对象
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

Consumer:

Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了**Consumer** 接⼝的对象.

DefaultConsumer 是 **RabbitMQ**提供的⼀个默认消费者, 实现了Consumer 接⼝.

核心方法:

handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) : 从队列接收到消息时, 会自动动调用该方法.
在这个方法中, 我们可以定义如何处理接收到的消息, 例如打印消息内容, 处理业务逻辑或者将消息存储到数据库等.
参数说明如下:
▪ consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.
▪ envelope : 包含消息的封包信息,如队列名称, 交换机等.
▪ properties : ⼀些配置信息
▪ body : 消息的具体内容

//6. 接收消息, 并消费
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue: 队列名称
2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
3. callback: 回调对象
 */
DefaultConsumer consumer = new DefaultConsumer(channel) {
 /*
 回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法
 1. consumerTag: 标识
 2. envelope: 获取⼀些信息, 交换机, 路由key
3. properties:配置信息
4. body:数据

*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
     System.out.println("接收到消息: " + new String(body));
   }
};
channel.basicConsume("hello", true, consumer);

运行代码, 观察结果

在这里插入图片描述


在这里插入图片描述

  • 完整代码
package rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建一个工厂连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("123.57.16.61");//设置公网ip
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("study");
        connectionFactory.setPassword("study");
        connectionFactory.setVirtualHost("bite");//设置虚拟机
        Connection connection = connectionFactory.newConnection();

        //创建一个通信
        Channel channel = connection.createChannel();

        //声明一个队列(可以省略)
        channel.queueDeclare("hello",true,false,false,null);

        //4. 消费消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数说明:
         * queue: 队列名称
         * autoAck: 是否自动确认
         * callback: 接收到消息后, 执行的逻辑
         */
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };

        channel.basicConsume("hello", true, consumer);
        //等待程序执行完成
//        Thread.sleep(2000);
        //5. 释放资源
//        channel.close();
//        connection.close();
    }
}

四、常见报错异常类型

1、关闭顺序调换

在这里插入图片描述

2、队列不存在

在这里插入图片描述

3、端口号错误

在这里插入图片描述

4、检查账号密码是否错误

在这里插入图片描述

5、检查虚拟机

在这里插入图片描述

五、小结

生产者-消费者模型是一种常见的消息队列应用场景,尤其在异步处理、解耦和提高系统可扩展性方面非常有效。使用 RabbitMQ 实现生产者-消费者模式可以让我们在不同的服务或系统之间解耦,提高系统的可靠性和性能。

在这里插入图片描述

在这里插入图片描述


http://www.kler.cn/a/448001.html

相关文章:

  • 《LangChain大模型应用开发》书籍分享
  • 【day11】面向对象编程进阶(继承)
  • RabbitMQ消息可靠性保证机制7--可靠性分析-rabbitmq_tracing插件
  • C#中方法参数传值和传引用的情况
  • LabVIEW深海气密采水器测控系统
  • 开源轮子 - Logback 和 Slf4j
  • [react 3种方法] 获取ant组件ref用ts如何定义?
  • CSS系列(25)-- 滚动优化详解
  • [DASCTF 2024最后一战|寒夜破晓,冬至终章] 数论的气氛
  • rk3568之mpp开发笔记怎么实现mpp编码摄像头实时码流?
  • 换工作,如何退出微软账户???(删除注册表数据)
  • powerhsell 初认识
  • 252-8路SATAII 6U VPX高速存储模块
  • 一个类就创建Json反序列化所需的属性
  • golang,gowork工具
  • UI自动化概念+Web自动化测试框架
  • 第146场双周赛:统计符合条件长度为3的子数组数目、统计异或值为给定值的路径数目、判断网格图能否被切割成块、唯一中间众数子序列 Ⅰ
  • CE之植物大战僵尸植物无冷却
  • 60.基于SSM的个人网站的设计与实现(项目 + 论文)
  • HarmonyOS NEXT 技术实践-基于意图框架服务实现智能分发
  • simulink离散传递函数得到差分方程并用C语言实现
  • 二叉树_堆
  • 实验二 组合逻辑电路部件实验
  • 青少年编程与数学 02-004 Go语言Web编程 07课题、WebSockets
  • 【java 正则表达式 笔记】
  • 机器学习零基础小白指南---- 线性代数入门