当前位置: 首页 > article >正文

Go实现RabbitMQ消息模式

【目标】

  1. go实现RabbitMQ简单模式和work工作模式

  2. go实现RabbitMQ 消息持久化和手动应答

  3. go实现RabbitMQ 发布订阅模式

  4. go使用MQ实现评论后排行榜更新

1. go实现简单模式

编写路由实现生产消息

实现生产消息

MQ消息执行为命令行执行,所以创建命令行执行函数main,用来消费消息

创建mq/demo/main.go

浏览器中访问路由,执行生产者生产消息

打开http://localhost:15672/#/queues, 查看RabbitMQ客户端查看是否消息

执行消费者,实现消息消费

进入 mq/demo/中,执行bee run

2. go实现work工作模式

在启动另一个窗口,实现第二个消费者

生产消息

打开RabbitMQ客户端,查看消费者

查看work消费

两个work时,轮询执行消费

2.1 go实现RabbitMQ消息持久化和手动应答

消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化

生产者实现消息持久化

第二个参数设置为true,即durable=true.

消费者实现消息持久化

在RabbitMQ服务重启或者服务宕机的情况下,也不会丢失消息。

可以将Queue与Message都设置为可持久化(durable),这样可以保证绝大部分情况下RabbitMQ消息不会丢失。

手动应答

RabbitMQ 消息应答机制

消费者处理一个任务是需要一段时间的,如果有一个消费者正在处理一个比较耗时的任务并且只处理了一部分,突然这个时候消费者宕机了,那么会出现什么情况呢?

如果是自动应答模式,消费者在处理任务的过程中宕机了,那么消息将会丢失,而手动应答则能够保证消息不会被丢失,所以在实际的应用当中绝大多数都采用手动应答

为了保证消息从队列可靠地达到消费者并且被消费者消费处理,RabbitMQ 提供了消息应答机制,RabbitMQ 有两种应答机制,自动应答和手动应答

1、自动应答

RabbitMQ 只要将消息分发给消费者就被认为消息传递成功,就会将内存中的消息删除,而不管消费者有没有处理完消息

2、手动应答

RabbitMQ 将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后才会被认为消息传递成功了,然后才会将内存中的消息删除

消息应答:

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

手动应答优点:

可以批量应答并且减少网络拥堵

消费方法中设置手动应答

效果:

关闭自动应答

RabbitMQ中查看

开启手动应答后,才返回消息执行成功,保证了消息不会被丢失

3. go实现RabbitMQ 发布订阅模式

RabbitMq消息模式的核心思想是:

一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

  实际上,生产者只能把消息发送给一个exchange(交换机),exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息。

有四种类型的交换器,分别是:direct、topic、headers、fanous(广播模式)

广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

go实现RabbitMQ 发布订阅模式 RabbitMQ tutorial - Publish/Subscribe | RabbitMQ

实现广播模式(发布订阅模式)demo

生产者向交换机中发送消息

和简单模式、work模式相比,多了创建交换机

消费者拉取交换机中消息实现消费

和简单模式、work模式相比,多了创建交换机、创建了临时队列、绑定临时队列

临时队列

  我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

 demo中的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,而不是部分。并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

  首先,无论何时我们的消费者连接到RabbitMq,我们都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列,或者让RabbitMq生成任意的队列名字。

  其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

  通过queueDeclare()来创建一个非持久化、专有的、自动删除的、名字随机生成的队列。

实现发布订阅模式:

创建消息路由

控制器中实现生产者消息推送到交换机

创建mq/fanout/main.go,实现消费者从交换机中获取消息实现消费

效果:

执行生产者,实现消息生产

打开RabbitMQ客户端,查看消息状态

执行消费者,实现消费

注:因为是发布订阅模式。所以我们启动两个消费者实现多个用户消费同一消息

消费者1

消费者2

当生产者生产消息时,所订阅的消费者会执行消费

消费者1

消费者2

4. go实现RabbitMQ 路由模式


一个通过路由把One的消息取出来,另一个通过路由把two的消息取出来,一个队列打印奇数,一个队列打印偶数

生产者代码

消费者代码奇数代码


消费者代码偶数代码

运行效果

5. go实现RabbitMQ 主题模式


生产者代码

// topic主题push
// @router /mq/topic/push [*]
func (this *MqDemoController) GetTopic() {
   //创建线程执行(发送自增的数字到队列中)
   go func() {
      count := 0
      for {
         if count%2 == 0 {
            //strconv.Itoa 把count转化为字符串
            mq.PublishEx("wsyb.demo.topic", "topic", "wsyb.video", "wsyb.video"+strconv.Itoa(count))
         } else {
            mq.PublishEx("wsyb.demo.topic", "topic", "user.wsyb", "user.wsyb"+strconv.Itoa(count))
         }
         count++
         time.Sleep(1 * time.Second)

      }
   }()

   this.Ctx.WriteString("topic")
}

