【谷粒商城之消息队列RabbitMQ】
本笔记内容为尚硅谷谷粒商城消息队列RabbitMQ部分
目录
一、概述
二、简介
三、Docker安装RabbitMQ
四、Springboot整合RabbitMQ
2、application.yml配置
3、测试RabbitMQ
1. AmqpAdmin-管理组件
2.RabbitTemplate-消息发送处理组件
3.RabbitListener&RabbitHandle接收消息
4.RabbitMQ消息确认机制-可靠抵达
5.可靠抵达-Ack消息确认机制
一、概述
二、简介
三、Docker安装RabbitMQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
- 4369, 25672 (Erlang发现&集群端口)
- 5672, 5671 (AMQP端口)
- 15672 (web管理后台端口)
- 61613, 61614 (STOMP协议端口)
- 1883, 8883 (MQTT协议端口)
Networking and RabbitMQ — RabbitMQ
四、Springboot整合RabbitMQ
1、引入spring-boot-starter-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml配置
spring.rabbitmq.host=192.168.88.130
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
3、测试RabbitMQ
1. AmqpAdmin-管理组件
@Autowired
AmqpAdmin amqpAdmin;
/**
* 1、如何创建Exchange[hello.java.exchange]、Queue、Binding
* 1)、使用AmqpAdmin进行创建
*/
// 创建交换机
@Test
void createExchange() {
//DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
//durable:是否持久化
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("exchange:[{}]创建成功","hello-java-exchange");
}
// 创建队列
@Test
void createQueue() {
//exclusive:是否排他的,false:允许同时有多个连接到此queue
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("queue:[{}]创建成功","hello-java-queue");
}
@Test
void createBinding(){
// 创建绑定
// String destination【目的地】,
// DestinationType destinationType 【目的地类型】
// String exchange【交换机】,
// String routingKey【路由键】,
// Map<String, Object> arguments【自定义参数】
// 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",null);
amqpAdmin.declareBinding(binding);
log.info("binding:[{}]创建成功","hello-java-binding");
}
2.RabbitTemplate-消息发送处理组件
@Test
void sendMessageTests() {
//1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
String msg = "Hello World";
OrderReturnReasonEntity entity = new OrderReturnReasonEntity();
entity.setId(1L);
entity.setCreateTime(new Date());
//2、发送的对象类型的消息,可以是一个json
for (int i = 0;i<10;i++) {
if(i%2==0) {
entity.setName("Vc" + i);
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity);
}else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
}
log.info("消息发送完成{}" + entity);
}
}
使用json格式的序列化器
//使用json格式的序列化器
//否则使用jdk的序列化器
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3.RabbitListener&RabbitHandle接收消息
1)必须使用@EnableRabbit
2)监听方法必须放在@Component中
3)@RabbitListener(queues={"hello-java-queue"})放在类上
@RabbitHandler:标在方法上【作用:重载处理不同类型的数据】
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService
/**
* queue:声明需要监听的所有队列
*
* org.springframework.amqp.core.Message
*
* 参考可以写一下类型
* 1、Message message:原生消息详细信息。头+体
* 2、T<发送的消息的类型> OrderReturnReasonEntity content
* 3、Channel channel: 当前传输数据的通道
*
* Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
* 场景:
* 1)、订单服务启动多个;同一个消息,只能有一个客户端收到
* 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
* @param message
*/
//RabbitListener(queues = {"hello-java-queue"})
@RabbitHandler
public void recieveMessage(Message message,
OrderReturnReasonEntity content,
Channel channel) throws InterruptedException {
//{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
System.out.println("接收到消息..."+content);
byte[] body = message.getBody();
//消息头属性信息
MessageProperties properties = message.getMessageProperties();
Thread.sleep(3000);
System.out.println("消息处理完成=》"+content.getName());
}
@RabbitHandler
public void recieveMessage2(OrderEntity content) {
//{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
System.out.println("接收到消息..."+content);
}
}
4.RabbitMQ消息确认机制-可靠抵达
注意:springboot.rabbitmq.publisher-confirm 已被弃用.
#新版使用
#开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个return
spring.rabbitmq.template.mandatory=true
示例代码:
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*
*/
@PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}
5.可靠抵达-Ack消息确认机制
#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
示例代码:
@RabbitHandler
public void recieveMessage(Message message,
OrderReturnReasonEntity content,
Channel channel) throws InterruptedException {
//{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
System.out.println("接收到消息..."+content);
byte[] body = message.getBody();
//消息头属性信息
MessageProperties properties = message.getMessageProperties();
Thread.sleep(3000);
System.out.println("消息处理完成=》"+content.getName());
//Channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//签收货物,非批量模式
try {
if(deliveryTag%2==0){
//收货
channel.basicAck(deliveryTag,false);
System.out.println("签收了货物..."+deliveryTag);
}else {
//退货 requeue=false 丢弃 requeue=true 发回服务器,服务器重新入队
//deliveryTag, multiple, requeue(false不再入队)
channel.basicNack(deliveryTag,false,false);
System.out.println("没有签收了货物..."+deliveryTag);
}
} catch (Exception e) {
//网络中断
}
}
结束!