当前位置: 首页 > article >正文

【消息队列】RabbitMQ实现消费者组机制

目录

1. RabbitMQ 的 发布订阅模式

2. GRPC 服务间的实体同步

2.1 生产者服务

2.2 消费者服务

3. 可靠性

3.1 生产者丢失消息

3.2 消费者丢失消息

3.3 RabbitMQ 中间件丢失消息


1. RabbitMQ 的 发布订阅模式

https://www.rabbitmq.com/tutorials/tutorial-three-go

  • P 生产者:发送消息到一个特定的交换机(交换机类型是fanout),不需指定具体的目标队列
  • X 交换机:将消息分发给所有绑定到它的队列
  • C 消费者:订阅主题,通过绑定到交换机的队列接收消息

2. GRPC 服务间的实体同步

考虑以下业务需求——

  • 模拟消费者组机制:
    • 同一消费者组下的消费者(即一个服务的多个实例)监听同一个队列,是竞争关系
    • 不同消费者组(即不同服务)监听不同队列,这些队列绑定到同一个交换机,不同消费者组可以独立消费相同的数据
  • 消费历史数据:当生产者先启动,生产了一部分数据,消费者后启动时,也能消费到历史数据

服务之间的实体数据同步方案:

2.1 生产者服务

(1) 初始化

生产者初始化时需要负责绑定 RabbitMQ 的交换机和队列关系,即显式声明自己的实体有哪些消费者在消费。比如:

  • 声明交换机 exchange_user、exchange_group
  • 声明消费者 consumer_user_rpc、consumer_org_rpc
  • 创建队列 exchange_user_consumer_user_rpc、exchange_user_consumer_org_rpc、exchange_group_consumer_user_rpc、exchange_group_consumer_org_rpc,也就是对每个 topic-consumer 组合,创建一个相应的队列
  • 将交换机和队列绑定

(2) 实体变更时发送消息

发送消息到交换机,交换机会自动分发给所有绑定到它的队列,也就是发送一条消息至 exchange_user 交换机,那么消息会被投递给队列 exchange_user_consumer_user_rpc 和 队列 exchange_user_consumer_org_rpc。

2.2 消费者服务

消费者订阅一个 topic,处理 rabbitMQ 队列发来的消息。

  • 若消息处理成功(业务流程成功),发送 Ack 给 rabbitMQ 确认消费
  • 若消息处理失败(业务流程失败),发送 Nack 通知 rabbitMQ 处理失败,消息将放回队列等待下次消费

Ack 时 rabbitMQ 会记录消费者消费的 offset,下次会基于 offset 继续消费~

3. 可靠性

3.1 生产者丢失消息

(1) 生产者绑定交换机和队列

在生产者初始化时,需要先将交换机和队列的关系绑定好,以避免此场景发生:生产者先启动,未绑定交换机和队列,发送了消息到交换机,此时无法投递到具体队列。消费者后启动,即便做了交换机和队列绑定,也无法消费到历史消息。

func NewMQ(rabbitMQCfg *Config, option Option) (*RabbitMQ, error) {
	// ...

	// 初始化交换机和队列
	for topic, consumerGroups := range option.TopicConsumerGroupsBinding {
		err = initExchange(topic, consumerGroups, mq)
		if err != nil {
			return nil, err
		}
	}
	return mq, nil
}

func initExchange(exchange, consumerGroups string, mq *RabbitMQ) error {
	// 1. 创建发送通道
	pch, err := mq.conn.Channel()
	if err != nil {
		return err
	}
	mq.produceChannels[exchange] = pch

	// 2. 开启消息确认机制
	if err := pch.Confirm(false); err != nil {
		return err
	}

	// 3. 创建交换机
	// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
	err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
	if err != nil {
		return err
	}
	slog.Info("rabbitmq declared exchange", "exchange_name", exchange)

	// 4. 创建队列并绑定到交换机
	for _, consumerGroup := range strings.Split(consumerGroups, ",") {
		consumerGroup = strings.TrimSpace(consumerGroup)
		if consumerGroup == "" {
			continue
		}
		queue := queueName(exchange, consumerGroup)

		// 创建队列
		// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
		_, err = pch.QueueDeclare(queue, true, false, false, false, nil)
		if err != nil {
			return err
		}

		// 将队列绑定到交换机
		// 参数 queue:队列名称, key:路由键, exchange:交换机名称, noWait:是否等待服务器确认, args:额外参数
		err = pch.QueueBind(queue, "", exchange, false, nil)
		if err != nil {
			return err
		}
		slog.Info("rabbitmq declared and bind queue", "queue", queue, "bind_exchange", exchange)

		// 创建接收通道
		cch, err := mq.conn.Channel()
		if err != nil {
			return err
		}
		mq.consumeChannels[queue] = cch
	}

	// 5. 开启消息确认事件监听、消息投递事件监听
	mq.publishWatcher[exchange] = &watcher{
		returnCh:  pch.NotifyReturn(make(chan amqp.Return)),
		confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
	}
	// 监听未被交换机投递的消息
	go func() {
		for ret := range mq.publishWatcher[exchange].returnCh {
			// 尝试重新投递
			ctx, _ := context.WithTimeout(context.Background(), mq.config.Timeout)
			if err := mq.publish(ctx, ret.Exchange, ret.MessageId, ret.Body, ret.Timestamp); err != nil {
				slog.Error("rabbitmq republish failed.", "exchange", ret.Exchange, "msgID", ret.MessageId, "err", err)
			} else {
				slog.Warn("rabbitmq got exchange undelivered msg, republished.", "exchange", ret.Exchange, "msgID", ret.MessageId)
			}
			time.Sleep(time.Second * 3)
		}
	}()
	return nil
}

