springboot框架使用RabbitMQ举例代码
以前分享过一个理论有兴趣的小伙伴可以看下
https://blog.csdn.net/Drug_/article/details/138164180
不多说 还是直接上代码
第一步:引入依赖 可以不指定版本
<!-- amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步 配置文件
#配置rabbitMq 服务器
rabbitmq:
host: ${rabbitmq.rabbitmqHost}
port: ${rabbitmq.rabbitmqPort}
username: ${rabbitmq.rabbitmqUsername}
password: ${rabbitmq.rabbitmqPassword}
virtual-host: ${rabbitmq.rabbitmqVhost}
connection-timeout: 5000
#消费者配置
listener:
simple:
# 不建议使用 自带的重试配置 因为有几种情况会失效 在网上摘抄的 网友的测试
# 重试机制使用场景:
# 1. 如果是业务代码,比如空指针之类的异常那重试机制其实没什么用
# 2. 代码中不能使用try/catch捕获异常,否则重试机制失效
# 我在消费者 使用了 try 发现 确实失效了 所以 我觉得 需要手动在消费者里累计重试次数 自行处理异常
# retry:
# enabled: true #开启消费者retry重试机制
# max-attempts: 3 # 最大重试次数
# multiplier: 2.0 # 重试间隔时间倍数
# initial-interval: 1000 # 初始重试间隔时间(毫秒)
# max-interval: 10000 # 最大重试间隔时间(毫秒)
acknowledge-mode: manual # 手动确认消息,防止消息丢失 auto manual手动确认模式
default-requeue-rejected: true #是否将拒绝的消息重新入队。默认是 true,即拒绝的消息会重新入队。 配合手动确认模式
concurrency: 1 #: 消费者线程池的并发数。设置同时处理的消费者数量
max-concurrency: 1 #最大并发消费者数量
prefetch: 1 # 限制每个消费者一次可以获取的消息数量,防止消息在某个消费者身上发生阻塞
#生产者配置
# publisher-returns: true # 启用发布者返回模式。设置为 true 启用,确保如果消息无法路由到目标队列,则会返回给生产者。
# none: 不启用发布者确认。
# correlated: 启用发布者确认并使用 CorrelationData 对象,可以在回调中进行处理。
#: 启用简单的发布者确认模式,不带 CorrelationData。
# publisher-confirm-type: none
第三步定义常量 :
package com.testweb.testweb.rabbitmq.web;
/**
* User:Json
* Date: 2024/9/3
**/
public class MqConstant {
public static final String TestDirectRouting = "rabbitmq.TestDirectRouting";
public static final String TestDirectQueue = "rabbitmq.TestDirectQueue";
public static final String TestDirectExchange = "rabbitmq.TestDirectExchange";
}
第四步 消费者定义:
package com.testweb.testweb.rabbitmq.web.consumer;
import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
/**
* User:Json
* Date: 2024/9/3
* 消费者
**/
@Configuration
public class TestConsumer {
//队列
@Bean
public Queue TestDirectQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-ha-policy", "all"); //将队列设置为在集群中的所有节点上都可用
return new Queue(MqConstant.TestDirectQueue, true, false, false, args);
}
@Bean
public DirectExchange TestDirectExchange() {
return new DirectExchange(MqConstant.TestDirectExchange, true, false);
}
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(MqConstant.TestDirectRouting);
}
@RabbitListener(queues = MqConstant.TestDirectQueue)
public void process1(Message testMessage, Channel channel) {
// 消息的唯一标识id
long deliveryTag = testMessage.getMessageProperties().getDeliveryTag();
//重试次数
Integer retryCount =(Integer) testMessage.getMessageProperties().getHeaders().getOrDefault("retryCount", 0);
try {
// 处理消息的业务逻辑
System.out.println("Received order message: " + new String(testMessage.getBody()));
//假装异常
int a= 1/0;
// 手动确认消息
// deliveryTag 唯一标识
// multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
if (retryCount < 3) { // 设置最大重试次数
try {
System.out.println("处理失败,拒绝消息并重新入队 :" + testMessage);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("retryCount", retryCount + 1);
Message newMessage = new Message(testMessage.getBody(), messageProperties);
// 重新入队 未写完
channel.basicPublish(MqConstant.TestDirectExchange, MqConstant.TestDirectRouting, null, newMessage.getBody());
// 手动确认原消息,防止死循环
channel.basicAck(deliveryTag, false);
// 处理失败,拒绝消息并重新入队 方式1
// 消息标识 deliveryTag,
// multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息,,
// requeue 是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃
// channel.basicNack(deliveryTag, false, true);
// 处理失败,拒绝消息并重新入队 方式2
// 消息标识 deliveryTag
// requeue 是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃。
//channel.basicReject(long deliveryTag, boolean requeue);
// 3. 使用场景
// basicNack:
// 当你需要拒绝一批消息时,使用 basicNack 是更好的选择,尤其是当你想在消费失败时批量拒绝多条消息。
// 适用于更复杂的场景,比如一次性处理多个未确认的消息。
// basicReject:
// 当你只想拒绝当前消息时,basicReject 是一个简化的选择。它通常用于更简单的场景,只需处理当前消息即可。
// 适合处理单个消息的拒绝。
//如果你在消费者 里 只写了 消息确定 没有写 如果异常后 的处理 默认是不会把消息重新放回队列的
} catch (Exception nackException) {
System.out.println("重新入队失败!!!");
// 处理 nack 失败的情况
nackException.printStackTrace();
}
} else {
System.out.println("达到最大重试次数 将消息发送到死信队列或进行其他处理!!!");
try {
channel.basicReject(deliveryTag, false); // 丢弃消息或转发到死信队列
} catch (Exception rejectException) {
rejectException.printStackTrace();
}
}
}
}
}
第五步:生产者
package com.testweb.testweb.rabbitmq.web.producer;
import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* User:Json
* Date: 2024/9/3
* 生产者
**/
@Component
public class TestProducer {
@Resource
AmqpTemplate amqpTemplate;
public <T> void produce(T payload){
amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,
MqConstant.TestDirectRouting, payload);
}
}
第六步 测试:
package com.testweb.testweb.rabbitmq.web.controller;
import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* User:Json
* Date: 2024/9/3
**/
@RestController
@RequestMapping("/testMq")
public class TestMqController {
@Resource
AmqpTemplate amqpTemplate;
@GetMapping("test")
@CrossOrigin(origins = "*")
public void test(@RequestParam String msg){
amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,
MqConstant.TestDirectRouting, msg);
}
}