当前位置: 首页 > article >正文

RabbitMQ消息的可靠性

RabbitMQ消息的可靠性

一 生产者的可靠性

  1. 生产者重试
    有时候由于网络问题,会出现连接MQ失败的情况,可以配置重连机制
    注意:SpringAMQP的重试机制是阻塞式的,重试等待的时候,当前线程会等待。
spring:
	rabbitmq:
		connection-timout: 1s #设置MQ的连接超时时间
		templete:
			retry:
				enabled: true #开启超时重试机制
				initial-interval: 100ms #失败后的初始等待时间
				multipier: 1 #失败后下次的等待时长倍数, 下次等待时长=initial-interval*multipier
				max-attempts: 3 #最大重试次数
  1. 生产者确认

    (1)在生产者服务的yaml文件中配置一下内容
spring:
	rabbitmq:
		publisher-confirm-type: correlated #开启publisher confirm机制,并设置为MQ异步回调方式返回回执信息
		publisher-returns: true #开启publisher return机制

(2)配置return-callback

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContexAware{
	@Override
	public void setApplicationContext(ApplicationContent applicationContext){
		// 获取MQ
		RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
		// 设置returnCallback
		rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{
	log.info("消息发送失败,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());
});
	}
}

(3)发送消息,指定消息ID,消息的ConfirmCallback
相比于发布消息,多了消息的confirm

@Test
public void testPubliserConfir()throw InterupteDException{
	// 创建correlationData
	CorrelationData cd = new CorrelationData(UUID.randowUUID().toString());
	// 给Future添加ConfirmCallback
	cd.getFuture().addCallback(new ListenableFutureCllback<CorelationData.Confirm>(){
	@Override
	public void onFailure(Throwable ex){
		// Future发生异常时的处理逻辑,一般不触发
		log.error("handle message ack fail",ex);
	}
	@Override
	public void onSuccess(CorrelationData.Confirm result){
		// Future接收到回执的处理逻辑
		if(result.isAck()){
			log.debug("发送消息成功,收到ACK");
		}else{
			log.error("发送消息失败,收到NACK,reason:{}",result.getReson());
		}
	}
});
// 发送消息
rabbitTemplate.coverAndSend("hmall.direct","red","hello",cd);
}

二 MQ的可靠性

MQ的持久化可以使用Lazy Queue
(1)通过配置类

@Bean
public Queue lazyQueue(){
	return QueueBuilder.durable("lazy.queue")// 队列名称
	.lazy()//开启lazy
	bulid();
}

(1)基于注解

@RabbitListener(queuesToDeclare = @Queue(
	name="lazy.queue",
	durable="ture",
	arguments=@Argument(name="x-queue-mode",value="lazy")
))
public void listenLazyQueue(String msg){
	log.info("接收到 lazy.queue的消息:{}",msg);
}

三 消费者确认

  1. 消费者确认机制
    在这里插入图片描述
    可以通过配置来进行确认
spring:
	rabbitmq:
		listenner:
			simple:
				prefetch: 1
				acknowledgs-mode: auto #确认机制 none-关闭ack,manual-手动ack,auto-自动
  1. 消费失败处理
    重试机制
spring:
	rabbitmq:
		listenner:
			simple:
				prefetch: 1
				retry:
					enabled: true #开启超时重试机制
					initial-interval: 100ms #失败后的初始等待时间
					multipier: 1 #失败后下次的等待时长倍数, 下次等待时长=initial-interval*multipier
					max-attempts: 3 #最大重试次数
					stateless: true #true为无状态,若业务包含事务,则使用false
					

失败处理策略
在这里插入图片描述
在这里插入图片描述
代码实现

@Slf4j
@Configureation
@ConditionalOnProperty(prefix="spring.rabbitmq.listenner.simple.retry",name="enable",havingValue="true")// 只有重试机制是true才生效
public class ErrorConfiguration{
	@Bbean
	public DirectExchange errorExchange(){
		return new DirectExchange("error.direct");
	}
	@Bean
	public Queue errorQueue(){
		return new Queue("error.queue");
	}
	@Bean
	public Binding errorBinding(DirectExchange errorExchange,Queue errorQueue){
		return BindingBuilder.bind(errorQueue).to(errorExchange).with("eooro");
	}
	/**
	* 重试失败处理策略
	* RepublishMessageRecoverer:重试失败后,将消息发送到指定的队列中
	*/
	@Bean
	public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
		log.info("MessageRecoverer 重试失败处理策略配置");
		return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
	}
}

四 业务幂等性

  1. 消息唯一id
  2. 业务判断
    在这里插入图片描述

http://www.kler.cn/news/134221.html

相关文章:

  • 单元测试实战(三)JPA 的测试
  • 【c++随笔13】多态
  • 力扣174. 寻找二叉搜索树中的目标节点(java,二叉搜索树的性质的运用)
  • vscode 创建 运行c++ 项目
  • 一文了解Word2vec 阐述训练流程
  • 第7天:信息打点-资产泄漏amp;CMS识别amp;Git监控amp;SVNamp;DS_Storeamp;备份
  • VBA之Word应用:文档(Document)的书签
  • 【Python数据结构与算法】——(线性结构)精选好题分享,不挂科必看系列
  • 函数式编程框架 functionaljava 简介
  • ClickHouse数据一致性
  • 电子学会C/C++编程等级考试2022年03月(一级)真题解析
  • linux文件IO
  • CentOS to 浪潮信息 KeyarchOS 迁移体验与优化建议
  • 【算法】二分查找-20231120
  • Leetcode—2760.最长奇偶子数组【简单】
  • ubuntu 20通过docker安装onlyoffice,并配置https访问
  • 基于Qt QList和QMap容器类示例
  • 关于缓存和数据库一致性问题的深入研究
  • 容斥 C. Strange Function改编题
  • 使用宝塔面板安装mysql
  • AI机器学习 | 基于librosa库和使用scikit-learn库中的分类器进行语音识别
  • Canal+Kafka实现MySQL与Redis数据同步(一)
  • 【洛谷 P1182】数列分段 Section II 题解(二分答案+递归)
  • 【论文解读】GPT Understands, Too
  • 阿里云ECS11月销量王 99元/年
  • <b><strong>,<i><em>标签的区别
  • 什么是美国服务器,有哪些优势,适用于什么场景?
  • 分库分表
  • 提高视频性能的 5 种方法
  • Ubuntu 20.04 LTS ffmpeg gif mp4 互转 许编译安装ffmpeg ;解决gif转mp4转换后无法播放问题