当前位置: 首页 > article >正文

消息队列篇--通信协议篇--AMOP(交换机,队列绑定,消息确认,AMOP实现实例,AMOP报文,帧,AMOP消息传递模式等)

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种开放的、跨平台的消息传递协议,旨在提供一种标准化的方式在不同的消息代理和客户端之间进行消息传递。AMQP不仅定义了消息格式和路由机制,还规定了如何建立连接、发送和接收消息等操作。它适用于多种编程语言和平台,并且支持复杂的路由机制。

特点:

  • 支持复杂的路由规则和消息传递模式。
  • 提供多种消息确认机制(如ACK)。
  • 支持消息持久化和高可用性。
  • 支持多语言客户端库。
  • 适用于企业级应用中的异步通信。

1、AMQP的核心概念

(1)、消息(Message)

消息是AMQP中传输的基本单位,通常包含两个部分:

  • 消息头(Header):包含一些元数据,如消息ID、优先级、时间戳等。
  • 消息体(Body):实际要传递的数据内容,可以是文本、字节流等形式。

(2)、交换机(Exchange)

交换机(Exchange)是AMQP中的一个关键组件,是消息进入消息队列的入口点,负责接收消息并将它们路由到一个或多个队列中。交换机根据消息的路由键(Routing Key)和绑定规则(Binding Rules)决定将消息发送到哪些队列。

常见的交换机类型包括:

  • Direct Exchange:根据精确匹配的路由键将消息发送到相应的队列。
  • Fanout Exchange:将消息广播到所有绑定的队列,不考虑路由键。
  • Topic Exchange:根据模式匹配的路由键将消息发送到相应的队列。
  • Headers Exchange:根据消息头中的属性进行路由。

(3)、队列(Queue)

队列是存储消息的地方,消费者从队列中获取消息并进行处理。每个队列都有一个唯一的名称,并且可以有多个消费者同时监听同一个队列。

(4)、绑定(Binding)

绑定是指将交换机与队列关联起来的过程。它决定了哪些消息应该被路由到哪些队列。绑定时需要指定一个路由键(Routing Key),用于确定哪些消息应该被发送到该队列。

(5)、连接(Connection)

连接是客户端与消息代理(Broker)之间的物理网络连接。通常使用TCP协议建立连接。

(6)、通道(Channel)

通道是客户端与消息代理之间的通信路径。通过通道,客户端可以在同一连接上执行多个操作(如发送和接收消息)。通道的好处是可以复用连接,减少资源开销。

(7)、确认(Acknowledgment)

确认机制确保消息已被成功处理。当消费者接收到消息后,可以选择手动或自动发送确认回执给消息代理。如果消息未被确认,消息代理会认为消息未被处理,并可能重新投递该消息。

(8)、生产者(Producer)

生产者是发送消息的应用程序或服务。它们将消息发送到交换器。

(9)、消费者(Consumer)

消费者是从队列中接收消息的应用程序或服务。它们负责处理消息内容。

2、AMQP的工作流程

原理示意图:
在这里插入图片描述

AMQP的典型工作流程包括以下几个步骤:
(1)、建立连接:客户端与消息代理(如:RabbitMQ)建立TCP连接。
(2)、创建通道:在连接上创建一个或多个通道,用于执行消息传递操作。
(3)、声明交换机和队列:声明交换机和队列,并设置相关的属性(如持久性、自动删除等)。
(4)、绑定交换机和队列:将交换机与队列绑定,并指定路由键。
(5)、发送消息:生产者通过交换机发送消息到队列。
(6)、接收消息:消费者从队列中接收消息并进行处理。
(7)、确认消息:消费者发送确认回执给消息代理,表示消息已被成功处理。

3、AMQP的优势

(1)、跨平台兼容性
AMQP是一种开放标准,允许不同的消息代理实现相互兼容。这意味着你可以选择最适合你需求的消息代理,而不必担心供应商锁定问题。
(2)、可靠性
AMQP提供了多种机制来确保消息传递的可靠性,包括持久化消息、事务支持和确认机制。
(3)、灵活性
AMQP支持多种交换器类型和绑定规则,使得它可以灵活应对各种复杂的路由需求。
(4)、安全性
AMQP支持SSL/TLS加密,确保消息在网络传输过程中的安全性。

4、常见的AMQP实现

