如何实现接口调用的重试
引言
在控制设备的时候,常规的方式是利用同步控制的方式,当下发控制后,需等待控制结果;该方式常用于设备数量较少,或者网络带宽不拥堵的情况。但随之会造成主线程阻塞,在大量控制中可能出现批量失败的情况,造成严重损失。因此,在控制流量很大的情况下,常采用异步控制的方式。异步方式将控制下发和结果回调进行拆分,所以主线程不会受到影响,并且能被动知道控制的结果,服务能力得到很大提升。
实现异步的方式有很多种:
JDK
方式,这个可以参考我之前的文章,实现异步控制。- 采用中间件-
RocketMQ
实现,通过生产者-消费者模式来实现设备异步控制,具体的控制原理可以参考我之前的文章:物联网中RocketMQ的使用
如何利用RocketMQ实现回调重试
如上图所示,是用RocketMQ
实现的异步控制。其中有个结果回调,用于告知主线程此时的控制结果。在实际回调时是通过http
请求上游服务,所以也会存在上游服务结点宕机,或者网络超时。因此为了提高回调的可用性,在回调的时候会尝试重试请求,保证结果的一致性。
既然在控制设备的时候使用了RocketMQ
,那在回调的时候也可以使用RocketMQ
的失败重试机制。因此,在上图中的回调,就不能直接告知控制结果,而是将控制的参数放入RocketMQ
中,利用RocketMQ
的特性来实现失败重试。
那种代码中如何实现呢?
对于如何将回调参数塞入RocketMQ
中,可以看之前的文章:物联网中RocketMQ的使用。
接下来就写一下,失败重试:
@RocketMQMessageListener(
topic = TOPIC_DEMO,
consumerGroup = "consumer_demo_group"
)
@Component
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
private final String CHARSET = Charset.defaultCharset().name();
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String str = new String(body, Charset.forName(this.CHARSET));
System.out.println("消费者消费的消息为: " + str);
try{
//todo http 调用,用于回调告知控制结果
}catch(Exception e){
log.error("结果回调失败")
throw e;
}
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
// 指定消费线程大小
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
// 指定消费的最大次数 2次
defaultMQPushConsumer.setMaxReconsumeTimes(2);
}
}
在onMessage
方法中,将http
调用方法放入try...catch
中,当发生错误,进行抛出,RocketMQ
就能抓住错误进行重试。
在prepareStart
方法中,指定一些必要的线程参数
- 最大线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
- 最小线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
- 最大的重复消费此时:
defaultMQPushConsumer.setMaxReconsumeTimes(2);
并且通过实验和查看源码,其中最大、最小设置一样才会生效。
结尾
以上就是利用RocketMQ
实现重试的思路。长路漫漫浩、慢慢克服!