GoLong的学习之路,进阶,RabbitMQ (消息队列)
快有一周没有写博客了。前面几天正在做项目。正好,项目中需要MQ(消息队列),这里我就补充一下我对mq的理解。其实在学习java中的时候,自己也仿照RabbitMQ自己实现了一个单机的mq,但是mq其中一个特点也就是,分布式我在项目中没有涉及。这里我用go语言将RabbitMQ的操作进行一次整理
文章目录
- MQ概念
- 操作RabbitMQ
- 安装
- 连接
- 生产者
- 消费者
- 例子
- 生成者
- 消费者
- 注意常见的问题:
- 匹配规则
MQ概念
MQ是消息队列(Message Queue)的缩写,是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。
市面上有许多成熟的消息队列非常多,例如:
RabbitMQ
(⾮常知名, 功能强⼤, ⼴泛使⽤的消息队列)(今天要说明的)Kafka
(高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用,大量用于大数据领域)RocketMQ
(具有高性能、高可靠、高实时、分布式特点)ActiveMQ
(高可用、高性能、可伸缩的企业级面向消息服务的系统)
而RabbitMQ为什么叫rabbit,其实这就不得不说一个时代,在16世纪左右有那么个事件(🐏吃人事件)简单而言就是🐏比人贵。而为了让羊有更多的食物。就需要和人以及动物(rabbit)抢地盘,但是兔子一生就是一窝,兔子行动非常迅速而且繁殖起来也非常疯狂,所以就把Rabbit用作这个分布式软件的命名(就是真么简单)。
它能做什么?
- 可靠性:RabbitMQ 提供了各种功能,让你权衡性能与可靠性,其中包括持久性,交付确认和高可用性。
- 灵活的路由:消息在到达队列之前,通过交换机的路由。RabbitMQ 为典型的路由逻辑提供了几个内置的交换机类型。对于更复杂的路由,则可以绑定几种交换机一起使用甚至可以自己实现交换机类型,并且把它作为一个插件的来使用。
- 集群:在本地网络上的几个 RabbitMQ 服务器可以聚集在一起,作为一个独立的逻辑代理来使用。
- 联合:对于服务器来说,它比集群需要更多的松散和非可靠链接。为此 RabbitMQ 提供了联合模型。
- 高度可用队列:在群集中,队列可以被镜像到几个机器中,确保您的消息即使在出现硬件故障的安全。
- 多协议:RabbitMQ 支持上各种消息传递协议的消息传送.
- 许多客户端:有你能想到的几乎任何语言 RabbitMQ 客户端。
- 管理用户界面:RabbitMQ 附带一个简单使用管理用户界面,允许您监视和控制您的消息代理的各个方面。
- 追踪:如果您的消息系统行为异常,RabbitMQ 提供跟踪支持,让你找出问题是什么。
- 插件系统:RabbitMQ 附带各种插件扩展,并且你也可以写你自己插件.
- 商业支持:提供商业支持、 培训和咨询。
- 大型社区:有一个庞大的社区 RabbitMQ,有各种各样的客户端、 插件、 指南等。
其基本能力:
跨进程通信
指两个或多个进程之间进行数据交换的过程。 跨进程通信的方式有很多,比如管道、消息队列、共享内存、信号量等异步处理
指在执行某个操作时,不需要等待其完成,可以继续执行其他操作的一种处理方式削峰填谷
指在同一刻传入的数据过大使得服务无法处理,需要等待一些时间,将峰值通过时间一批一批的处理
这个就是基础的架构
- N–M :这个代表的是多对多的关系,这个关系比较重要(意味着必须要用多线程处理);在图中 我表示的是多个交换机可以通过绑定去去控制多个队列
- N–1 :这个代表多对1的关系。这个也比较重要(也意味着我们需要多线程处理);在图中表示,多个客户端去访问一个服务器。
可以看出MQ其实主要由两部分构成,一部分客户端,一部分由服务器构成。而客户端的又分为生产客户端(提交消息)和消费客户端(消费消息)。服务器的构建就有些复杂,需要构图:
生产者(Producer)
:其实就是将消息上传到服务器消费者(Consumer)
:其实就是从服务器中拿出消息发布(Publish)
:就是生产者将消息上传给服务器的过程订阅(Subscribe)
:就是消费者将消息从服务器拿出的过程中间服务器(BrokerServer)
:核心服务器,负责给生产者和消费者提供发布和订阅以及构建交换机和绑定以及队列的API虚拟机(VirtualHost)
: 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可以存在多个 VirtualHost交换机(Exchange)
:⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue队列(Queue)
:真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息绑定(Bind)
: Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关系. 使⽤⼀个关联表就可以把这两个概念联系起来.(不理解的可以看上面的图)消息(Message)
:生产者和消费者之间传递的内容内存存储(Memory)
:方便使用,快速调用硬盘存储(Disk)
:重启数据不丢失
操作RabbitMQ
安装
说起安装我就不得不说一下,官方文件的安装简直令人炸毛。个人建议,诺是自己有云,就去看云官方文件如何实现的。如果安装在本地那就没有那么麻烦了,看官网就行。我这边使用的是阿里云,所以这个里放一个阿里云的文档
连接
首选就是在项目中拿到相应的操作远程的mq
go get github.com/streadway/amqp
补充:
- 端口号:15672是管理rabbit的接口
- 端口号:5672是我们,调用,并使用的端口
(如果发现端口调用不了,就必须要在防火墙中将这个端口号打开)(强调一下我这里用的阿里云)
生产者
创建一个生产者的步骤:
- 连接 Connection
- 创建 Channel
- 创建或连接一个交换器
- 创建或连接一个队列
- 交换器绑定队列(不绑定的话,队列在运行完毕后会消失)
- 投递消息
- 关闭 Channel
- 关闭 Connection
创建连接
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
创建通道
// channel
channel, err := connection.Channel()
创建交换机
err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil)
参数依次说明:
name
交换机名称kind
交换机类型durable
持久化标识autoDelete
是否自动删除internal
是否是内置交换机noWait
是否等待服务器确认args
其它配置
参数说明要点:
- autoDelete :
自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。
- internal :
内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。
- noWait
当 noWait
为 true
时,声明时无需等待服务器的确认。
该通道可能由于错误而关闭。 添加一个 NotifyClose
侦听器应对任何异常。创建交换器还有一个差不多的方法( ExchangeDeclarePassive )
,他主要是假定交换已存在,并尝试连接到不存在的交换将导致 RabbitMQ
引发异常,可用于检测交换器的存在。
创建队列
q, err := channel.QueueDeclare("q1", true, false, false, true, nil)
参数说明:
name
队列名称durable
持久化autoDelete
自动删除exclusive
排他noWait
是否等待服务器确认args Table
参数说明要点:
- exclusive 排他
排他队列只对首次创建它的连接可见,排他队列是基于连接( Connection
)可见的,并且该连接内的所有信道( Channel
)都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。
同一连接中不允许建立同名的排他队列的这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。
非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。
- autoDelete 设置是否自动删除。
为 true
则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
创建队列还有一个差不多的方法( QueueDeclarePassive
),他主要是假定队列已存在,并尝试连接到不存在的队列将导致 RabbitMQ
引发异常,可用于检测队列的存在。
绑定交换器和队列
err = channel.QueueBind("q1", "q1Key", "e1", true, nil)
参数解析:
name
队列名称key BindingKey
根据交换机类型来设定exchange
交换机名称noWait
是否等待服务器确认args Table
绑定交换器(可选)
err = channel.ExchangeBind("dest", "q1Key", "src", false, nil)
参数解析:
destination
目的交换器key RoutingKey
路由键source
源交换器noWait
是否等待服务器确认args Table
其它参数
生产者发送消息至交换器 source
中,交换器 source
根据路由键找到与其匹配的另一个交换器 destination
,井把消息转发到 destination
中,进而存储在 destination
绑定的队列 queue
中,某种程度上来说 destination
交换器可以看作一个队列。
投递消息
err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte("Hello Golang and AMQP(Rabbitmq)!"),
})
- exchange 交换器名称
- key RouterKey
- mandatory 是否为无法路由的消息进行返回处理
- immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
- msg 消息体
mandatory
消息发布的时候设置消息的 mandatory
属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为 true
表示将消息返回到生产者,否则直接丢弃消息。
immediate
参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 imrnediate
参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 3.0版本开始去掉了对 imrnediate
参数的支持。
其中 amqp.Publishing
的 DeliveryMode
如果设为 amqp.Persistent 则消息会持久化。需要注意的是如果需要消息持久化 Queue
也是需要设定为持久化才有效。
消费者
Rabbitmq 消费方式共有 2 种,分别是推模式
和拉模式
推模式
deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
推模式是通过持续
订阅的方式来消费信息, Consume
将信道( Channel
)设置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ
会不断地推送消息给消费者。推送消息的个数还是会受到 channel.Qos
的限制。
参数说明:
queue
队列名称consumer
消息者名称autoAck
是否确认消费exclusive
排他noLocal
noWait bool
args Table
noLocal
设置为 true
则表示不能将同一个 Connection
中生产者发送的消息传送给这个Connection
中的消费者
autoAck
可以设置为 true
或者false
。
- 如果设为
true
则消费者一接收到就从queue
中去除了,如果消费者处理消息中发生意外该消息就丢失了。 - 如果设为
false
则消费者在处理完消息后,调用msg.Ack(false)
后消息才从queue
中去除。即便当前消费者处理该消息发生意外,只要没有执行msg.Ack(false)
那该消息就仍然在queue
中,不会丢失。
如果autoAck
设置为 false
则表示需要手动进行 ack
消费
v, ok := <-deliveries
if ok {
// 手动ack确认
// 注意: 这里只要调用了ack就是手动确认模式,
// v.Ack的参数 multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
// 并不是表示设置为false就不进行当前ack确认
if err := v.Ack(true); err != nil {
fmt.Println(err.Error())
}
} else {
fmt.Println("Channel close")
}
拉模式
相对来说比较简单,是由消费者主动拉取信息来消费,每次只消费一条消息,同样也需要进行 ack
确认消费。
channel.Get(queue string, autoAck bool)
例子
生成者
这个文件起名叫:new_task.go
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func main() {
// 1. 尝试连接RabbitMQ,建立连接
// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
return
}
defer conn.Close()
// 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
ch, err := conn.Channel()
if err != nil {
fmt.Printf("open a channel failed, err:%v\n", err)
return
}
defer ch.Close()
// 3. 要发送,我们必须声明要发送到的队列。
q, err := ch.QueueDeclare(
"task_queue", // name
true, // 持久的
false, // delete when unused
false, // 独有的
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Printf("declare a queue failed, err:%v\n", err)
return
}
// 4. 然后我们可以将消息发布到声明的队列
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // 立即
false, // 强制
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
fmt.Printf("publish a message failed, err:%v\n", err)
return
}
log.Printf(" [x] Sent %s", body)
}
// bodyFrom 从命令行获取将要发送的消息内容
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
消费者
这个文件起名叫:work.go
package main
import (
"bytes"
"fmt"
"log"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Printf("open a channel failed, err:%v\n", err)
return
}
defer ch.Close()
// 声明一个queue
q, err := ch.QueueDeclare(
"task_queue", // name
true, // 声明为持久队列
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
fmt.Printf("ch.Qos() failed, err:%v\n", err)
return
}
// 立即返回一个Delivery的通道
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // 注意这里传false,关闭自动消息确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
fmt.Printf("ch.Consume failed, err:%v\n", err)
return
}
// 开启循环不断地消费消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) // 手动传递消息确认
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
自己要去终端,DOS中去执行
go run new_task.go
go run work.go
注意常见的问题:
忘记确认
忘记确认
是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。
为了调试这种错误,可以使用rabbitmqctl
打印messages_unacknowledged
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows平台,去掉sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
持久化
将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。
如果需要更强有力的担保,那么可以使用publisher confirms。
ch.Qos
在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker
将持续忙碌,而另一个worker
几乎不做任何工作,RabbitMQ
对此一无所知,仍然会均匀
地发送消息。
因为RabbitMQ
只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
为了避免这种情况,我们可以将预取计数设置为1
。这告诉RabbitMQ
不要一次向一个worker发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker
。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
关于队列大小的说明
如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。
使用消息确认和预取计数,可以设置工作队列(work queue)。即使RabbitMQ重新启动,持久性选项也可以让任务继续存在。
临时队列
当我们连接到Rabbit
时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者更好的方法是让服务器为我们选择一个随机队列名称。
其次,一旦我们断开消费者的连接,队列就会自动删除。
在amqp客户端中,当我们传递一个空字符串作为队列名称时,我们将使用随机生成的名称创建一个非持久队列
当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。
匹配规则
要说匹配规则,就不得不说一个东西,交换机类型
ExchangeType 是个表示的是交换机的类型,为什么要分类型呢?其实很简单,需求决定业务,业务决定技术。
所以这里的需求情况分为3种,
- 一个生产者生产的消息由一个消费者去消费,此时我们叫他(
DIRECT
) - 一个生产者生产的消息由有多个消费者去消费,此时我们叫他(
FANOUT
) - 一个生产者生产的消息有锁,必须由带有锁的消费者去消费,此时我们叫他(
TOPIC
)- 队列绑定在交换机的时候,会指定一个
bindingKey
, - 消息哪里指定一个
outingKey
- 当
bindingKey
与outingKey
满足条件就投递到相应的队列中
- 队列绑定在交换机的时候,会指定一个
所以这里的匹配规则说到底其实就是主题交换机的规则。
发送到topic交换器的消息不能具有随意的routing_key
——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的routing_key
示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。routing_key
中可以包含任意多个单词,最多255个字节
如果我们打破约定并发送一个或四个单词的消息,例如“orange”或“quick.orange.male.rabbit”,这些消息将不匹配任何绑定,并且将会丢失
绑定键
也必须采用相同的形式
,topic
交换器背后的逻辑类似于direct交换器——用特定路由键发送的消息将传递到所有匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:
*
(星号)可以代替一个单词。#
(井号)可以替代零个或多个单词。
topic交换器
topic交换器功能强大,可以像其他交换器一样运行。
当队列用“#”(井号)绑定键绑定时,它将接收所有消息,而与路由键无关,就像在fanout交换器中一样。
当在绑定中不使用特殊字符“*”(星号)和“#”(井号)时,topic交换器的行为就像direct交换器一样。
例子
生产者:emit_log_topic.go
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_topic", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
消费者:receive_logs_topic.go
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
// 绑定topic
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
想要接收所有的日志:
go run receive_logs_topic.go "#"
只想接收“critical”日志:
go run receive_logs_topic.go "*.critical"
如何使用rabbit到这里就结束了。