(1)、RabbitMQ
RabbitMQ是最流行的AMQP实现之一,提供了丰富的功能和良好的社区支持。它不仅支持AMQP,还支持其他协议如MQTT、STOMP等。
(2)、Qpid
Qpid是Apache基金会的一个项目,提供了AMQP的完整实现。它包括两个主要组件:Qpid Broker和Qpid Client。
(3)、ActiveMQ
ActiveMQ是另一个流行的消息代理,虽然它的默认协议是OpenWire,但它也支持AMQP。

5、AMQP的消息传递模型

AMQP支持多种消息传递模型,具体取决于使用的交换机类型。

(1)、点对点(P2P)模型

在P2P模型中,每条消息只能由一个消费者处理。消息被发送到一个队列,然后由某个消费者从队列中获取并处理。

示例:(python)

import pika

// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

// 声明队列
channel.queue_declare(queue='task_queue', durable=True)

// 发送消息
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body='Hello, AMQP!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,   使消息持久化
                      ))

print(" [x] Sent 'Hello, AMQP!'")

// 关闭连接
connection.close()

(2)、发布/订阅(Pub/Sub)模型

在Pub/Sub模型中,消息可以被多个订阅者接收。消息被发送到一个交换机,然后广播到所有绑定的队列。

示例:(python)

import pika

// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

// 声明交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

// 发送消息
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body='Hello, AMQP!')
print(" [x] Sent 'Hello, AMQP!'")

// 关闭连接
connection.close()

(3)、主题(Topic)模型

在Topic模型中,消息根据路由键的模式匹配被发送到相应的队列。路由键可以包含通配符(表示一个单词, 表示零个或多个单词)。

示例:(python)

import pika

// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

// 声明交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

// 绑定队列
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = ['.orange.', 'lazy.']
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

// 接收消息
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()} on {method.routing_key}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print(' [] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

适用场景:

  • 企业级应用中的异步通信。
  • 微服务架构中的消息传递。
  • 需要复杂路由规则和高可靠性保障的场景。

6、代码示例:(RabbitMQ示例)

(1)、添加依赖

<dependencies>
    <!-- Spring Boot Starter for AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Spring Boot Starter Web (Optional, if you need a REST controller) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- RabbitMQ Java Client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
    </dependency>
</dependencies>

(2)、配置文件(yaml)

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

(3)、创建配置类

创建一个配置类来定义交换器、队列和绑定关系。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 定义队列名称
    public static final String QUEUE_NAME = "exampleQueue";
    public static final String EXCHANGE_NAME = "exampleExchange";
    public static final String ROUTING_KEY = "example.routing.key";

    // 声明队列
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true); // true 表示持久化队列
    }

    // 声明交换器
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    // 绑定队列到交换器
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

(4)、创建消息生产者

创建一个服务类来发送消息。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        System.out.println("Sending message: " + message);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);
    }
}

(5)、创建消息消费者

创建一个监听器类来接收消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

7、AMOP报文

AMQP(Advanced Message Queuing Protocol)是一种二进制协议,用于在客户端和消息代理之间传递消息。与HTTP这种基于文本的协议不同,AMQP的消息是通过二进制格式进行编码的,因此直接查看原始报文并不直观。然而,我们可以从概念上理解AMQP报文的结构,并通过一些工具或库来解析和展示这些报文的内容。

(1)、AMQP报文结构

AMQP报文通常由多个帧组成,每个帧都包含一个帧头和有效载荷部分。
具体来说,在发送一条消息的过程中,常见的报文结构确实包括三个主要的帧:方法帧(Method Frame)、内容头帧(Content Header Frame)和内容体帧(Content Body Frame)。每个帧都需要在其前面拼接一个帧头。

1、帧头(Frame Header)

每个AMQP帧都以一个帧头开始,包含帧类型、通道编号和帧长度等信息。

  • 帧类型(Frame Type):标识帧的类型(如方法帧、内容头帧、内容体帧等)。
  • 通道编号(Channel Number):标识该帧所属的通道。
  • 帧长度(Frame Length):帧的有效载荷部分的长度(不包括帧头本身)。
2、帧的类型
(1)、方法帧(Method Frame)

用于表示特定的操作,如连接、打开通道、声明队列等。

(2)、内容头帧(Content Header Frame)

包含消息的元数据,如消息大小、属性等。

(3)、内容体帧(Content Body Frame)

包含实际的消息内容。

