Go实现RabbitMQ 死信队列、优化
-
死信队列是什么?
-
如何配置死信队列?
-
go实现死信队列
-
go使用发布订阅模式改造批量发送消息
【教学内容】
1. 死信队列是什么?
注意:业务队列与死信交换机的绑定是在构建业务队列时,通过参数(x-dead-letter-exchange和x-dead-letter-routing-key)的形式进行指定。
通俗来讲,无法被正常消费的消息,我们可以称之为死信。我们将其放入死信队列,单独处理这部分“异常”消息。
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况之一:
(1)消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
(2)消息在队列的存活时间超过设置的TTL时间。
(3)消息队列的消息数量已经超过最大队列长度。 那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
2.如何配置死信队列?
如何配置死信队列呢?大概可以分为以下步骤:
配置业务队列,绑定到业务交换机上
为业务队列配置死信交换机和路由key
为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
Golang 实现 RabbitMQ 的死信队列 https://segmentfault.com/a/1190000041358981
队列、交换机、routing-key 放到消费者一方来实现。所以消费者者一共需要做这几件事
-
创建连接
-
设置队列(队列、交换机、绑定)
-
设置死信队列(队列、交换机、绑定)
-
发布消息
设置队列(队列、交换机、绑定)
核心操作就是设置队列阶段。
声明普通队列,并指定死信交换机、指定死信routing-key。后续死信队列创建后会与死信交换机、指定死信routing-key进行绑定。
//创建一个queue,指定消息过期时间,并且绑定过期以后发送到那个交换机
queueA, err := channel.QueueDeclare(
queueAName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
amqp.Table{
// 当消息过期时把消息发送到 exchangeB
"x-dead-letter-exchange": exchangeB,
"x-message-ttl": ttl,
//"x-dead-letter-queue" : queueBName,
//"x-dead-letter-routing-key" :
},
)
if err != nil {
fmt.Println(err)
return
}
声明交换机
err = channel.ExchangeDeclare(
exchangeA, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println(err)
return
}
目前,普通队列和交换机都已经创建,但它们都是独立存在,没有关联。
通过 QueueBind 将队列、routing-key、交换机三者绑定到一起。
err = channel.QueueBind(
queueA.Name, // queue name
"", // routing key
exchangeA, // exchange
false,
nil,
)
if err != nil {
fmt.Println(err)
return
}
设置死信队列(队列、交换机、绑定)
同样死信队列,也需要创建队列、创建交换机和绑定。
// 声明死信队列
// args 为 nil。切记不要给死信队列设置消息过期时间,否则失效的消息进入死信队列后会再次过期。
queueB, err := channel.QueueDeclare(
queueBName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println(err)
return
}
// 声明交换机
err = channel.ExchangeDeclare(
exchangeB, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println(err)
return
}
// 队列绑定(将队列、routing-key、交换机三者绑定到一起)
err = channel.QueueBind(
queueB.Name, // queue name
"", // routing key
exchangeB, // exchange
false,
nil,
)
if err != nil {
fmt.Println(err)
return
}
当死信队列建立完毕,普通队列通过 x-dead-letter-exchange
和 x-dead-letter-routing-key
参数的指定,便可生效,死信队列便与普通队列连通。
3. go实现死信队列
实现路由访问
编写生产者
方便测试,开启两个生产者
一个生产者,执行死信队列生产消息
另一个生产者,创建绑定的队列(死信队列中绑定的队列)生产消息
实现消费者
创建mq/dlx/main.go,实现消费
效果:
执行两个生产者
查看RabbitMQ客户端
点击交换机查看详细情况
点击queues,查看队列情况
启动消费者:
查看消费情况
死信队列转发给了b队列
4. go使用发布订阅模式改造批量发送消息
修改批量发送消息逻辑
编写入队列逻辑
执行批量发送接口
查看RabbitMQ客户端,检查交换机状态
启动消费者
注:当执行消费者时,因为是main.go执行,所以不会共享连接数据库,导致数据库连接default连接不上
重新连接数据库配置
系统默认参数
beego 中带有很多可配置的参数,我们来一一认识一下它们,这样有利于我们在接下来的 beego 开发中可以充分的发挥他们的作用(你可以通过在 conf/app.conf
中设置对应的值,不区分大小写):
基础配置
- BConfig
保存了所有 beego 里面的系统默认参数,你可以通过web.BConfig
来访问和修改底下的所有配置信息.
配置文件路径,默认是应用程序对应的目录下的
conf/app.conf
,用户可以在程序代码中加载自己的配置文件beego.LoadAppConfig("ini", "conf/app2.conf")
也可以加载多个文件,只要你调用多次就可以了,如果后面的文件和前面的 key 冲突,那么以最新加载的为最新值
消费者实现业务逻辑(消息入库)
消费者执行成功
navicat查看结果