RabbitMQ基本使用以及整合Java项目
RabbitMQ安装
此步骤可以参考CSDN上其他博文,有写得很详细的,此处不做过多安装问题,主要讲述怎么使用。
项目整合
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>
配置好yml文件
rabbitmq:
host: 192.168.85.xxx #自己的地址
port: 5673 # linux 对外开放端口是5673:5672
username: admin
password: admin
virtual-host: /
基本使用
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ImgApp.class)
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMq(){
//队列名称
String que="test.queue";
//消息
String msg="hello,test";
rabbitTemplate.convertAndSend(que,msg);
}
}
@Component
public class MqListener {
@RabbitListener(queues = "test.queue")
public void listenerQueue(String msg){
System.out.println("消费者收到消息:"+msg);
}
}
可以看到跨服务接受到消息
MQ的三种交换机
- Direct 交换机:
-
- Direct 交换机是最简单的一种交换机类型。它将消息路由到绑定键(Binding Key)与消息的路由键(Routing Key)完全匹配的队列中。如果绑定键与消息的路由键相匹配,消息将被路由到对应的队列中。
- Direct 交换机通常用于一对一的消息传递模式,适合处理特定类型的消息。
- Fanout 交换机:
-
- Fanout 交换机会将接收到的消息广播到所有与之绑定的队列中,忽略消息的路由键。即 Fanout 交换机会将消息发送到所有与其绑定的队列,不管队列的绑定键是什么。
- Fanout 交换机适用于广播消息给多个消费者的场景,每个消费者都会收到相同的消息。
- Topic 交换机:
-
- Topic 交换机根据消息的路由键和队列的绑定键之间的模式匹配规则,将消息路由到一个或多个队列中。Topic 交换机支持通配符匹配,可以根据路由键的模式进行灵活的匹配。
- Topic 交换机适用于灵活的消息路由场景,可以根据消息的内容进行多种模式匹配,实现更精细的消息路由控制。
- * (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
配置消息转换器
原因:
引入jackson依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.5</version>
</dependency>
当映入jackson依赖后其spring的ampq中将会自动使用
在生产者和消费者是启动类上注入Bean
消费者端使用注解方式声明交换机以及队列
//基于注解声明队列以及交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue"),
exchange = @Exchange(name = "xiaomi.topic",type = ExchangeTypes.TOPIC),
key = "test.#"
))
生产者确认机制
开启生产者确认机制后,MQ的性能下降明显,一般场景不建议开启。
保证消息可靠性
LazyQueue
即将消息持久化直接写入磁盘,而不是从放在内存中。
消费者确认机制
一般情况下为了保证消息不被丢失都是:nack
失败重试机制
当消费者出现异常后,为了避免消息不断重新入队后又重新发送,造成死循环导致性能压力飙升,所以又消息失败重试机制。
当重试耗尽后的处理方式
创建失败处理的Config
/** MQ的配置
* @author 12547
* @version 1.0
* @Date 2024/3/17 16:28
*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true")
public class ErrorConfig {
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(Queue errorQueue,DirectExchange errorExchange){
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}
死信交换机
死信交换机如以上案例所示,用作于某些无法处理的消息的兜底方案,还可以用做延迟消息(思路上可以,但不推荐)
延迟消息
官方延迟插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
使用docker logs rabbit 查看MQ版本
找到对应版本并下载
进入 cd /root/目录下
将下载好的插件上传至该目录下
docker cp /root/rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins
执行后进入docker容器 cd plugins 查看是否安装成功
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启容器后查看管理页面在选项栏中看到则代表成功
SpringBoot整合延时消息
在注解中的@Exchange中额外加入delayed="true";
发送消息
生产者测试代码:
//测试延时消息
@Test
public void testSendDelayMapMsg(){
//队列名称
//消息
String msg="hello,delay";
rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//添加延时消息属性
message.getMessageProperties().setDelay(5000); //设置延时时间5s
return message;
}
});
System.out.println("消息发送成功"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
}
消费者测试代码:
//基于注解声明队列以及交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenerDelay(String msg){
System.out.println("消息接受成功"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
System.out.println("收到延时消息:"+msg);
}
测试延时结果
触发失败消息策略
在消费者端期初由于LocalDateTime类使用错误(用了LocalTime),而Formate类是DateTime,导致无法转换,而生产者端会直接报错。生产者端改正后忘记修改消费者端的这部分代码(复制过去的),生产者测试代码能够正常运行,而消费者端这部分发现报错,但IDEA不会出现报错提示,此处应该是我的代码出现异常,所以导致消费者无法正常消费,走异常处理后进入失败重试,重试结束后走消息失败策略,进入定义的error交换机与队列。
异常后进入error.queue
延时消息经典场景
支付超时取消订单
有关延时消息最常用的一个业务就是支付超时取消,即防止用户下订后一直不支付,通过延时消息来达到取消订单的功能。
以订单30分钟超时时间为例,如果用户创建订单后进入消息队列直接设置30分钟延时消息后,那么这个消息要30分钟后才会给消费者判断,而这30分钟则一直堆积在队列中,但绝大多数情况用户支付基本都会在1分钟内完成,只有极少数情况才会出现超时,所以方案就是将30分钟的超时时间拆分为一个一个分段的延时时间1s,5s,10s,30s......这样子就避免了消息堆积,浪费性能
自定义一个延时消息体类 用于封账需要发送的消息和延迟时间
/**延迟消息实体类
* @author 12547
* @version 1.0
* @Date 2024/3/17 22:02
*/
@Data
public class MultiDelayMessage<T> {
//消息体
private T data;
//延时消息数组
private List<Long> delayMils;
public MultiDelayMessage(T data,List<Long> delayMils){
this.data=data;
this.delayMils=delayMils;
}
public static <T> MultiDelayMessage<T> of(T data, Long ...delayMils){
return new MultiDelayMessage<>(data, CollUtil.newArrayList(delayMils));
}
//获取延迟队列的下一个延迟时间并返回
public Long removeNextDelay(){
return delayMils.remove(0); //从延迟时间列表中移除并返回下一个延迟时间
}
}
为了避免发送延迟消息时都要创建一个新消息体对象,所以手动创建一个类
自定义一个DelayPostMessage实现接口MessagePostProcessor的postProcessMessage方法,然后在方法中创建一个delay的成员变量,即通过构造函数的方式传入delay。@RequiredArgsConstructor 注解可以用于类上,用于自动生成一个包含所有被 final 修饰的成员变量
自定义DelayPostMessage:
@RequiredArgsConstructor
public class DelayPostMessage implements MessagePostProcessor {
private final int delay;
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return message;
}
}
那么使用就改为了
rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new DelayPostMessage(msg.removeNextDelay().intValue()));-