当前位置: 首页 > article >正文

RabbitMQ使用随笔

1.RabbitMQ介绍

        RabbitMQ 是一个开源的消息代理软件,采用了先进的消息队列协议 AMQP(Advanced Message Queuing Protocol)。它主要用于处理分布式系统中各个组件间的消息传递和通信。RabbitMQ 支持多种协议,但 AMQP 是其最常用和核心的协议。

主要特性:

  1. 高可用性:RabbitMQ 支持集群部署,能保证消息服务的高可用性,甚至在某些节点发生故障时,系统也能继续提供服务。
  2. 消息持久化:它支持将消息持久化到磁盘,以确保在系统崩溃时不会丢失消息。
  3. 可靠传递:RabbitMQ 确保消息可以可靠地传送到消费者,使用确认机制避免消息丢失。
  4. 消息路由:通过交换机(Exchange)来路由消息,可以根据不同的规则将消息传送到不同的队列。
  5. 多协议支持:除了 AMQP,RabbitMQ 还支持其他协议,如 MQTT、STOMP 和 HTTP 等。
  6. 灵活的消息队列模型:支持多种队列类型,如直接队列(Direct)、扇形队列(Fanout)、主题队列(Topic)和头部队列(Header)。

架构特点:

  • 生产者(Producer):负责发送消息。
  • 消费者(Consumer):负责接收和处理消息。
  • 队列(Queue):存储消息的地方,消费者从队列中获取消息。
  • 交换机(Exchange):决定如何将消息路由到队列。
  • 绑定(Binding):交换机与队列之间的连接。

2.应用场景:

  1. 异步任务处理:在分布式系统中,通过 RabbitMQ 将任务发送到不同的处理单元进行处理,从而解耦各个服务。
  2. 消息广播:通过 RabbitMQ 的扇形交换机,将消息广播给多个消费者。
  3. 流量削峰:通过将消息放入队列并按顺序处理,可以有效削减瞬时流量带来的压力。
  4. 事件驱动架构:使用 RabbitMQ 实现不同模块间的事件通知和传递。

