深入了解Spring重试组件spring-retry
在我们的项目中,为了提高程序的健壮性,很多时候都需要有重试机制进行兜底,最多就场景就比如调用远程的服务,调用中间件服务等,因为网络是不稳定的,所以在进行远程调用的时候偶尔会产生超时的异常,所以一般来说我们都会通过手动去写一些重试的代码去进行兜底,而这些重试的代码其实都是模板化的,因此Spring实现了自己的重试机制组件spring-retry,下面我们就一起来学习一下spring-retry这个组件吧
使用方式
1.编程式
// 创建一个RetryTemplate
RetryTemplate retryTemplate = RetryTemplate.builder()
.customPolicy(new SimpleRetryPolicy()) // 指定重试策略,默认重试3次
.exponentialBackoff(1000L, 2, 10000L) // 指定重试的退避时间策略
.withListener(new RetryListenerDemo())// 重试监听器
.build();
// 通过RetryTemplate的execute方法执行业务逻辑
retryTemplate.execute(retryContext -> {
log.info("开始执行");
throw new RuntimeException("抛出异常");
}, context -> recoverMethod());
// 当重试结束还是失败之后最后兜底执行的方法
public String recoverMethod() {
log.info("执行恢复");
return "执行了Recover方法";
}
public class RetryListenerDemo implements RetryListener {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
log.info("{}", context.getRetryCount());
log.info("listener>>>开始监听");
// return false; // 否决整个重试
return true; // 继续重试
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("listener>>>关闭");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("listener>>>报错了");
}
}
这里说一下重试监听器,自定义的重试监听器需要实现RetryListener接口,该接口主要包括三个方法:
- open:在执行我们的业务逻辑之前会执行一次open方法,如果该方法返回false,则会直接抛出一个TerminatedRetryException异常,从而不会往下执行业务逻辑,返回true则正常往下执行
- close:当重试结束之后,或者open方法返回false的时候就会触发close方法
- onError:在每一次业务逻辑抛出异常的时候都会执行onError方法
2.声明式
@Retryable(value = Exception.class, maxAttempts = 3, listeners = {"retryListenerDemo"})
public String test() {
log.info("开始执行");
throw new RuntimeException("抛出异常");
}
@Recover
public String recoverMethod() {
log.info("执行恢复");
return "执行了Recover方法";
}
声明式只需要在需要重试的方法上加上Retryable注解,并且在注解上指定一些重试的属性,比如重试次数,触发重试的异常,重试监听器等等,这些属性对应在编程式中都能进行设置。而对于重试兜底方法则需要Recover注解进行标识
重试策略RetryPolicy
在对重试属性进行配置的时候我们可以去配置不同的重试策略,所谓的重试策略,其实就是去判断是否能够进行重试,也就是RetryPolicy,它是一个接口
public interface RetryPolicy extends Serializable {
/**
* 是否能够重试
*
* @param context 重试上下文
* @return true=>允许重试,false=>不允许重试
*/
boolean canRetry(RetryContext context);
/**
* 获取一个重试上下文,不同的重试策略有自己的重试上下文
*
* @param parent 父级重试上下文
* @return a {@link RetryContext} object specific to this policy.
*
*/
RetryContext open(RetryContext parent);
/**
* 关闭这个重试上下文
*/
void close(RetryContext context);
/**
* 每一次重试失败后会回调该方法,然后通过重试上下文记录下重试的异常,方便在下一次canRetry方法中从重试上下文中去判断是否还能进行重试
* @param context 重试上下文
* @param throwable 重试时抛出的异常
*/
void registerThrowable(RetryContext context, Throwable throwable);
}
该接口在spring-retry中提供多种不同的重试策略的实现
- SimpleRetryPolicy:这是一种简单的重试策略,允许根据最大重试次数和特定的异常列表来控制重试行为
- NeverRetryPolicy:不进行重试的重试策略,也就是说我们的业务逻辑代码在第一次执行如果抛出异常了,不会进行重试
- AlwaysRetryPolicy:允许一直重试的重试策略
- TimeoutRetryPolicy:通过设置重试的时间段,仅允许在未超过该时间段的时候才进行重试
- CompositeRetryPolicy:组合重试策略,可以组合多种重试策略,这对于需要复杂条件的情况非常有用
- ExpressionRetryPolicy:该策略继承了SimpleRetryPolicy,在SimpleRetryPolicy的基础上加上了基于spel表达式去判断是否需要进行重试的功能
在RetryPolicy接口中关键的方法就是canRetry,canRetry方法会在重试之前进行调用,用来判断是否还能够继续进行重试,而判断所需的一些上下文属性(例如已经重试的次数,重试的超时时间)就在重试上下文RetryContext中,对于每一种重试策略来说,都会有自己的RetryContext,因为不同的重试策略它们用来判断重试机会的时候所需的上下文属性是不一样的
以TimeoutRetryPolicy为例,它具有限制重试时间的功能,那自然就需要记录下重试的起始时间和重试的超时时间了,而这两个信息就会放在TimeoutRetryContext中
private static class TimeoutRetryContext extends RetryContextSupport {
/**
* 允许重试的时间段
*/
private long timeout;
/**
* 重试开始时间
*/
private long start;
public TimeoutRetryContext(RetryContext parent, long timeout) {
super(parent);
this.start = System.currentTimeMillis();
this.timeout = timeout;
}
/**
* 判断当前是否超过了重试时间
* @return true=>允许重试,false=>已经超过了重试时间了,不允许重试
*/
public boolean isAlive() {
return (System.currentTimeMillis() - start) <= timeout;
}
}
这样就可以在下一次判断是否能够重试的时候,也就是调用canRetry方法的时候通过传入TimeoutRetryContext去判断重试是否超时了
退避策略BackOffPolicy
上面说的RetryPolicy主要是在每一次要重试之前用来判断是否能够进行重试的,而BackOffPolicy则是提供了重试之间的间隔时间多久的功能,也就是说会先去执行RetryPolicy判断是否允许重试,如果允许重试,则才会去执行BackOffPolicy去等待重试的执行
public interface BackOffPolicy {
/**
* 创建一个退避上下文
*
* @param context the {@link RetryContext} context, which might contain information
* that we can use to decide how to proceed.
* @return the implementation-specific {@link BackOffContext} or '<code>null</code>'.
*/
BackOffContext start(RetryContext context);
/**
* 执行退避操作
* @param backOffContext the {@link BackOffContext}
* @throws BackOffInterruptedException if the attempt at back off is interrupted.
*/
void backOff(BackOffContext backOffContext) throws BackOffInterruptedException;
}
spring-retry也提供了不同的BackOffPolicy实现
- NoBackOffPolicy:一个不执行任何操作的 BackOffPolicy,即不会增加等待时间。适用于不需要等待时间间隔的情况
- FixedBackOffPolicy:以固定时间去进行重试退避
- ExponentialBackOffPolicy:退避时间以指数形式增长
执行流程
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
RetryPolicy retryPolicy = this.retryPolicy;
BackOffPolicy backOffPolicy = this.backOffPolicy;
// 获取当前的重试上下文
RetryContext context = open(retryPolicy, state);
if (this.logger.isTraceEnabled()) {
this.logger.trace("RetryContext retrieved: " + context);
}
// 把当前的重试上下文设置到ThreadLocal中
RetrySynchronizationManager.register(context);
Throwable lastException = null;
boolean exhausted = false;
try {
// 遍历所有的重试监听器,执行其open方法
boolean running = doOpenInterceptors(retryCallback, context);
// 条件成立:有其中一个重试监听器的open方法返回了false
if (!running) {
// 抛出异常
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
}
// Get or Start the backoff context...
BackOffContext backOffContext = null;
// 尝试从当前的重试上下文中获取退避上下文
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext) resource;
}
// 条件成立:说明当前的重试上下文中没有设置退避上下文
if (backOffContext == null) {
// 这时候通过退避策略创建出对应的退避上下文
backOffContext = backOffPolicy.start(context);
// 再把这个退避上下文放到重试上下文中
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
// 条件成立:当前配置的重试策略允许重试,并且当前的重试上下文中没有设置中断重试的标志
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Retry: count=" + context.getRetryCount());
}
// Reset the last exception, so if we are successful
// the close interceptors will not think we failed...
lastException = null;
// 执行retryCallback,也就是执行目标重试方法
return retryCallback.doWithRetry(context);
}
// 执行目标重试方法时抛异常了
catch (Throwable e) {
lastException = e;
try {
// 此时在重试上下文中记录下重试异常
registerThrowable(retryPolicy, state, context, e);
}
catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable", ex);
}
finally {
// 遍历所有的重试监听器,执行其onError方法
doOnErrorInterceptors(retryCallback, context, e);
}
// 在执行退避策略之前再判断一下是否还能重试
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
// 执行退避策略
backOffPolicy.backOff(backOffContext);
}
catch (BackOffInterruptedException ex) {
lastException = e;
// back off was prevented by another thread - fail the retry
if (this.logger.isDebugEnabled()) {
this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
}
throw ex;
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
}
if (shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
}
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
/*
* A stateful attempt that can retry may rethrow the exception before now,
* but if we get this far in a stateful retry there's a reason for it,
* like a circuit breaker or a rollback classifier.
*/
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
// 代码执行到这里说明重试结束了
if (state == null && this.logger.isDebugEnabled()) {
this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
}
exhausted = true;
// 重试结束之后,最后执行recover方法,并返回recover方法的执行结果
return handleRetryExhausted(recoveryCallback, context, state);
}
// 上面try中抛出异常之后catch
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null || exhausted);
// 执行所有重试监听器的close方法
doCloseInterceptors(retryCallback, context, lastException);
// 在ThreadLocal中清除当前的重试上下文,如有必要,还需把父级上下文设置回ThreadLocal中
RetrySynchronizationManager.clear();
}
}
上面就是执行重试的核心流程代码,注释都详细写上去了,就不多说了。这里有个说一下的就是如果存在嵌套重试的话,我们需要去保存父层级的RetryContext,什么叫嵌套重试?就是在一个重试方法中调用了另一个重试方法,这两个重试方法的重试规则可能都不一样,这时候在执行第二个重试方法的时候就需要把第一个重试方法的RetryContext进行保存,那spring-retry是怎么保存的呢?在RetryContext中会有一个parent,这个parent记录的就是当前上一层的RetryContext,而当第二层重试执行完之后,这时候就会返回上一层的重试,所以就需要把上一层的RetryContext复原,这个复原的动作会在上面最后的finally代码块中执行。关联父子RetryContext的操作会在RetryPolicy的open方法中去执行,传入的参数就是父级的RetryContext
/**
* 获取一个重试上下文,不同的重试策略有自己的重试上下文
*
* @param parent 父级重试上下文
* @return a {@link RetryContext} object specific to this policy.
*
*/
RetryContext open(RetryContext parent);