(2)、示例:完整的AMQP报文

假设我们要发送一条JSON消息到exampleQueue的队列中,以下是完整的AMQP报文结构及其二进制表示。

1、声明队列的方法帧

示例:

| Frame Type: 1       |  // 方法帧
| Channel: 1          |
| Payload Length: 17  |
| Class ID: 50        |  // Queue类
| Method ID: 10       |  // Declare方法
| Queue Name Length: 13 |
| Queue Name: "exampleQueue" |
| Durable: true       |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为方法帧的具体内容(队列相关的信息,包含队列的类id,方法id,名称,长度等)。

二进制表示:

01 00 01 00 00 00 11  // 帧头
00 32 00 0A           // Class ID 和 Method ID
00 00 00 0D           // 队列名称长度
65 78 61 6D 70 6C 65 51 75 65 75 65  // 队列名称 "exampleQueue"
01                    // Durable: true
2、发送消息的内容头帧

示例:

| Frame Type: 2       |  // 内容头帧
| Channel: 1          |
| Payload Length: 22  |
| Class ID: 60        |  // Basic 类
| Weight: 0           |
| Body Size: 24 bytes |
| Properties:         |
|   Content-Type: application/json |
|   Delivery-Mode: 2 (persistent) |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为内容头帧的具体内容。

二进制表示:

02 00 01 00 00 00 16  // 帧头
00 3C 00 00           // Class ID 和 Weight
00 00 00 18           // Body Size: 24 bytes
00 00 00 0E 61 70 70 6C 69 63 61 74 69 6F 6E 2F 6A 73 6F 6E  // Content-Type: application/json
02                    // Delivery-Mode: persistent
3、发送消息的内容体帧

示例:

| Frame Type: 3       |  // 内容体帧
| Channel: 1          |
| Payload Length: 24  |
| {"name": "John", "age": 30} |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为内容体帧的具体内容。

二进制表示:
03 00 01 00 00 00 18 // 帧头
7B 22 6E 61 6D 65 22 3A 20 22 4A 6F 68 6E 22 2C 20 22 61 67 65 22 3A 20 33 30 7D // JSON 消息

(3)、AMQP报文总结

在AMQP协议中,每种类型的帧(方法帧、内容头帧、内容体帧)都需要在其前面拼接一个帧头。帧头提供了关于帧的基本信息,使得接收方能够正确解析和处理这些帧。
具体步骤如下:
1、方法帧:用于执行操作(如声明队列)。
2、内容头帧:包含消息的元数据(如消息大小和属性)。
3、内容体帧:包含实际的消息内容。

AMQP本质是一种二进制协议,消息内容是无法通过文本查看的。理解AMQP报文能够让我们更好的了解其本质和原理

8、总结

AMQP是一种强大的消息传递协议,广泛应用于分布式系统中,确保不同组件之间的可靠通信。通过理解其核心概念和工作流程,开发者可以更好地利用AMQP构建高效、可扩展的应用程序。无论是微服务架构、事件驱动系统还是日志收集,AMQP都能提供坚实的通信基础。

乘风破浪!Dare to Be!!!


http://www.kler.cn/a/519101.html

相关文章:

  • Tailwind CSS—骨架屏生成器
  • LGBMRegressor CatBoostRegressor XGBRegressor回归
  • 有限元分析学习——Anasys Workbanch第一阶段_终篇_齿轮整体强度案例分析
  • 蓝桥杯3518 三国游戏 | 排序
  • C++实现有限元计算 矩阵装配Assembly类
  • Python+OpenCV(1)---傅里叶变换
  • bash: ./xxx: No such file or directory
  • Cesium特效——城市白模的科技动效的各种效果
  • http和ws的区别
  • 【设计模式-行为型】调停者模式
  • libOnvif通过组播不能发现相机
  • 51单片机入门_01_单片机(MCU)概述(使用STC89C52芯片)
  • SpringBoot3+Vue3开发学生选课管理系统
  • CSS 中的 id 和 class 选择器
  • ARM嵌入式学习--第九天(串口通信)
  • 二十三种设计模式-享元模式
  • minikube源码学习
  • 【自然语言处理(NLP)】jieba分词的使用(分词模式、关键词提取)
  • 【BQ3568HM开发板】深入解析智能家居中控屏工程的NAPI接口设计
  • 视觉语言模型 (VLMs):跨模态智能的探索