SpringCloud学习记录|day6
学习材料
2024最新SpringCloud微服务开发与实战,java黑马商城项目微服务实战开发(涵盖MybatisPlus、Docker、MQ、ES、Redis高级等)
复习MQ,学过的,应该会轻松一点吧。
RabbitMQ
交换机没有存储功能,必须和队列进行绑定。
1.虚拟主机:数据隔离
Java客户端
1.Work Queues
2.Fanout,Direct,Topic交换机
队列只能发送给一个消费者,有了交换机就可以实现多个消费者同时消费一个消息。
3.如何在java代码中声明队列和交换机
有给定的Queue,Exchange和Binder类或者接口。可以基于注解实现(注解位置是使用消息的地方),或者编写配置类(Config包下)。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
4.消息转化器
放在启动类即可。
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
5.动手完成
怎么在common包下定义一个rabbitTemple实例,包含addBeforePublishPostProcessors() 和 addAfterReceivePostProcessors()。
有点难办,目前卡在怎么创建rabbiteTemplate哈哈哈
MQ高级部分
1.发送者的可靠性:发送者连接重试,发送者确认。
发送者重连就是配置那里定义一下就好,启动重试机制,重试时间间隔次数等等。
发送者确认就比较麻烦了。需要配置,还需要指定确认机制。(极少数情况下会发生,但不推荐使用。)
2.MQ的可靠性:数据持久化,LazyQueue。
主要还是MQ宕机不丢失消息最重要。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
3.消费者的可靠性:消费者确认机制,失败重试,失败处理,业务幂等,兜底方案。
消费者确认机制最简单的方法就是在配置文件中配置auto就好了。
本地失败重试。
经过本地失败重试后,该怎么处理,1.reject,直接丢弃,2.nack,消息重新入队,3.失败消息发送到指定交换机。
业务幂等–》消息不重复消费(处理方案:消息ID唯一或者业务逻辑上处理)。
兜底方案就是,业务定时主动去查询数据,以确定一致性。
4.延迟消息
实现方案:
- 死信交换机+TTL
- 延迟消息插件
作业
配置很简单,但是其中的逻辑,helper和消息转换器等等怎么处理不是很懂。需要再深入学习。