当前位置: 首页 > article >正文

RabbitMQ基本介绍及简单上手

(一)什么是MQ

  MQ(message queue)本质上是队列,满足先入先出,只不过队列中存放的内容是消息而已,那什么是消息呢? 消息可以是字符串,json也可以是一些复杂对象

 我们应用场景通常是再分布式系统之间

    系统通信方式

一般我们的系统调用方式有两种

一种是同步通信,也就是a--->b,直接调用对方服务,数据从a出发直接到达b

另一种叫异步通信,也就是a----->容器---->b,数据从一段出发,先进入一个容器进行临时存储,当满足条件后,由容器发给另一端,这个容器的实现就是MQ

(二)MQ的作用

MQ主要的工作是接收并发送消息,在不同场景下可以有不同的作用

这里说一些比较常见的作用

1.异步解耦

  在业务流程中,有一些操作时很耗时的,并且可能阻塞其他任务,但是这些操作并不要求立即执行,我们就可以借助MQ来把这些操作异步化

2.流量削峰

 我们在一些特殊时间可能会有访问量突增的情况,那我们应用一般来说是不能立即全部处理的,所以我们可以使用MQ把访问放到消息队列中,然后依次的处理这些请求

3.消息分发

  当多个系统要对同一个数据做出响应时,我们可以使用MQ来进行消息分发,比如支付成功后,支付系统就可以向MQ发送消息,然后其他系统来订阅这个消息,就不需要轮询数据库了

4.延迟通知

 在一些定时要进行处理的一些场景中,我们可以使用MQ的延迟消息功能,就比如在电商平台,用户下单后一定时间未支付,我们可以把限定的时间放到队列中,时间到了自动取消订单

(三)RabbitMQ的核心概念

我们这里用RabbitMQ来学习消息队列

AMQP

首先我们来看一下这个,AMQP是一种高级消息队列,定义了一套确定的消息交换功能,包含交换机和队列如何映射等,这些组件共同工作,使消息能够正常被接收和发送,AMQP还定义了一个网络协议,允许客户端和AMQP模型进行通信

我们RabbitMQ就是实现了AMQP协议的,RabbitMQ就是AMQP协议的Erlang的实现

所以他们两者的结构模型是一样的

首先我们来看一下RabbitMQ的工作流程

RabbitMQ是⼀个消息中间件,也是⼀个⽣产者消费者模型.它负责接收,存储并转发消息.

首先我们先来了解一下图中出现的一些概念

1.Producer和Consumer

Producer:生产者,是MQ的客户端,用来向RabbitMQ发送消息的

Consumer:消费者,也是MQ的客户端,是用来从RabbitMQ中取消息的

Brokeer就是RabbitMQ,用来接收和转发消息的

我们也可以简化下上面的图,只关注这三者

2.Connection和Channel

Connection:连接,是客户端和RabbitMQ服务器建立的一个TCP连接,这个连接时建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息

Channel:信道,每个建立的connection都可以由很多个信道,每个信道是一个独立的虚拟连接,消息的发送和接收都是通过channel的

注:信道的作用是为了把所有的读写操作都给复用到同一个TCP连接上,这个可以减少建立和关闭连接的开销,提高性能

3.Virtual host

Virtual host:虚拟机,这个是消息队列提供的一种逻辑上的隔离机制(本质上没有隔离),对于Rabbit一个Broker可以有多个虚拟机,当多个不同的⽤⼾使⽤同⼀个 RabbitMQ Server提供的服务时,可以虚拟划分出多个vhost,每个⽤⼾在⾃⼰的vhost创建exchange/queue等

4.queue

queue:队列,是RabbitMQ中的内部对象,是存储消息的地方

多个消费者可以订阅同一个队列

5.exchange

exchange:交换机 ,是消息到达broker中的第一站(因为虚拟机是逻辑上的本质上是不存在的),它负责接收生产者传来的消息,并根据routingkey(bingingkey)来把这些消息路由到一个或者多个queue中,exchange起到了消息路由的作用

6.RabbitMQ的工作流程

1.生产者生产消息

2.生产者与RabbitMQ建立一个连接并且开启一个信道

3.生产者声明一个交换机

4.生产者声明一个队列

5.生产者发送消息到Broker

6.Broker接收消息,并存入到相列中,如果没有相应的队列,就根据生产者的配置来进行一些处理

(四)RabbitMQ入门程序

  首先我们要使用RabbitMQ我们要在服务器上进行安装,那安装就先不说了,我们可以在官方文档或者b站找到很多资料

