当前位置: 首页 > article >正文

Go实现RabbitMQ 死信队列、优化

  1. 死信队列是什么?

  2. 如何配置死信队列?

  3. go实现死信队列

  4. go使用发布订阅模式改造批量发送消息

【教学内容】

1. 死信队列是什么?

注意:业务队列与死信交换机的绑定是在构建业务队列时,通过参数(x-dead-letter-exchange和x-dead-letter-routing-key)的形式进行指定。

通俗来讲,无法被正常消费的消息,我们可以称之为死信。我们将其放入死信队列,单独处理这部分“异常”消息。

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况之一:

(1)消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。

(2)消息在队列的存活时间超过设置的TTL时间。

(3)消息队列的消息数量已经超过最大队列长度。 那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

2.如何配置死信队列?


如何配置死信队列呢?大概可以分为以下步骤:

配置业务队列,绑定到业务交换机上

为业务队列配置死信交换机和路由key

为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

Golang 实现 RabbitMQ 的死信队列 https://segmentfault.com/a/1190000041358981

队列、交换机、routing-key 放到消费者一方来实现。所以消费者者一共需要做这几件事

  1. 创建连接

  2. 设置队列(队列、交换机、绑定)

  3. 设置死信队列(队列、交换机、绑定)

  4. 发布消息

设置队列(队列、交换机、绑定)

核心操作就是设置队列阶段。

声明普通队列,并指定死信交换机、指定死信routing-key。后续死信队列创建后会与死信交换机、指定死信routing-key进行绑定

//创建一个queue,指定消息过期时间,并且绑定过期以后发送到那个交换机
queueA, err := channel.QueueDeclare(
    queueAName, // name
    true,       // durable
    false,      // delete when usused
    false,      // exclusive
    false,      // no-wait
    amqp.Table{
        // 当消息过期时把消息发送到 exchangeB
        "x-dead-letter-exchange": exchangeB,
        "x-message-ttl":          ttl,
        //"x-dead-letter-queue" : queueBName,
        //"x-dead-letter-routing-key" :
        },
    )
if err != nil {
    fmt.Println(err)
    return
}

声明交换机

err = channel.ExchangeDeclare(
        exchangeA, // name
        "fanout",  // type
        true,      // durable
        false,     // auto-deleted
        false,     // internal
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        fmt.Println(err)
        return
    }

目前,普通队列和交换机都已经创建,但它们都是独立存在,没有关联。

通过 QueueBind 将队列、routing-key、交换机三者绑定到一起。


err = channel.QueueBind(
    queueA.Name, // queue name
    "",          // routing key
    exchangeA,   // exchange
    false,
    nil,
)
if err != nil {
    fmt.Println(err)
    return
}

设置死信队列(队列、交换机、绑定)

同样死信队列,也需要创建队列、创建交换机和绑定。

// 声明死信队列
// args 为 nil。切记不要给死信队列设置消息过期时间,否则失效的消息进入死信队列后会再次过期。
queueB, err := channel.QueueDeclare(
   queueBName, // name
   true,       // durable
   false,      // delete when usused
   false,      // exclusive
   false,      // no-wait
   nil,        // arguments
)
if err != nil {
   fmt.Println(err)
   return
}

// 声明交换机

err = channel.ExchangeDeclare(
   exchangeB, // name
   "fanout",  // type
   true,      // durable
   false,     // auto-deleted
   false,     // internal
   false,     // no-wait
   nil,       // arguments
)
if err != nil {
   fmt.Println(err)
   return
}

// 队列绑定(将队列、routing-key、交换机三者绑定到一起)

err = channel.QueueBind(
   queueB.Name, // queue name
   "",          // routing key
   exchangeB,   // exchange
   false,
   nil,
)
if err != nil {
   fmt.Println(err)
   return
}

当死信队列建立完毕,普通队列通过 x-dead-letter-exchange 和 x-dead-letter-routing-key 参数的指定,便可生效,死信队列便与普通队列连通。

3. go实现死信队列

实现路由访问

编写生产者

方便测试,开启两个生产者

一个生产者,执行死信队列生产消息

另一个生产者,创建绑定的队列(死信队列中绑定的队列)生产消息

实现消费者

创建mq/dlx/main.go,实现消费

效果:

执行两个生产者

查看RabbitMQ客户端

点击交换机查看详细情况

点击queues,查看队列情况

启动消费者:

查看消费情况

死信队列转发给了b队列

4. go使用发布订阅模式改造批量发送消息

修改批量发送消息逻辑

编写入队列逻辑

执行批量发送接口

查看RabbitMQ客户端,检查交换机状态

启动消费者

注:当执行消费者时,因为是main.go执行,所以不会共享连接数据库,导致数据库连接default连接不上

重新连接数据库配置

系统默认参数

beego 中带有很多可配置的参数,我们来一一认识一下它们,这样有利于我们在接下来的 beego 开发中可以充分的发挥他们的作用(你可以通过在 conf/app.conf 中设置对应的值,不区分大小写):

基础配置
  • BConfig
    保存了所有 beego 里面的系统默认参数,你可以通过 web.BConfig 来访问和修改底下的所有配置信息.

配置文件路径,默认是应用程序对应的目录下的 conf/app.conf,用户可以在程序代码中加载自己的配置文件
beego.LoadAppConfig("ini", "conf/app2.conf")
也可以加载多个文件,只要你调用多次就可以了,如果后面的文件和前面的 key 冲突,那么以最新加载的为最新值

消费者实现业务逻辑(消息入库)

消费者执行成功

navicat查看结果


http://www.kler.cn/a/330450.html

相关文章:

  • 1-1 电场基本概念
  • 搭建prometheus+grafana监控系统抓取Linux主机系统资源数据
  • 互斥信号量的等待与通知
  • 备战蓝桥杯 队列和queue详解
  • 手写系列——VPG算法或REINFORCE算法
  • SpringCloud
  • 《重生到现代之从零开始的C语言生活》—— 字符函数和字符串函数
  • 数据结构双向链表和循环链表
  • ubuntu 18.04 cuda 11.01 gpgpu-sim 裸机编译
  • IDEA关联Tomcat
  • Mac 电脑配置yolov8运行环境实现目标追踪、计数、画出轨迹、多线程
  • 【MAUI】CommunityToolkit社区工具包介绍
  • k8s 部署 grafana
  • React中Hooks使用
  • MATLAB计算与建模常见函数:4.插值
  • k8s搭建双主的mysql8集群---无坑
  • 猫猫cpu的缓存
  • 使用 Node.js 创建一个 WebSocket 服务器
  • 如何使用工具删除 iPhone 上的图片背景
  • 文心一言 VS 讯飞星火 VS chatgpt (359)-- 算法导论24.3 1题
  • 本地运行LLama 3.2的三种方法
  • 多旋翼无人机“仿鸟类”精确拦截飞行目标,助力低空安全
  • 微信小程序技术框架选型
  • 在java后端发送HTTPClient请求
  • 用CSS创造三角形案例
  • 数据结构:c++ (OJ202) 快乐数