当前位置: 首页 > article >正文

RabbitMQ 实现分组消费满足服务器集群部署

实现思路

  1. 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。
  2. 为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消费者都订阅这个队列。
  3. 确保同一组内的消费者竞争消费:RabbitMQ 会将消息推送给组内第一个可用的消费者,确保同一消息不会被同一组中的多个消费者处理。

示例场景

假设我们有两个消费者组:GroupAGroupB。我们希望发送的消息能够同时被 GroupAGroupB 消费,但每个组内的多个消费者只会有一个成员消费该消息。

Spring Boot + RabbitMQ 广播式分组消费示例

1. 引入依赖

首先,在 pom.xml 中添加 RabbitMQ 和 Spring AMQP 的依赖:

<dependencies>
    <!-- Spring Boot Starter for RabbitMQ -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- 其他依赖... -->
</dependencies>
2. 配置 RabbitMQ 交换机和队列

application.yml 中配置 RabbitMQ 的交换机、队列和绑定关系。使用 fanout 交换机,并为每个消费者组创建一个独立的队列。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

  # RabbitMQ 配置
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认消息
        concurrency: 5           # 每个队列的并发消费者数量
        max-concurrency: 10      # 最大并发消费者数量

  # 自定义队列和交换机配置
  rabbitmq:
    queues:
      group-a-queue:
        name: group_a_queue
        durable: true
      group-b-queue:
        name: group_b_queue
        durable: true

    exchanges:
      fanout-exchange:
        name: fanout_exchange
        type: fanout
        durable: true

    bindings:
      group-a-binding:
        exchange: fanout_exchange
        queue: group_a_queue
      group-b-binding:
        exchange: fanout_exchange
        queue: group_b_queue
3. 创建 RabbitMQ 配置类

创建一个配置类来声明交换机、队列和绑定关系。Spring AMQP 会自动根据配置创建这些资源。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 定义扇出交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange", true, false);
    }

    // 定义 Group A 队列
    @Bean
    public Queue groupAQueue() {
        return new Queue("group_a_queue", true);
    }

    // 定义 Group B 队列
    @Bean
    public Queue groupBQueue() {
        return new Queue("group_b_queue", true);
    }

    // 绑定 Group A 队列到扇出交换机
    @Bean
    public Binding groupABinding(Queue groupAQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(groupAQueue).to(fanoutExchange);
    }

    // 绑定 Group B 队列到扇出交换机
    @Bean
    public Binding groupBBinding(Queue groupBQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(groupBQueue).to(fanoutExchange);
    }
}
4. 发送消息

创建一个服务类来发送消息到扇出交换机。由于 fanout 交换机不使用路由键,它会将消息广播到所有绑定的队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MessageProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // 发送消息到扇出交换机
    public void sendMessage(String message) {
        System.out.println("Sending message: " + message);
        rabbitTemplate.convertAndSend("fanout_exchange", "", message);
    }
}
5. 创建消费者

为每个消费者组创建单独的消费者类。每个消费者组内的多个消费者会竞争性地从队列中消费消息。使用 @RabbitListener 注解来监听队列中的消息。

Group A 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class GroupAConsumer {

    @RabbitListener(queues = "group_a_queue")
    public void consumeMessage(String message) {
        System.out.println("Group A consumer received: " + message);
        // 处理 Group A 的逻辑
    }
}
Group B 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class GroupBConsumer {

    @RabbitListener(queues = "group_b_queue")
    public void consumeMessage(String message) {
        System.out.println("Group B consumer received: " + message);
        // 处理 Group B 的逻辑
    }
}
6. 启动应用程序并测试

启动 Spring Boot 应用程序后,GroupAConsumerGroupBConsumer 会分别监听 group_a_queuegroup_b_queue。通过以下方式测试消息的发送和消费:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class TestRunner implements CommandLineRunner {

    private final MessageProducer messageProducer;

    @Autowired
    public TestRunner(MessageProducer messageProducer) {
        this.messageProducer = messageProducer;
    }

    @Override
    public void run(String... args) throws Exception {
        // 发送一条消息
        messageProducer.sendMessage("This is a broadcast message");
    }
}

7. 运行结果
当你启动应用程序时,TestRunner 会发送一条消息到 fanout_exchange。由于 fanout 交换机会将消息广播到所有绑定的队列,因此 group_a_queue 和 group_b_queue 都会接收到这条消息。


http://www.kler.cn/a/430087.html

相关文章:

  • 嵌入式C语言:二维数组
  • Zstandard压缩算法
  • Unity3D使用GaussianSplatting加载高斯泼溅模型
  • 系统思考与因果智慧
  • CES 2025|美格智能高算力AI模组助力“通天晓”人形机器人震撼发布
  • Flutter:封装一个自用的bottom_picker选择器
  • SpringCloud提供的多维度解决方案:构建高效微服务生态系统
  • QT 12月5日练习
  • 11.12[CQU JAVEE_EXP3][JAVA WEB]3h速成JAVA WEB;DE启动Tomcat的各种BUG;GIT
  • 设计模式 在PLM系统的应用场景介绍
  • E卷-计算网络信号200分
  • Linux:Ext系列文件系统
  • 微信小程序uni-app+vue3实现局部上下拉刷新和scroll-view动态高度计算
  • 深度学习(2)前向传播与反向传播
  • Python爬虫——猫眼电影
  • Linux setfacl lsattr chattr 命令详解
  • 什么是 k8s CNI ?
  • 研究生第一篇文献综述怎么写,文献检索,文章整理,文献归纳高效方法小技巧【学习笔记】
  • 解决view-ui-plus 中表单验证不通过问题,select 组件开启multiple模式 总是提示错误,即使不验证也提示,有值也验证失败
  • 亚马逊云科技re:Invent大会:数据与AI如何颠覆企业未来?
  • Tr0ll: 1 Vulnhub靶机渗透笔记
  • 阿里内部正式开源“Spring Cloud Alibaba (全彩小册)”
  • Android问题记录 - Inconsistent JVM-target compatibility detected for tasks
  • 05-树莓派-交叉编译
  • PHP和GD如何给图片添加滤镜效果
  • 【QNX+Android虚拟化方案】134 - QNX侧配置开机自动抓取tcpdump 报文