如何避免消息的重复消费问题?(消息消费时的幂等性)
如何避免消息的重复消费问题
- 1、 消息的幂等性
- 1.1、概念
- 1.2、产生业务场景
- 2、全局唯一ID+Redis解决消息幂等性问题
- 2.1、application.yml配置文件
- 2.2、生产者发送消息
- 2.3、消费者接收消息
- 2.4、pom.xml引入依赖
- 2.5、RabbitConfig配置类
- 2.6、启动类
- 2.7、订单对象
- 2.8、测试
1、 消息的幂等性
https://blog.csdn.net/weixin_63267801/article/details/134211065
1.1、概念
消息的幂等性:就是即使多次收到了消息,也不会重复消费。
对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响。
1.2、产生业务场景
同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了。
2、全局唯一ID+Redis解决消息幂等性问题
2.1、application.yml配置文件
配置rabbitmq和redis
server:
port: 8080
spring:
application:
name: rabbit_13_idempotent01_redis
rabbitmq:
host: 你的rabbitmq服务器IP
port: 5672
username: 你的rabbitmq服务管理员账号
password: 你的rabbitmq服务管理员密码
virtual-host: power
publisher-confirm-type: correlated #开启交换机的确认模式
publisher-returns: true
listener:
simple:
acknowledge-mode: manual #开启消费者的手动确认模式
redis:
host: 你的redis服务器IP
port: 你的redis服务端口
password: 你的redis服务密码
database: 1 #1号数据库
my:
exchangeName: exchange.idempotent.01
queueName: queue.idempotent.01
2.2、生产者发送消息
生产者模拟发送两笔相同的订单:
使用com.fasterxml.jackson.databind.ObjectMapper对象进行数据序列化与反序列化:
package com.power.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.power.vo.Orders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;
@Service
@Slf4j
public class SendMessage {
@Resource
private RabbitTemplate rabbitTemplate;
//这个对象可以进行序列化和反序列化(json格式)
@Resource
private ObjectMapper objectMapper;
//构造方法执行后执行
@PostConstruct
public void init(){
//开启生产者的确认模式
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
//如果交换机接收消息成功,ack返回true
if(!ack){
log.error("消息没有到达交换机,原因是:{}",cause);
//TODO 重发消息或者记录错误日志
}
});
}
@Bean
public void sendMsg() throws JsonProcessingException {
{
//发送第一笔订单消息
Orders orders1 = Orders.builder()
.orderId("order_100")
.orderName("手机")
.money(new BigDecimal("2345"))
.orderTime(new Date())
.build();
//将对象转换成json
log.info("orders1:::::" + orders1.toString());
String strOrders1 = objectMapper.writeValueAsString(orders1);
log.info("strOrders1:::::" + strOrders1);
MessageProperties messageProperties = new MessageProperties();
//设置单条消息持久化,默认技术持久化的
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = MessageBuilder.withBody(strOrders1.getBytes()).andProperties(messageProperties).build();
rabbitTemplate.convertAndSend("exchange.idempotent.01", "info", message);
}
{
//发送第二笔订单消息
Orders orders2 = Orders.builder()
.orderId("order_100")
.orderName("手机")
.money(new BigDecimal("2345"))
.orderTime(new Date())
.build();
String strOrders2 = objectMapper.writeValueAsString(orders2);
MessageProperties messageProperties = new MessageProperties();
//设置单条消息持久化,默认技术持久化的
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = MessageBuilder.withBody(strOrders2.getBytes()).andProperties(messageProperties).build();
rabbitTemplate.convertAndSend("exchange.idempotent.01", "info", message);
}
log.info("消息发送完毕,发送时间是:"+new Date());
}
}
2.3、消费者接收消息
package com.power.message;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.power.vo.Orders;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Component
@Slf4j
public class ReceiveMessage {
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = {"queue.idempotent.01"})
public void receiveMsg(Message message, Channel channel) throws IOException {
//获取消息唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//使用objectMapper把字节数组反序列化成对象
Orders orders = objectMapper.readValue(message.getBody(), Orders.class);
try{
log.info("接收到的消息为:{}",orders.toString());
//如果不存在就在redis中存储
Boolean setResult = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId());
if(setResult){
//TODO 向数据插入订单数据
log.info("向数据库插入订单");
}
//手动确认接收消息成功
channel.basicAck(deliveryTag,false);
}catch (Exception e){
log.error("消息处理出现问题");
try {
channel.basicNack(deliveryTag,false,true);
} catch (IOException ex) {
ex.printStackTrace();
}
throw new RuntimeException(e);
}
}
}
2.4、pom.xml引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.power</groupId>
<artifactId>rabbit_13_idempotent01_redis</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rabbit_13_idempotent01_redis</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.13</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.5、RabbitConfig配置类
package com.power.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Value("${my.exchangeName}")
private String exchangeName;
@Value("${my.queueName}")
private String queueName;
//创建交换机
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(exchangeName).build();
}
//创建队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
@Bean
public Binding binding(DirectExchange directExchange,Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
}
2.6、启动类
package com.power;
import com.power.service.SendMessage;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
public class Application implements ApplicationRunner {
@Resource
private SendMessage messageService;
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
2.7、订单对象
package com.power.vo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {
private String orderId;
private String orderName;
private BigDecimal money;
private Date orderTime;
}
2.8、测试
启动服务后,生产者发送消息,消费者接收消息,
两笔消息订单ID相同,但是消费者只把接收到的一条消息插入了数据库,实现了消息幂等性。