3.消息队列模型

        3.1 简单模式

               普通队列模型,生产者消费者直接绑定到队列上(默认交换机),一个生产,一个消费。

        队列模型图:

        普通一对一示列代码:

        producter:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	if err != nil {
		log.Fatalf("failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 定义一个队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 是否持久化
		false,        // 是否自动删除
		false,        // 是否排他性
		false,        // 是否等待
		nil,          // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 发送消息
	i := 0
	for {
		i += 1
		body := fmt.Sprintf("Welcome mq world ! #%d", i)
		err = ch.Publish(
			"",     // 默认交换机
			q.Name, // 队列名称
			true,   // 是否持久化
			false,  // 是否立即发送
			amqp.Publishing{
				ContentType:  "text/plain",
				Body:         []byte(body),
				DeliveryMode: amqp.Persistent, // 消息持久化
			})
		if err != nil {
			log.Fatalf("failed to publish a message: %v", err)
		}
		fmt.Printf("Sent: %s\n", body)
		time.Sleep(1 * time.Second) // 每秒发送一个消息
	}

	// 关闭连接
	fmt.Println("All messages sent.")
}

        comsumer:

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	if err != nil {
		log.Fatalf("failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 定义一个队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 是否持久化
		false,        // 是否自动删除
		false,        // 是否排他性
		false,        // 是否等待
		nil,          // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 创建一个消费者
	msgs, err := ch.Consume(
		q.Name,     // 队列名称
		"EricTest", // 消费者名称
		//true,       // 是否自动确认,只要消费这节收到,就标记其可以删除(适用于消费者能高效的处理消息场景)
		false, // 采用手动确认,自定义消息应答模式,做到收到消息且消息真正处理完了,再告知消息可以删除了。(还支持批量应答)
		false, // 是否排他性
		false, // 是否阻塞
		false, // 是否等待
		nil,   // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to register a consumer: %v", err)
	}

	// 设置消息处理策略
	err = ch.Qos(1, 0, false)
	if err != nil {
		log.Fatalf("failed to set QoS: %v", err)
	}

	// 使用 goroutine 处理消息
	go func() {
		for msg := range msgs {
			fmt.Printf("Received a message: %s\n", msg.Body)
			// 处理消息
			// 手动应答
			// 手动确认,设置为false,表示不重新排队,如果处理失败情况,可以通过msg.Nack来拒绝并重新入队
			/*
				msg.Nack
				消息处理失败,重新入队
				if err := msg.Nack(false,true);err != nil {
				}
			*/
			if err := msg.Ack(false); err != nil {
				log.Fatalf("failed to acknowledge message: %v", err)
			} else {
				fmt.Printf("Acknowledged message: %s\n", msg.Body)
			}
		}
	}()

	// 阻塞主线程,保持服务端一直运行
	fmt.Println("Waiting for messages. To exit press CTRL+C")
	select {}
}

还可以通过一个P和多个C,多个消费者从同一个队列中获取消息进行消费,默认情况是平均分配的消费,不会因为那个消费块就让其多消费,模拟两个消费 ,观察现象(C1消费快,C2消费慢),如上代码,启动一个P和两个C模拟观察情况,模拟输出如下:

同一份消费代码:

 C1比C2慢速率慢2秒:(上述代码消费端加个延时即可)

        3.2 队列绑定交换机模式(以及几种交换机特点)

       图解模型:

该模型的特点是,生产者的消息不再直接发往队列,而是发送到交换机,再由交换机根据规则将消息传送到队列汇总。这里解释下交换机概念?

        队列交换机:

        定义的一个消息组建,其目的是负责接收生产者发送的消息,并根据一定的规则将消息路由到一个或者多个队列中。模型如上图所示,他主要是根据路由键和绑定规则来确定消息的去向。在rabbitmq中,提供了Direct(直接交换机),Fanout(广播交换机),Topic(主题交换机),Headers(头交换机)四种类型,每种类型机制不一,具体如下:

        direct:

                a.消息会被路由到与指定路由键完全匹配的队列。

                b.如果交换机的路由键与队列的绑定路由键匹配,消息将被发送到该队列

       定向direct,一个交换机,两个队列,不同的路由key匹配,模拟输出,两个队列匹配不同的路由key,交换机每次根据具体的路由key将消息推送到不同的队列中

        fanout:

                a.消息会被路由到所有与该交换机绑定的队列,忽略虑路由键

                b.这种类型的交换机类似广播,所有绑定的队列都会接收到消息

        广播fanout,通过交换节的消息向所有绑定的队列都推送一份消息,模拟输出:

        topic:

                a.路由键是一个由多个部分组成的字符串.(person.rabbit.white)

                b.交换机会根据绑定时使用的模式(通配符 * 和 #)来选择匹配的队列。

                c.* 用来匹配单个词,# 用来匹配零个或多个词        

        Topic通过模式匹配转发消息且消费:

        headers:         

                a.消息根据其头部信息(而非路由键)来进行路由

                b.交换机会根据队列与交换机之间的绑定条件(头部的键值对)来匹配消息

        headers模式模拟代码:

        代码部分片段:p&c

        producter:

// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		"headers_exchange", // 交换机名称
		"headers",          // 交换机类型,主题方式
		false,              // 是否持久化
		false,              // 是否自动删除
		false,              // 是否内部使用
		false,              // 是否等待确认
		nil,                // 额外参数
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 发布消息到交换机,指定路由键
	//routerKey := "info_eric"
	// 发送消息
	i := 0
	for {
		i += 1
		body := fmt.Sprintf("Welcome mq world ! #%d", i)
		err = ch.Publish(
			"headers_exchange", //交换机名称
			//q.Name,          // 队列名称
			"",    // 路由键 headers模式下不使用
			false, // 是否持久化
			false, // 是否立即发送
			amqp.Publishing{
				ContentType:  "text/plain",
				Body:         []byte(body),
				DeliveryMode: amqp.Transient, // 消息持久化
				Headers: amqp.Table{
					"X-Message-Type": "" +
						"greeting", // 设置消息头
				},
			})
		if err != nil {
			log.Fatalf("failed to publish a message: %v", err)
		}
		fmt.Printf("Sent: %s\n", body)
		time.Sleep(1 * time.Second) // 每秒发送一个消息
	}

  comsumer:

        

// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 声明一个队列
	q, err := ch.QueueDeclare(
		"headers_queue", // 队列名称
		false,           // 是否持久化
		false,           // 是否自动删除
		false,           // 是否排他性
		false,           // 是否等待
		nil,             // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		"headers_exchange", // 交换机名称
		"headers",          // 交换机类型,广播
		false,              // 是否持久化
		false,              // 是否自动删除
		false,              // 是否内部使用
		false,              // 是否等待确认
		nil,                // 额外参数
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name,             // 队列名称
		"",                 // 路由键,头部消息路由,该字段不使用
		"headers_exchange", // 交换机名称
		false,              // 是否等待确认
		nil,                // 额外参数
	)

	// 创建一个消费者
	msgs, err := ch.Consume(
		q.Name,          // 队列名称
		"comsumer_eric", // 消费者名称
		//true,       // 是否自动确认,只要消费这节收到,就标记其可以删除(适用于消费者能高效的处理消息场景)
		false, // 采用手动确认,自定义消息应答模式,做到收到消息且消息真正处理完了,再告知消息可以删除了。(还支持批量应答)
		false, // 是否排他性
		false, // 是否阻塞
		false, // 是否等待
		nil,   // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to register a consumer: %v", err)
	}

	// 设置消息处理策略
	err = ch.Qos(1, 0, false)
	if err != nil {
		log.Fatalf("failed to set QoS: %v", err)
	}

	// 使用 goroutine 处理消息
	go func() {
		for msg := range msgs {
			fmt.Printf("Received a message: %s %s\n", msg.Body, msg.Headers)
			// 处理消息
			// 手动应答
			// 手动确认,设置为false,表示不重新排队,如果处理失败情况,可以通过msg.Nack来拒绝并重新入队
			/*
				msg.Nack
				消息处理失败,重新入队
				if err := msg.Nack(false,true);err != nil {
				}
			*/
			if err := msg.Ack(false); err != nil {
				log.Fatalf("failed to acknowledge message: %v", err)
			} else {
				fmt.Printf("Acknowledged message: %s\n", msg.Body)
			}
		}
	}()

模拟输出:

    综上,交换机的功能将消息路由到队列,且自身不存储消息。另外,不同的类型的交换根据不同的路由策略将消息分发到合适的队列中去。

        以上几种情况示例代码如下可以灵活修改模拟实现上述规则,另外还列举出了定向direct方式下只想某种匹配的路由key下发消息情况,请见如下示例:

       producter:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	if err != nil {
		log.Fatalf("failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 定义一个交换机
	err = ch.ExchangeDeclare(
		"exchange_eric", // 交换机名称
		"direct",        // 交换机类型
		true,            // 是否持久化
		false,           // 是否自动删除
		false,           // 是否内部使用
		false,           // 是否等待确认
		nil,             // 额外参数
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 发布消息到交换机,指定路由键
	routerKey := "info_eric"

	// 发送消息
	i := 0
	for {
		i += 1
		body := fmt.Sprintf("Welcome mq world ! #%d", i)
		err = ch.Publish(
			"exchange_eric", //交换机名称
			routerKey, // 指定路由键
			true,      // 是否持久化
			false,     // 是否立即发送
			amqp.Publishing{
				ContentType:  "text/plain",
				Body:         []byte(body),
				DeliveryMode: amqp.Persistent, // 消息持久化
			})
		if err != nil {
			log.Fatalf("failed to publish a message: %v", err)
		}
		fmt.Printf("Sent: %s\n", body)
		time.Sleep(1 * time.Second) // 每秒发送一个消息
	}

	// 关闭连接
	fmt.Println("All messages sent.")
}

       comsumer: (当前消费者绑定与生产者匹配的路由键info_eric)

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	if err != nil {
		log.Fatalf("failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 定义一个队列
	q, err := ch.QueueDeclare(
		"task_queue_zero", // 队列名称
		true,         // 是否持久化
		false,        // 是否自动删除
		false,        // 是否排他性
		false,        // 是否等待
		nil,          // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 定义一个交换机
	err = ch.ExchangeDeclare(
		"exchange_eric", // 交换机名称
		"direct",        // 交换机类型
		true,            // 是否持久化
		false,           // 是否自动删除
		false,           // 是否内部使用
		false,           // 是否等待确认
		nil,             // 额外参数
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name,          // 队列名称
		"info_eric",     // 路由键,绑定是制定具体路由键,生产者发送消息时,指定路由键,将消息按照对应路由键下发到队列当中
		"exchange_eric", // 交换机名称
		false,           // 是否等待确认
		nil,             // 额外参数
	)

	// 创建一个消费者
	msgs, err := ch.Consume(
		q.Name,          // 队列名称
		"comsumer_eric", // 消费者名称
		//true,       // 是否自动确认,只要消费这节收到,就标记其可以删除(适用于消费者能高效的处理消息场景)
		false, // 采用手动确认,自定义消息应答模式,做到收到消息且消息真正处理完了,再告知消息可以删除了。(还支持批量应答)
		false, // 是否排他性
		false, // 是否阻塞
		false, // 是否等待
		nil,   // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to register a consumer: %v", err)
	}

	// 设置消息处理策略
	err = ch.Qos(1, 0, false)
	if err != nil {
		log.Fatalf("failed to set QoS: %v", err)
	}

	// 使用 goroutine 处理消息
	go func() {
		for msg := range msgs {
			fmt.Printf("Received a message: %s\n", msg.Body)
			// 处理消息
			// 手动应答
			// 手动确认,设置为false,表示不重新排队,如果处理失败情况,可以通过msg.Nack来拒绝并重新入队
			/*
				msg.Nack
				消息处理失败,重新入队
				if err := msg.Nack(false,true);err != nil {
				}
			*/
			if err := msg.Ack(false); err != nil {
				log.Fatalf("failed to acknowledge message: %v", err)
			} else {
				fmt.Printf("Acknowledged message: %s\n", msg.Body)
			}
		}
	}()

	// 阻塞主线程,保持服务端一直运行
	fmt.Println("Waiting for messages. To exit press CTRL+C")
	select {}
}

另外队列设定其他路由键,虽然是同一个交换机,但是交换机派发消息按照路由键匹配规则下发:

	// 定义一个队列
	q, err := ch.QueueDeclare(
		"task_queue_one", // 队列名称
		true,         // 是否持久化
		false,        // 是否自动删除
		false,        // 是否排他性
		false,        // 是否等待
		nil,          // 额外属性
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}
    // 定义一个交换机
	err = ch.ExchangeDeclare(
		"exchange_eric", // 交换机名称
		"direct",        // 交换机类型
		true,            // 是否持久化
		false,           // 是否自动删除
		false,           // 是否内部使用
		false,           // 是否等待确认
		nil,             // 额外参数
	)
	if err != nil {
		log.Fatalf("failed to declare a queue: %v", err)
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name,          // 队列名称
		"info_jean",     // 路由键
		"exchange_eric", // 交换机名称
		false,           // 是否等待确认
		nil,             // 额外参数
	)

 上述代码中路由键跟consumer不一致,所以当前的队列task_queue_one不会接收到producter推送的消息,模拟输出如下:

上图所示,路由键匹配的队列能接受到消息,不匹配的因无消息消费阻塞等待。 

producter代码优化,设置两种路由key随机的分发消息到交换机,交换机根绝每个队列匹配的路由key将消息分发到不同的队列中,实际业务中可利用不同的路由key将消息转发。

修改后的producter代码,注意routerKeys的相关部分:

// 发布消息到交换机,指定路由键
	routerKeys := []string{"info_jean", "info_eric"}

	// 发送消息
	i := 0
	for {
		i += 1
		body := fmt.Sprintf("Welcome mq world ! #%d", i)

		routerKeyIndex := rand.Intn(2)
		err = ch.Publish(
			"exchange_eric", //交换机名称
			routerKeys[routerKeyIndex], // 指定路由键
			true,      // 是否持久化
			false,     // 是否立即发送
			amqp.Publishing{
				ContentType:  "text/plain",
				Body:         []byte(body),
				DeliveryMode: amqp.Persistent, // 消息持久化
			})
		if err != nil {
			log.Fatalf("failed to publish a message: %v", err)
		}
		fmt.Printf("Sent: %s\n", body)
		time.Sleep(1 * time.Second) // 每秒发送一个消息
	}

模拟实现输出:

4.常见问题模拟与解决方式

        4.1:消息积压与优化方案

                消息积压,通常造成原因有如下几种情景:

                1.生产者比消费者量大,消费端进度慢缘故造成。

                2.因为消费端出现故障,导致消息无法被消费,生产者持续生成。

                3.消费端和队列之间网络延迟造成消息不能及时被消费掉。

                4.消费端采用手动应答,联合一些业务层面的处理逻辑从而导致消息没及时被消费应答清理。

                优化方案:

                1.加快消费端消费效率。采用多个消费端来处理消息或者并发处理。

                2.合理配置队列跟交换机,分布式队列,多个队列负载分散,通过交换机将消息路由到不同的队列,从而避免某个队列消息积压。另外根据业务合理设置队列合理绑定规则,避免队列之间消息传递。

                3.设置合理的消息优先级和过期时间(TTL)。

                4.限流方式,用QOS限制消费者消费速率,防止消费者一开始接收过多的消息而积压。

                5.反向限制生产者速率,结合实际情况合理设置。

                6.并发场景,采用批量确认处理,较少网络延时和消息确认带来的性能消耗。                


http://www.kler.cn/a/512863.html

相关文章:

  • 目标跟踪算法发展简史
  • TMC2208替代A4988
  • iOS中的设计模式(三)- 工厂方法
  • 51c自动驾驶~合集48
  • 网络安全:信息时代的守护者
  • JSON-stringify和parse
  • C语言勘破之路-最终篇 —— 预处理(上)
  • 高质量编程 性能优化学习笔记
  • redis-redission的加锁源码与看门狗机制
  • 【人工智能数学基础—微积分】深入详解梯度与梯度下降:掌握梯度下降法及其变种及模型参数的优化
  • 14天学习微服务-->第1天:微服务架构入门
  • Java锁 死锁及排查 JVM 工具 jconsole 工具 排查死锁
  • R语言的编程范式
  • cuda从零开始手搓PB神经网络
  • QT:多窗口设计(主窗口点击按钮打开子窗口)
  • 开源的Text-to-SQL工具WrenAI
  • SQL Server2022版详细安装教程(Windows)
  • 有线通信方式(I2C、UART、RS485)
  • 【Red Hat8】:搭建FTP服务器
  • springboot接入deepseek深度求索 java
  • vue3使用音频audio标签
  • 可视化平台建设技术方案,商业BI系统解决方案,大屏建设功能需求分析(word原件)
  • Datawhale组队学习笔记task2——leetcode面试题
  • 前〈和平精英〉技术策划进军AI游戏领域,获新投资
  • 【数据结构】搜索二叉树
  • 【有啥问啥】什么是端到端(End-to-End)?