当前位置: 首页 > article >正文

RabbitMQ笔记

一、MQ与RabbitMQ概述


1. MQ简述


MQ(Message Queue)消息队列,是基础数据结构中 “先进先出” 的一种数据结构,也是在消息的传输过程中保存消息的容器(中间件),多用于分布式系统之间进行通信。

一般MQ用来解决系统耦合、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。(AP架构)

在这里插入图片描述

分布式系统有两种通信方式:直接远程调用 和 借助第三方(MQ)完成间接通信。(发送方称为生产者,接收方称为消费者)


2. MQ的优势与劣势


2.1 MQ的优势


MQ的优势:(应用解耦、异步、削峰)

  • 应用解耦:提高系统容错性和可维护性;
  • 异步提速:提升用户体验和系统吞吐量;
  • 削峰填谷:提高系统稳定性。

1、应用解耦

在这里插入图片描述
在这里插入图片描述


2、异步提速

在这里插入图片描述

在这里插入图片描述


3、削峰填谷

在这里插入图片描述

填谷:

在这里插入图片描述

使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”,从而提升系统的稳定性。


2.2 MQ的劣势


引入MQ会遇到下列问题:

  • 消息可靠性问题(如何确保发送的消息至少被消费者消费一次,避免消息丢失问题)
  • 延迟消息问题 (如何实现消息的延迟投递,解决方案:使用延时队列、TTL、延迟队列插件实现)
  • 高可用问题(如何避免单点MQ故障而导致的不可用问题,解决方案:搭建MQ集群)
  • 消息堆积问题(如何解决数百万消息堆积,无法及时消费的问题)

3. 常见的MQ产品


市面上有很多MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用Rdis充当消息队列的场景。在实际技术选型时,需要结合自身需求及MQ产品特征来综合考虑。

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言Erlang(二郎神,高并发语言)JavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议,社区封装了http协议支持
可用性一般
单机吞吐量一般(万级)高(十万级)非常高(十万级)
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
功能特性并发能力强,性能极其好,延迟低,社区活跃,管理界面丰富老牌产品,成熟度高,文档较多MQ功能比较完备,扩展性佳只支持主要的MQ功能,毕竟是为大数据领域准备的。
  • 追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;

  • 追求可靠性:RabbitMQ、RocketMQ;

  • 追求吞吐能力:RocketMQ、Kafka;

  • 追求消息低延迟:RabbitMQ、Kafka。


4. RabbitMQ简述


RabbitMQ官网地址:http://www.rabbitmq.com/

在这里插入图片描述

RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。

