RabbitMQ-死信队列(golang)
1、概念
死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。
2、死信产生的原因
文心一言的回答如下:
- 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
- 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
- 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
- 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。
总结来说,主要原因就三个,消息被拒绝、消息过期、队列满
在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。
3、死信队列使用实践
3.1 消息过期
设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。
关键点:设置正常队列属性,ttl5s过期:
// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
args := amqp.Table{
"x-message-ttl": int64(5000), // 5秒TTL
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": dlx.Name,
}
// 声明正常队列
q, err := ch.QueueDeclare(
"normal_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
全部代码如下:
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ")
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Println("Failed to open a channel")
return
}
// 声明死信队列
dlx, err := ch.QueueDeclare(
"dead_letter_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
return
}
err = ch.ExchangeDeclare(
"my_exchange", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
return
}
// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
args := amqp.Table{
"x-message-ttl": int64(5000), // 5秒TTL
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": dlx.Name,
}
// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息
q, err := ch.QueueDeclare(
"normal_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
if err != nil {
fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
return
}
// 将正常队列绑定到交换机,并设置死信交换机和路由键
err = ch.QueueBind(
q.Name, // queue name
q.Name, // routing key
"my_exchange", // exchange
false,
nil,
)
if err != nil {
fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
return
}
err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
if err != nil {
fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
return
}
}
队列信息包括绑定的死信队列信息、ttl等信息如下:
运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。
3.2 队列满
当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。
设置队列长度属性代码如下:
args := amqp.Table{
// "x-message-ttl": int64(5000), // 5秒TTL
"x-max-length": 2,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": dlx.Name,
}
// 声明正常队列
q, err := ch.QueueDeclare(
"normal_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
队列属性如下:
发送两条信息:
继续发送第三个:
测试代码:
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ")
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Println("Failed to open a channel")
return
}
// 声明死信队列
dlx, err := ch.QueueDeclare(
"dead_letter_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
return
}
err = ch.ExchangeDeclare(
"my_exchange", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
return
}
// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
args := amqp.Table{
// "x-message-ttl": int64(5000), // 5秒TTL
"x-max-length": 2,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": dlx.Name,
}
// 声明正常队列
q, err := ch.QueueDeclare(
"normal_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
if err != nil {
fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
return
}
// 将正常队列绑定到交换机,并设置死信交换机和路由键
err = ch.QueueBind(
q.Name, // queue name
q.Name, // routing key
"my_exchange", // exchange
false,
nil,
)
if err != nil {
fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
return
}
err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
if err != nil {
fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
return
}
}
3.3 消息被拒绝
消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。
客户端函数:ch.Nack,函数原型:
func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
ch.m.Lock()
defer ch.m.Unlock()
return ch.send(&basicNack{
DeliveryTag: tag,
Multiple: multiple,
Requeue: requeue,
})
}
入参含义如下:
tag | 这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 |
multiple | 这是一个布尔值( |
requeue | 这也是一个布尔值( |
测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:
之后使用如下代码进行消费:
package main
import (
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ")
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Println("Failed to open a channel")
return
}
err = ch.ExchangeDeclare(
"my_exchange", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
return
}
// 声明正常队列
// q, err := ch.QueueDeclare(
// "normal_queue", // name
// true, // durable
// false, // delete when unused
// false, // exclusive
// false, // no-wait
// nil, // arguments
// )
// if err != nil {
// fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
// return
// }
// 将正常队列绑定到交换机,并设置死信交换机和路由键
err = ch.QueueBind(
"normal_queue", // queue name
"normal_queue", // routing key
"my_exchange", // exchange
false,
nil,
)
if err != nil {
fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
return
}
msgs, _ := ch.Consume(
"normal_queue", // queue
"", // consumer
false, // auto-ack
true, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
go func() {
for d := range msgs {
// 模拟处理失败,全部放入死信队列
ch.Nack(d.DeliveryTag, false, false)
}
}()
time.Sleep(10 * time.Second)
}
运行代码后,3条消息全部进入到死信队列中:
4、总结
RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。