当前位置: 首页 > article >正文

项目集成篇:springboot集成rabbitmq实现消息发送,消费

在Spring Boot中集成RabbitMQ以实现消息的发送和消费,可以通过使用`spring-boot-starter-amqp`来简化配置。以下是一个完整的示例,展示了如何设置一个基于Spring Boot的应用程序来与RabbitMQ进行交互。

### 1. 添加依赖

首先,在你的`pom.xml`文件中添加必要的依赖:

```xml
<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Boot Starter AMQP for RabbitMQ -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Lombok (Optional) for reducing boilerplate code -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```

### 2. 配置RabbitMQ连接

编辑`application.properties`或`application.yml`文件以包含RabbitMQ的连接信息。

#### `application.properties`
```properties
# RabbitMQ server configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
```

#### 或者使用`application.yml`
```yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
```

### 3. 创建消息队列和交换器配置

创建一个配置类来定义RabbitMQ的队列、交换器以及绑定关系。

```java
package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_NAME = "example_queue";
    public static final String EXCHANGE_NAME = "example_exchange";
    public static final String ROUTING_KEY = "example_routing_key";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
```

### 4. 创建消息生产者

创建一个服务类来发送消息到RabbitMQ。

```java
package com.example.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MessageProducer {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MessageProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message) {
        log.info("Sending message: {}", message);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);
    }
}
```

### 5. 创建消息消费者

创建一个监听器类来接收并处理来自RabbitMQ的消息。

```java
package com.example.listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(Message message, Channel channel) throws Exception {
        try {
            String msg = new String(message.getBody());
            log.info("Received message: {}", msg);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("Failed to process message", e);
            // 拒绝消息,可以选择重新入队或者丢弃
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
```

### 6. 创建控制器(可选)

你可以创建一个REST控制器来触发消息的发送。

```java
package com.example.controller;

import com.example.service.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/message")
@Slf4j
public class MessageController {

    @Autowired
    private MessageProducer producer;

    @PostMapping("/send")
    public String sendMessage(@RequestParam String message) {
        producer.sendMessage(message);
        return "Message sent!";
    }
}
```

### 注意事项
- 确保你的RabbitMQ服务器正在运行并且可以从应用程序访问。
- 如果你在本地开发,请确保RabbitMQ服务已正确安装并在默认端口5672上运行。
- 根据实际情况调整包名、路径以及其他细节。
- 对于生产环境,建议配置RabbitMQ的用户名和密码认证,并考虑更复杂的错误处理机制和日志记录。

通过上述步骤,你已经成功地在一个Spring Boot应用中集成了RabbitMQ,并实现了消息的发送和消费功能。如果有任何问题或需要进一步的帮助,请随时提问!


http://www.kler.cn/a/428874.html

相关文章:

  • Vue 3 自定义 Hook:实现页面数据刷新与滚动位置还原
  • uni-app vue3 常用页面 组合式api方式
  • 3. 后端验证前端Token
  • OODA循环在网络安全运营平台建设中的应用
  • 多个页面一张SQL表,前端放入type类型
  • 软考高级5个资格、中级常考4个资格简介及难易程度排序
  • devops-Dockerfile+Jenkinsfile方式部署Java前后端应用
  • 如何解决maven项目使用Ctrl + /添加注释时的顶格问题
  • 校园综合服务小程序+ssm
  • Halcon 瑕疵检测原理及应用
  • Ubuntu与Centos系统有何区别?
  • 【C语言练习(1)—练习实参和形参之间参数传递】
  • 云原生多模数据库 Lindorm
  • 【OceanBase 诊断调优】—— 日志归档延迟或日志归档慢的原因和解决方法
  • 位运算(一)位运算简单总结
  • 总结的一些MySql面试题
  • 【Mac OS 安装 Homebrew】
  • EasyExcel注解使用
  • python GUI编程
  • C++创建型模式之生成器模式
  • compiler-core核心原理
  • 机器学习—学习过程
  • [笔记] Windows 上 Git 安装详细教程:从零开始,附带每个选项解析
  • 常见算法java语法
  • JavaScript中todolist操作--待办事项的添加 删除 完成功能
  • 实例教程:BBDB为AHRS算法开发提供完善的支撑环境(下)