AMQP (全称Advanced Message Queuing Protocol,表示高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。(同类的协议还有MQTT用于物联网场景下)

RabbitMQ中的一些角色:(AMQP协议消息中间件类似)

  • publisher:生产者;
  • consumer:消费者;
  • exchange :交换机,负责消息路由;
  • queue:队列,存储消息;
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离。

在这里插入图片描述

RabbitMQ工作模式:

RabbitMQ提供了6种工作模型,但是我们常用的只有5种:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。(第六种RPC远程调用不属于mq)

在这里插入图片描述

  • 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

JMS(JavaMessage Service)

  • JMS,Java消息服务应用程序接口,即Java操作消息中间件的API;
  • JMS是JavaEE规范的一种,类比JDBC;
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有。

二、RabbitMQ安装与配置


1. 基于docker快速安装RabbitMQ


扩展:docker-compose安装rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml

1、拉取镜像

docker pull rabbitmq:3.8-management

2、运行容器

 docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v /docker/rabbitmq/plugins:/plugins \
 --name rabbitmq  \
 --hostname my-rabbit \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management
  • \ 代表换行
  • -e 指定环境变量
  • -e RABBITMQ_DEFAULT_USER=admin用户名
  • -e RABBITMQ_DEFAULT_PASS=123456密码
  • -v 挂载目录或文件 (数据卷)
  • -p 15672:15672 用于web管理页面使用的端口 (管理员页面)
  • -p 5672:5672 用于生产和消费端使用的端口(通信端口,也就是在代码里使用)
  • -d 后台运行
  • –name rabbitmq 容器名字
  • –hostname my-rabbit(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);

3、启动xxx插件

# 进入容器
docker exec -it rabbitmq /bin/bash

# 启动xxx插件
rabbitmq-plugins enable xxx

RabbitMQ管理端:

管理端访问地址:http://ip:15672/

在这里插入图片描述
在这里插入图片描述


2. 创建用户和虚拟机


1、添加一个新用户:

在这里插入图片描述

添加成功后列表会显示该用户,但是这个用户没有操作权限,需要为他创建一个虚拟机:

在这里插入图片描述


2、创建虚拟机

在这里插入图片描述

为指定用户授权:

在这里插入图片描述

最后该用户就可以操作这个虚拟机了:

在这里插入图片描述


三、RabbitMQ快速入门


使用简单模型中的基本模式完成消息传递:

在这里插入图片描述

官方的HelloWorld示例是基于简单消息队列模型来实现的,其中包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue;
  • queue:消息队列,负责接受并缓存消息;
  • consumer:订阅队列,处理队列中的消息。

1. 基础环境搭建


1、创建父工程mq-demo,并在pom文件中导入如下依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--SpringAMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

2、创建子模块publisher、consumer,并编写启动类和yml配置文件:

# 日志输出格式配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

在这里插入图片描述


2. publisher消息发布者实现


消息收发流程:Connection连接、Channel通道、queue队列、exchange 交换机。

publisher消息发布者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

1、编写publisher测试代码:

package com.baidou.mq.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.200.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

在这里插入图片描述


2、在建立连接处打断点,并以debug方式启动(方便观察每个组件的创建)

在这里插入图片描述

查看连接信息:

在这里插入图片描述


继续按F8,查看通道信息:

在这里插入图片描述

在这里插入图片描述


继续按F8,查看队列信息:

在这里插入图片描述
在这里插入图片描述


最后直接放行程序,查看队列中的消息:

在这里插入图片描述


3. consumer消费者实现


consumer消费者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

1、编写消费者代码

package com.baidou.mq.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.200.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

在这里插入图片描述

2、测试(启动程序后会一直执行,不用的时候将程序结束即可)

在这里插入图片描述
在这里插入图片描述


四、SpringAMQP与RabbitMQ工作模型


1. SpringAMQP概述


AMQP是消息中间件收发消息的协议(规范),具体实现由各个消息中间厂商实现;(例如 RabbitMQ)

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP的官方地址:https://spring.io/projects/spring-amqp

在这里插入图片描述

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了RabbitTemplate工具,用于发送消息 。

RabbitMQ工作模型:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。

在这里插入图片描述


2. BasicQueue 简单队列模型


使用SpringAMQP实现基础消息队列功能:

1、在父工程中引入spring-amqp起步依赖:

<!--SpringAMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、消息发送

2.1、在publisher服务的application.yml中添加rabbitmq配置:

# 配置rabbitmq
spring:
  rabbitmq:
    host: 192.168.200.128 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 配置日志格式
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

2.2、在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.baidou.mq.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * 使用SpringAMQP实现简单队列模型的消息发送
 *
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    // 操作RabbitMQ的模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 测试简单队列模型
     */
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

3、消息接收

3.1、在consumer服务的application.yml中添加rabbitmq配置:

# 配置rabbitmq
spring:
  rabbitmq:
    host: 192.168.200.128 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 配置日志格式
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

3.2、在consumer服务的com.baidou.mq.listener包中创建SpringRabbitListener类:

package com.baidou.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
@Component
public class SpringRabbitListener {

    /**
     * 订阅消息
     *
     * @param msg
     * @throws InterruptedException
     */
    @RabbitListener(queues = "simple.queue") // 指定监听的队列名称为simple.queue
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

4、测试

先启动consumer服务(启动类),然后在publisher服务中运行测试代码,发送MQ消息。

在这里插入图片描述


3. WorkQueue 工作队列模型



http://www.kler.cn/a/15780.html

相关文章:

  • GoogleCloud服务器的SSH连接配置
  • redis和mongodb等对比分析
  • xcode-select: error: tool ‘xcodebuild‘ requires Xcode, but active developer
  • Vulhub漏洞复现---solr---CVE-2019-17558
  • 如何在 Ubuntu 上安装 Jellyfin 媒体服务器
  • cmake生成器表达式
  • Mysql 日志
  • 【教程分享】一键部署MySQL,轻松搞定Docker安装和配置!
  • 2023有潜力的新药都有哪些?新药筛选方法总结
  • VP记录:Codeforces Round 867 (Div. 3) A~G2
  • GeoTools实战指南: 一步步教您轻松实现Shapefile矢量图层可视化
  • 使用Git和GitHub完成团队项目开发
  • 【python基础语法三】列表,元组,集合,字典相关操作
  • 数据可视化大屏的页面布局以及自适应
  • CVE-2023-28432 MiniO信息泄露漏洞复现
  • 哈夫曼树【北邮机试】
  • LCD、OLED、Mini LED、Micro LED和Smart MNT
  • 【备战蓝桥杯国赛-国赛真题】2022
  • LDAP概念和原理介绍
  • Java入门教程||Java 封装||Java 接口
  • Linux Ansible-Jinjia2模板
  • iOS 吸顶效果
  • 【Redis】Redis中的5种基础数据类型详解
  • 前端web3入门脚本二:初探dex,在dex完成一笔swap
  • 势能回馈系统超级电容储能系统介绍
  • 浅谈文心一言