RateLimiter

简介

高频控制是可以限制服务调用频率,Resilience4j的RateLimiter可以对频率进行纳秒级别的控制,在每一个周期刷新可以调用的次数,还可以设定线程等待权限的时间。

可配置参数
配置参数 默认值 描述
timeoutDuration 5[s] 线程等待权限的默认等待时间
limitRefreshPeriod 500[ns] 权限刷新的时间,每个周期结束后,RateLimiter将会把权限计数设置为limitForPeriod的值
limiteForPeriod 50 一个限制刷新期间的可用权限数
测试前准备
首先引入maven依赖:
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.6.1</version>
</dependency>

resilience4j-spring-boot2 集成了circuitbeaker、retry、bulkhead、ratelimiter、timelimiter 几个模块。

application.yml 配置
resilience4j:
  ratelimiter:
    configs:
       default:
          timeout-duration: 3s
          limit-for-period: 50
          limit-refresh-period: 500
    instances:
      backendA:
        base-config: default
        timeout-duration: 6s
      backendB:
        base-config: default
        timeout-duration: 6s
用于监控RateLimiter状态及事件的工具类

同样为了监控RateLimiter组件,写一个工具类:

import io.github.resilience4j.ratelimiter.RateLimiter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RateLimiterUtil {

    /**
     * @Description: 获取rateLimiter的状态
     */
    public static void getRateLimiterStatus(String time, RateLimiter rateLimiter){
        RateLimiter.Metrics metrics = rateLimiter.getMetrics();
        // Returns the number of availablePermissions in this duration.
        int availablePermissions =  metrics.getAvailablePermissions();
        // Returns the number of WaitingThreads
        int numberOfWaitingThreads = metrics.getNumberOfWaitingThreads();

        log.info(time  + ", metrics[ availablePermissions=" + availablePermissions +
                ", numberOfWaitingThreads=" + numberOfWaitingThreads + " ]");
    }

    /**
     * @Description: 监听rateLimiter事件
     */
    public static void addRateLimiterListener(RateLimiter rateLimiter){
        rateLimiter.getEventPublisher()
                .onSuccess(event -> log.info(event.toString()))
                .onFailure(event -> log.info(event.toString()));
    }
}

调用方法

还是以之前查询用户列表的服务为例。RateLimiter支持AOP和程序式两种方式的调用。

程序式的调用方法

调用方法都类似,装饰方法之后用Try.of().recover()来执行:

public class RateLimiterServiceImpl {

    @Autowired
    private RemoteServiceConnector remoteServiceConnector;

    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;

    public List<User> ratelimiterNotAOP(){
        // 通过注册器获得RateLimiter实例
        RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("backendA");
        RateLimiterUtil.getRateLimiterStatus("开始执行前: ", rateLimiter);
        // 通过Try.of().recover()调用装饰后的服务
        Try<List<User>> result = Try.of(
            Bulkhead.decorateCheckedSupplier(rateLimiter, remoteServiceConnector::process))
            .recover(BulkheadFullException.class, throwable -> {
                log.info("服务失败: " + throwable.getLocalizedMessage());
                return new ArrayList();
            });
        RateLimiterUtil.getRateLimiterStatus("执行结束: ", rateLimiter);
        return result.get();
    }
}
AOP式的调用方法

首先在连接器方法上使用@RateLimiter(name=””, fallbackMethod=””)注解,其中name是要使用的RateLimiter实例的名称,fallbackMethod是要使用的降级方法:

public RemoteServiceConnector{

    @RateLimiter(name = "backendA", fallbackMethod = "fallback")
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteServic.process();
        return users;
    }

    private List<User> fallback(BulkheadFullException e){
        log.info("服务失败: " + e.getLocalizedMessage());
        return new ArrayList();
    }
} 

如果 Retry、CircuitBreaker、Bulkhead、RateLimiter同时注解在方法上,默认的顺序是Retry>CircuitBreaker>RateLimiter>Bulkhead,即先控制并发再限流然后熔断最后重试

接下来直接调用方法:

public class RateLimiterServiceImpl {

    @Autowired
    private RemoteServiceConnector remoteServiceConnector;

    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;

    public List<User> rateLimiterAOP() throws TimeoutException, InterruptedException {
        List<User> result = remoteServiceConnector.process();
        BulkheadUtil.getBulkheadStatus("执行结束:", rateLimiterRegistry.retry("backendA"));
        return result;
    }
}
使用测试

在application.yml文件中将backenA设定为20s只能处理1个请求,为便于观察,刷新时间设定为20s,等待时间设定为5s:

configs:
      default:
        limitForPeriod: 5
        limitRefreshPeriod: 20s
        timeoutDuration: 5s
    instances:
      backendA:
        baseConfig: default
        limitForPeriod: 1

使用另一个远程服务接口的实现,不抛出异常,当做正常服务进行,为了让结果明显一些,让方法sleep 5秒:

public class RemoteServiceImpl implements RemoteService {

    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() throws InterruptedException  {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        Thread.sleep(5000);
        log.info("服务正常运行,获取用户列表");
        // 模拟数据库正常查询
        return repository.findAll();
    }
}

用线程池调5个线程去请求服务:

public class RateLimiterServiceImplTest{

    @Autowired
    private RateLimiterServiceImpl rateLimiterService;

    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;

    @Test
    public void rateLimiterTest() {
        RateLimiterUtil.addRateLimiterListener(rateLimiterRegistry.rateLimiter("backendA"));
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i=0; i<5; i++){
            pool.submit(() -> {
                // rateLimiterService.rateLimiterAOP();
                rateLimiterService.rateLimiterNotAOP();
            });
        }
        pool.shutdown();

        while (!pool.isTerminated());
        }
    }
}

看一下测试结果:

开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:15.735+08:00[Asia/Shanghai]}
count的值 = 0
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.737+08:00[Asia/Shanghai]}
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.739+08:00[Asia/Shanghai]}
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.740+08:00[Asia/Shanghai]}
服务失败: RateLimiter 'backendA' does not permit further calls
服务失败: RateLimiter 'backendA' does not permit further calls
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=1 ]
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=1 ]
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.745+08:00[Asia/Shanghai]}
服务正常运行,获取用户列表
服务失败: RateLimiter 'backendA' does not permit further calls
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
服务失败: RateLimiter 'backendA' does not permit further calls
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
执行结束: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]

只有一个服务调用成功,其他都执行失败了。现在我们把刷新时间调成1s:

configs:
      default:
        limitForPeriod: 5
        limitRefreshPeriod: 1s
        timeoutDuration: 5s
    instances:
      backendA:
        baseConfig: default
        limitForPeriod: 1

重新执行,结果如下:

开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:18.894+08:00[Asia/Shanghai]}
 count的值 = 0
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:18.894+08:00[Asia/Shanghai]}
count的值 = 1
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:19.706+08:00[Asia/Shanghai]}
count的值 = 2
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:19.706+08:00[Asia/Shanghai]}
count的值 = 3
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:20.703+08:00[Asia/Shanghai]}
count的值 = 4
服务正常运行,获取用户列表
服务正常运行,获取用户列表
服务正常运行,获取用户列表
服务正常运行,获取用户列表
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
 执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
服务正常运行,获取用户列表
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]

可以看出,几个服务都被放入并正常执行了,即使上个服务还没完成,依然可以放入,只与时间有关,而与线程无关。

作者:I讨厌鬼I
链接:https://www.jianshu.com/p/5531b66b777a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

作者:Jeebiz  创建时间:2020-11-05 00:43
 更新时间:2023-12-22 21:08