1.引入依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>

生产者代码:

public class Produce {
    public static void main(String[] args) throws IOException, TimeoutException {
          //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("62.234.46.219");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wd");
        connectionFactory.setPassword("w85315678");
        connectionFactory.setVirtualHost("bit");
        Connection connection = connectionFactory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换机(这里使用默认交换机)
        //声明队列
        /**
         * var1 队列名称
         * var2 是否持久化
         * var3 是否独占(这个队列只有一个消费者)
         * var4 是否自动删除
         * argument 一些高级参数
         * AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
         */
        channel.queueDeclare("hello",true,false,false,null);
        //发送消息
        /**
         * var1 交换机名称
         * var2 交换机通过什么映射到queue中(内置交换机routingkey与队列名称保持一致)
         * var3 属性配置
         * body 消息
         * void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4)
         */
        String s1="hello wd";
        channel.basicPublish("","hello",null,s1.getBytes());
        //释放信道
        channel.close();
        //释放连接
        connection.close();
    }
}

消费者代码:

public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("62.234.46.219");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wd");
        connectionFactory.setPassword("w85315678");
        connectionFactory.setVirtualHost("bit");
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //声明队列(如果队列没有才创建,有就不创建)
        channel.queueDeclare("hello",true,false,false,null);
        //消费消息
        /**
         * String basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue 队列名称
         * autoAck是否自动确认
         * callback接收消息后要执行的逻辑
         */
        DefaultConsumer consumer=new DefaultConsumer(channel){
            //从队列中获取到消息就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到: "+new String(body));
            }
        };
        channel.basicConsume("hello",true,consumer);
        //释放资源
        channel.close();
        connection.close();
    }
}

消费者代码中有一个类叫Consumer

也就是我们channel.basicConsume()中的最后一个参数

这个参数是来定义消费者行为的,当我们需要从MQ中接收消息时,我们要提供一个实现了Consumer接口的对象(这里我使用的是内部类)

DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口

核心方法就是我们上面重写的方法

那上面我们没有说参数都是什么意思,这里来说一下

consumerTag:消费者标签,通常是消费者在订阅队列时指定的

envelope:有一些封包信息,如队列名称,交换机等

properties:一些配置信息

body:消息的具体内容

(五)可能出现的错误

1.

我们上面销毁的过程中,一定要先销毁channel再销毁connection或者直接销毁connection不销毁channel

如果先销毁connection再销毁channel就会报错

2.

我们在消费者代码中,如果不自己声明队列,且MQ中没有这个队列时,我们就会报错

3.

如果端口或者ip错误,也会报错

4.账号密码不正确也会报错 

5.用户对虚拟机没有操作权限会报错

 


http://www.kler.cn/a/488856.html

相关文章:

  • 通过gradle发布aar或jar携带sources-jar到maven nexus
  • “深入浅出”系列之FFmpeg:(1)音视频开发基础
  • Python基于YOLOv8和OpenCV实现车道线和车辆检测
  • L1G5000 XTuner 微调个人小助手认知
  • 智慧公厕大数据驱动下的公共卫生管理与优化
  • 直流无刷电机控制(FOC):电流模式
  • 中国科学院|PPTAgent:超越文本到幻灯片的演示文稿生成与评估
  • 三化六防是什么
  • 现代企业架构白皮书(可以在线阅读完整PDF文件)
  • 基于 GEE Sentinel-1 数据集提取水体
  • MySQL_单行函数和多行函数
  • 【MySQL】ON与WHERE的区别(临时表)
  • [tesseract]Deserialize header failed: FIRC.lstmf
  • 【力扣热题100】—— Day20.多数元素
  • 【网络协议】动态路由协议
  • 深入探索AI核心模型:CNN、RNN、GAN与Transformer
  • 【Leetcode-移动零】利用将非零元素移动至数组前解决移动零问题(剪枝优化)
  • TypeScript语言的网络编程
  • Linux第一个系统程序---进度条
  • 详细分析 Git 分支重命名与同步操作
  • Harmony OS开发-ArkUI框架速成八之交叉轴和自适应
  • 【Qt】控件概述和QWidget核心属性1(enabled、geometry、windowTitle、windowIcon、QRC机制)
  • 全文搜索-搜索权限,非侵入文档同步,权限同步 ,扩展字段
  • 深度解析如何使用Linux中的git操作
  • Python 教程 - 基本语句
  • 【Golang 面试题】每日 3 题(二十四)