// topic主题push
// @router /mq/topictwo/push [*]
func (this *MqDemoController) GetTopicTwo() {
   //创建线程执行(发送自增的数字到队列中)
   go func() {
      count := 0
      for {
         if count%2 == 0 {
            //strconv.Itoa 把count转化为字符串
            mq.PublishEx("wsyb.demo.topic", "topic", "a.frog.name", "a.frog.name"+strconv.Itoa(count))
         } else {
            mq.PublishEx("wsyb.demo.topic", "topic", "b.frog.uid", "b.frog.uid"+strconv.Itoa(count))
         }
         count++
         time.Sleep(1 * time.Second)

      }
   }()

   this.Ctx.WriteString("topic")
}

消费所有主题代码(#)

// 包名必须是main否则消费不成功
package main

import (
   "fmt"
   "wsybapi/services/mq"
)

func main() {
   //执行消费  # 代表获取所有的数据
   mq.ConsumerEx("wsyb.demo.topic", "topic", "#", callback)
}

// 回调函数
func callback(s string) {
   //打印消费结果
   fmt.Printf("topic all msg is :%s\n", s)
}

匹配多个规则进行消费

// 包名必须是main否则消费不成功
package main

import (
   "fmt"
   "wsybapi/services/mq"
)

func main() {
   //执行消费 * 匹配一个或者多个符合规则的数据
   mq.ConsumerEx("wsyb.demo.topic", "topic", "*.frog.*", callback)
}

// 回调函数
func callback(s string) {
   //打印消费结果
   fmt.Printf("topic frog msg is :%s\n", s)
}

匹配一个规则进行消费

// 包名必须是main否则消费不成功
package main

import (
   "fmt"
   "wsybapi/services/mq"
)

func main() {
   //执行消费
   mq.ConsumerEx("wsyb.demo.topic", "topic", "wsyb.*", callback)
}

// 回调函数
func callback(s string) {
   //打印消费结果
   fmt.Printf("tpic wsyb msg is :%s\n", s)
}

运行结果

6. rabbitmq死信队列

6.1应用场景:
  1. 发送消息规定10分钟以后发送给用户
  2. 规定消息每天固定的时间发送
    3.下了订单没有支付,30分钟以后就会取消订单
    4.订单相关的,下单以后会定时收到会系统的提示消息
6.2什么是死信队列呢:

死信队列产生的条件,不仅是ttl时间过期了,还有消息被拒绝,队列达到最大长度,都会产生死信,相信大家已经明白了

7. go使用MQ实现评论后排行榜更新

修改逻辑,新增评论时更新redis排行榜的数据

发布评论

打开MQ客户端,查看队列状态

创建mq/top/main.go,连接数据库

在消费回调函数中,编写消费者逻辑实现排行榜更新

执行消费者

效果:

先评论内容

打开redis可视化界面,查看排行榜评论数

再次评论

打开redis可视化界面,查看排行榜评论数是否实现更新


http://www.kler.cn/a/326931.html

相关文章:

  • 【万字详文介绍】:迭代扩张卷积神经网络(IDCNN)
  • 冒泡排序、选择排序、计数排序、插入排序、快速排序、堆排序、归并排序JAVA实现
  • 盘点 2024 十大免费/开源 WAF
  • 交换机的基本配置
  • windows XP,ReactOS系统3.4 共享映射区(Section)---2
  • HTTP 405 Method Not Allowed:解析与解决
  • 科研绘图系列:R语言堆积图(stacked barplot)
  • 数据驱动农业——农业中的大数据
  • MySQL | excel数据输出insert语句
  • STM8S003F定时器延时
  • 【华为HCIP实战课程二】OSPF基础介绍和OSPF RID NBMA配置详解
  • 软件测试学习笔记丨curl命令发送请求
  • 【机器学习】---异构数据融合
  • 【C语言】字符和字符串函数(2)
  • uniapp在线打包的ios后调用摄像头失败的解决方法
  • 51单片机的智能垃圾桶【proteus仿真+程序+报告+原理图+演示视频】
  • 数据库课程设计案例:在线图书管理系统
  • SpringBoot+Activiti7工作流入门实例
  • 2024年国外优质API:情绪识别口碑佳
  • fastapp-微信开发GPT项目第一课
  • CSV数据行(取值)的列数多于表头字段数-Pandas无法正常读取
  • 主成分分析法
  • 杨辉三角-C语言
  • ppt模板如何制作?建议试试这4招
  • golang学习笔记17-切片
  • 正则表达式和re模块