幂等性解决方案(消息队列例子)
什么是幂等性
幂等性(Idempotency)指的是在进行某一操作多次时,结果与执行一次的结果是相同的,即不管操作被执行多少次,只要输入相同,结果就不变。在计算机科学中,幂等性通常用来描述一个操作的“重复执行”不会产生副作用。
所以可以利用幂等性来解决重新消费问题
我们在消息队列 例如kafka,rabbitmq中经常使用
使用场景
1.数据库操作(尤其是插入、更新)
这里很简单,直接使用事务保证幂等性
2.分布式事务
分布式锁
3.消息队列(Kafka、RabbitMQ等)
下面详细讲解
4.外部API调用
由于网络延迟,可能重复调用,需要外部API保证幂等性
例子
kafka生产者幂等性解决重复消费
当生产者生产数据到kafka中进行存储时候,可能由于kafka服务的网络波动,重启等因素,导致ack没有应答,这里kafka组件中的数据收集会有重试机制导致重新发送,这样就可能导致kafka中磁盘存储的数据重复
解决:kafka这里帮我们实现了幂等性处理
开启下面的参数
1.ACKS=-1
2.开启重试
3.在途请求缓冲区的数量不能大于5 默认=5
4.开启幂等性(就是为每个消息添加生产id和序列号,最好会比较)
kafka消费者利用幂等性解决重复消费
为什么会出现重复消费,例如
1.消费者拉取kafka中的数据
2.然后经过业务处理,存入数据库
3.提交偏移量
如果在步骤2,3之间kafka服务由于某些原因挂掉,导致偏移量未能提交,导致下次拉取数据依旧是之前的数据,这样数据就重复了
方案一:使用数据库判断
每次入库时候判断数据库中是否存在该数据
这样能解决重复问题,但是会占用数据库资源,以及接口响应速度
方案二:redis缓存
设置一个短期缓存,每次入库前判断缓存中是否有该数据
redis基于内存,响应速度很快
方案三:分布式锁
例如有多个消费者获取kafka中的数据,可以使用分布式锁例如redisson锁住这个消息id,其他消费获取不到该锁则直接跳过来拉取下一条消息
但是通常 Kafka 本身就有保证消息的顺序消费和消息的幂等性机制。因此,在大多数情况下,使用 Redisson 分布式锁来确保每个消息只被一个消费者处理是多余的
因为kafka中一个分区只能被 一个消费者消费。
所以这样消费者就保证了消费不会被重复消费,只有在提交偏移量错误的时候可能导致重复消费
rabbitMQ生产者消费可靠性问题(额外)
当生产者发送消息到Exchange-》queue之后可能出现网络波动等因素,导致消息没有应答,获取消息根本没有发送到rabbitmq中去
方案 try-catch+冗余表
声明rabbitmq的回调函数,是否正确响应,如果没有则放入冗余表(存放为正确进入queue中的数据),然后用定时任务重新执行冗余表
rabbitmq消费者可靠问题
一般我们会基于try-catch+手动ack+死信队列来保证消息的可靠
例如当我们的消费者为正常消费到数据,报错了,这时候rabbitmq会干啥,会一直重新发送数据,导致内存崩溃
这时候就需要关闭重试机制,手动ack,然后让未能正确处理的数据进入死信队列,然后又后续的程序处理死信队列的数据
rabbitmq消费者重复消费问题
那么这里有个问题,当我们正常的处理数据的queue会讲数据存入数据库,死信队列也可能会存入数据库,这里的数据是相同的,就会出现数据重复的问题
然后解决方法其实和kafka的解决方案一样
1.使用数据库唯一id
2.redis缓存
3.分布式锁
总结
幂等性广泛应用于分布式系统中,特别是在数据库操作、支付系统、消息队列、外部 API 调用、日志记录等场景中,它确保了即使在操作被重复执行时,系统的最终结果仍然是一致的。