(2) 发送重试

发送消息时增加重试机制。若超过重试上限,需记录日志或报警。

func (r *RabbitMQ) Produce(ctx context.Context, topic string, data map[string]any) error {
	body, _ := json.Marshal(data)
	msgID := uuid.New()

	var retried int
	for {
		err := r.publish(ctx, topic, msgID, body, time.Now())
		if err == nil {
			return nil
		}

		retried++
		if retried > r.option.RetryNum {
			return err
		}
		time.Sleep(r.option.RetryInterval)
	}
}

(3) confirm 消息确认机制

生产端投递消息到 RabbitMQ 后,RabbitMQ 将发送一个确认事件,让生产端知晓消息已发送成功。监听 confirm 事件以确认消息的发送状态:

func initExchange(exchange string, mq *RabbitMQ) error {
	// ...
	// 开启消息确认机制
	if err := pch.Confirm(false); err != nil {
		return err
	}
	
	// 创建监听器
	mq.publishWatcher[exchange] = &watcher{
		confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
	}
	// ...
}

func (r *RabbitMQ) publish(ctx context.Context, ...) error {
	// publish发送消息
	// ...
	
	// 等待rabbitmq返回消息确认
	select {
	case confirm := <-r.publishWatcher[exchange].confirmCh:
		if !confirm.Ack {
			return errors.New("publish failed, got nack from rabbitmq")
		}
	case <-ctx.Done():
		return errors.New("context deadline, publish to rabbitmq timeout")
	case <-time.After(r.config.Timeout):
		return errors.New("publish to rabbitmq timeout")
	}
	return nil
}

3.2 消费者丢失消息

消费者消费完成后,必须手动 Ack 通知 MQ,表示已经消费成功:

func (r *RabbitMQ) Ack(topic, consumerGroup, msgID string) error {
	// ...

	return consumeChannel.Ack(deliveryTag, false)
}

如果消费失败,需要手动 Nack,那此条消息会重新入队,等待下次消费:

func (r *RabbitMQ) Nack(topic, consumerGroup, msgID string) error {
	// ...

	return consumeChannel.Nack(deliveryTag, false, true)
}

3.3 RabbitMQ 中间件丢失消息

(1) 数据持久化到磁盘

交换机持久化(durable=true):

// 创建交换机
// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)

队列持久化(durable=true):

// 创建队列
// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
_, err = pch.QueueDeclare(queue, true, false, false, false, nil)

消息持久化(DeliveryMode=Persistent):

err := ch.Publish(
	exchange, // 交换机名称
	"",       // 路由键
	true,     // 如果消息不能路由到任何队列,是否返回未处理的消息. true将返回未处理通知, false将丢弃消息
	false,    // 是否立即交付给消费者. 若为true, 当队列中没有等待中的消费者时,消息会被丢弃
	amqp.Publishing{
		MessageId:    msgID,              // 消息ID
		ContentType:  "application/json", // 消息内容类型
		Body:         body,               // 消息内容
		DeliveryMode: amqp.Persistent,    // 消息需要持久化
		Timestamp:    t,                  // 消息时间
	},
)

(2) RabbitMQ 本身的数据一致性保证

RabbitMQ 使用 raft 共识算法保证数据一致性:

https://www.rabbitmq.com/docs/clustering#replica-placement


http://www.kler.cn/news/362173.html

相关文章:

  • vue3.0 + vite打包完成后,将dist下的资源包打包成zip
  • 基于node.js宜家宜业物业管理系统【附源码】
  • 宣恩文旅微短剧双作开机,融合创新助力城市经济发展
  • 基于知识图谱的美食推荐系统
  • AI时代,谷歌会像当年的IBM一样陨落吗?
  • Android 设置控件为圆形
  • 计算机毕业设计 基于Python的校园个人闲置物品换购平台的设计与实现 Python毕业设计 Python毕业设计选题【附源码+安装调试】
  • Java中的Vector,看着陌生?
  • API接口测试与优化:确保应用稳定性的必要步骤
  • Python 深度Q网络(DQN)算法详解与应用案例
  • 计算机网络考研笔记
  • 力扣题51~70
  • 动手学深度学习9.7. 序列到序列学习(seq2seq)-笔记练习(PyTorch)
  • 如何在verilog设计的磁盘阵列控制器中实现不同RAID级别(如RAID 0、RAID 1等)的切换?
  • 集成必看!Air780E开发板集成EC11旋转编码器的可靠解决方案~
  • 二、Linux 系统命令
  • c++ 对象作用域
  • 代码随想录算法训练营第十九天|Day19二叉树
  • Python包——numpy2
  • 6,000 个网站上的假 WordPress 插件提示用户安装恶意软件
  • 前端 js 处理一个数组 展示成层级下拉样式
  • 理解和解决TCP 网络编程中的粘包与拆包问题
  • 【C++】创建TCP服务端
  • DLNA—— 开启智能生活多媒体共享新时代
  • 线性可分支持向量机的原理推导 9-23拉格朗日乘子α的最大化问题 公式解析
  • Spring中导致事务传播失效的情况(自调用、方法访问权限、异常处理不当、传播类型选择错误等。在实际开发中,务必确保事务方法正确配置)