TimeLimiter

简介

与Hystrix不同,Resilience4j将超时控制器从熔断器中独立出来,成为了一个单独的组件,主要的作用就是对方法调用进行超时控制。实现的原理和Hystrix相似,都是通过调用Future的get方法来进行超时控制。

可配置参数
配置参数 默认值 描述
timeoutDuration 1(s) 超时时间限定
cancelRunningFuture true 当超时时是否关闭取消线程
测试前准备
首先引入maven依赖:
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.6.1</version>
</dependency>

resilience4j-spring-boot2 集成了circuitbeaker、retry、bulkhead、ratelimiter、timelimiter 几个模块。

application.yml 配置
resilience4j:
  timelimiter:
    configs:
      default:
        timeout-duration: 3s # 超时时长
        cancel-running-future: true # 发生异常是否关闭线程
    instances:
      backendA:
        base-config: default
        timeout-duration: 2s
      backendB:
        base-config: default
        cancel-running-future: false
调用方法

还是以之前查询用户列表的后端服务为例。TimeLimiter目前仅支持程序式调用,还不能使用AOP的方式调用。

因为TimeLimiter通常与CircuitBreaker联合使用,很少单独使用,所以直接介绍联合使用的步骤。

TimeLimiter没有注册器,所以通过@Autowired注解自动注入依赖直接使用,因为TimeLimter是基于Future的get方法的,所以需要创建线程池,然后通过线程池的submit方法获取Future对象:

public class CircuitBreakerServiceImpl {

    @Autowired
    private RemoteServiceConnector remoteServiceConnector;

    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;

    @Autowired
    private TimeLimiter timeLimiter;

    public List<User> circuitBreakerTimeLimiter(){
        // 通过注册器获取熔断器的实例
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
        CircuitBreakerUtil.getCircuitBreakerStatus("执行开始前:", circuitBreaker);
        // 创建单线程的线程池
        ExecutorService pool = Executors.newSingleThreadExecutor();
        //将被保护方法包装为能够返回Future的supplier函数
        Supplier<Future<List<User>>> futureSupplier = () -> pool.submit(remoteServiceConnector::process);
        // 先用限时器包装,再用熔断器包装
        Callable<List<User>> restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
        Callable<List<User>> chainedCallable = CircuitBreaker.decorateCallable(circuitBreaker, restrictedCall);
        // 使用Try.of().recover()调用并进行降级处理
        Try<List<User>> result = Try.of(chainedCallable::call)
            .recover(CallNotPermittedException.class, throwable ->{
                log.info("熔断器已经打开,拒绝访问被保护方法~");
                CircuitBreakerUtil.getCircuitBreakerStatus("熔断器打开中", circuitBreaker);
                List<User> users = new ArrayList();
                return users;
            })
            .recover(throwable -> {
                log.info(throwable.getLocalizedMessage() + ",方法被降级了~~");
                CircuitBreakerUtil.getCircuitBreakerStatus("降级方法中:",circuitBreaker);
                List<User> users = new ArrayList();
                return users;
            });
        CircuitBreakerUtil.getCircuitBreakerStatus("执行结束后:", circuitBreaker);
        return result.get();
    }
}
使用测试

异常A和B在application.yml文件中没有修改:

recordExceptions: # 记录的异常
    - com.example.resilience4j.exceptions.BusinessBException
    - com.example.resilience4j.exceptions.BusinessAException
ignoreExceptions: # 忽略的异常
    - com.example.resilience4j.exceptions.BusinessAException

使用另一个远程服务接口的实现,将num%4==3的情况让线程休眠5s,大于我们TimeLimiter的限制时间:

public class RemoteServiceImpl implements RemoteService {

    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        if (num % 4 == 1){
            throw new BusinessAException("异常A,不需要被记录");
        }
        if (num % 4 == 2){
            throw new BusinessBException("异常B,需要被记录");
        }
        if (num % 4 == 3){
            Thread.sleep(5000);
        }
        log.info("服务正常运行,获取用户列表");
        // 模拟数据库的正常查询
        return repository.findAll();
    }
}

把调用方法进行单元测试,循环10遍:

public class CircuitBreakerServiceImplTest{

    @Autowired
    private CircuitBreakerServiceImpl circuitService;

    @Test
    public void circuitBreakerTimeLimiterTest() {
        for (int i=0; i<10; i++){
            circuitService.circuitBreakerTimeLimiter();
        }
    }
}

看下运行结果:

执行开始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=0, failedCalls=0, successCalls=0, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 0
服务正常运行,获取用户列表
执行结束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
执行开始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 1
com.example.resilience4j.exceptions.BusinessAException: 异常A,不需要被记录,方法被降级了~~
降级方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
执行结束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
执行开始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 2
com.example.resilience4j.exceptions.BusinessBException: 异常B,需要被记录,方法被降级了~~
降级方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
执行结束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
执行开始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 3
null,方法被降级了~~
降级方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
执行结束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]

发现熔断器任何异常和超时都没有失败。。完全不会触发熔断,这是为什么呢?我们把异常toString()看一下:

java.util.concurrent.ExecutionException: com.example.resilience4j.exceptions.BusinessBException: 异常B,需要被记录,方法被降级了~~
java.util.concurrent.TimeoutException,方法被降级了~~

这下原因就很明显了,线程池会将线程中的任何异常包装为ExecutionException,而熔断器没有把异常解包,由于我们设置了黑名单,而熔断器又没有找到黑名单上的异常,所以失效了。这是一个已知的bug,会在下个版本(0.16.0之后)中修正,目前来说如果需要同时使用TimeLimiter和CircuitBreaker的话,黑白名单的设置是不起作用的,需要自定义自己的谓词逻辑,并在test()方法中将异常解包进行判断,比如像下面这样:

public class RecordFailurePredicate implements Predicate<Throwable> {

    @Override
    public boolean test(Throwable throwable) {
        if (throwable.getCause() instanceof BusinessAException) return false;
        else return true;
    }
}

然后在application.yml文件中指定这个类作为判断类:

circuitbreaker:
    configs:
      default:
        recordFailurePredicate: com.example.resilience4j.predicate.RecordFailurePredicate

就能自定义自己的黑白名单了,我们再运行一次试试:

java.util.concurrent.TimeoutException,方法被降级了~~
降级方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=2, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
执行结束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=2, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]

可以看出,TimeLimiter已经生效了,同时CircuitBreaker也正常工作。
Note:

最新版0.17.0,该bug已经修复,黑白名单可以正常使用。

作者:I讨厌鬼I
链接:https://www.jianshu.com/p/5531b66b777a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

作者:Jeebiz  创建时间:2020-11-04 18:37
 更新时间:2024-10-26 16:30