RateLimiter
简介
高频控制是可以限制服务调用频率,Resilience4j的RateLimiter可以对频率进行纳秒级别的控制,在每一个周期刷新可以调用的次数,还可以设定线程等待权限的时间。
可配置参数
配置参数 | 默认值 | 描述 |
---|---|---|
timeoutDuration | 5[s] | 线程等待权限的默认等待时间 |
limitRefreshPeriod | 500[ns] | 权限刷新的时间,每个周期结束后,RateLimiter将会把权限计数设置为limitForPeriod的值 |
limiteForPeriod | 50 | 一个限制刷新期间的可用权限数 |
测试前准备
首先引入maven依赖:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.6.1</version>
</dependency>
resilience4j-spring-boot2 集成了circuitbeaker、retry、bulkhead、ratelimiter、timelimiter 几个模块。
application.yml 配置
resilience4j:
ratelimiter:
configs:
default:
timeout-duration: 3s
limit-for-period: 50
limit-refresh-period: 500
instances:
backendA:
base-config: default
timeout-duration: 6s
backendB:
base-config: default
timeout-duration: 6s
用于监控RateLimiter状态及事件的工具类
同样为了监控RateLimiter组件,写一个工具类:
import io.github.resilience4j.ratelimiter.RateLimiter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RateLimiterUtil {
/**
* @Description: 获取rateLimiter的状态
*/
public static void getRateLimiterStatus(String time, RateLimiter rateLimiter){
RateLimiter.Metrics metrics = rateLimiter.getMetrics();
// Returns the number of availablePermissions in this duration.
int availablePermissions = metrics.getAvailablePermissions();
// Returns the number of WaitingThreads
int numberOfWaitingThreads = metrics.getNumberOfWaitingThreads();
log.info(time + ", metrics[ availablePermissions=" + availablePermissions +
", numberOfWaitingThreads=" + numberOfWaitingThreads + " ]");
}
/**
* @Description: 监听rateLimiter事件
*/
public static void addRateLimiterListener(RateLimiter rateLimiter){
rateLimiter.getEventPublisher()
.onSuccess(event -> log.info(event.toString()))
.onFailure(event -> log.info(event.toString()));
}
}
调用方法
还是以之前查询用户列表的服务为例。RateLimiter支持AOP和程序式两种方式的调用。
程序式的调用方法
调用方法都类似,装饰方法之后用Try.of().recover()来执行:
public class RateLimiterServiceImpl {
@Autowired
private RemoteServiceConnector remoteServiceConnector;
@Autowired
private RateLimiterRegistry rateLimiterRegistry;
public List<User> ratelimiterNotAOP(){
// 通过注册器获得RateLimiter实例
RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("backendA");
RateLimiterUtil.getRateLimiterStatus("开始执行前: ", rateLimiter);
// 通过Try.of().recover()调用装饰后的服务
Try<List<User>> result = Try.of(
Bulkhead.decorateCheckedSupplier(rateLimiter, remoteServiceConnector::process))
.recover(BulkheadFullException.class, throwable -> {
log.info("服务失败: " + throwable.getLocalizedMessage());
return new ArrayList();
});
RateLimiterUtil.getRateLimiterStatus("执行结束: ", rateLimiter);
return result.get();
}
}
AOP式的调用方法
首先在连接器方法上使用@RateLimiter(name=””, fallbackMethod=””)注解,其中name是要使用的RateLimiter实例的名称,fallbackMethod是要使用的降级方法:
public RemoteServiceConnector{
@RateLimiter(name = "backendA", fallbackMethod = "fallback")
public List<User> process() throws TimeoutException, InterruptedException {
List<User> users;
users = remoteServic.process();
return users;
}
private List<User> fallback(BulkheadFullException e){
log.info("服务失败: " + e.getLocalizedMessage());
return new ArrayList();
}
}
如果 Retry、CircuitBreaker、Bulkhead、RateLimiter同时注解在方法上,默认的顺序是Retry>CircuitBreaker>RateLimiter>Bulkhead
,即先控制并发再限流然后熔断最后重试
接下来直接调用方法:
public class RateLimiterServiceImpl {
@Autowired
private RemoteServiceConnector remoteServiceConnector;
@Autowired
private RateLimiterRegistry rateLimiterRegistry;
public List<User> rateLimiterAOP() throws TimeoutException, InterruptedException {
List<User> result = remoteServiceConnector.process();
BulkheadUtil.getBulkheadStatus("执行结束:", rateLimiterRegistry.retry("backendA"));
return result;
}
}
使用测试
在application.yml文件中将backenA设定为20s只能处理1个请求,为便于观察,刷新时间设定为20s,等待时间设定为5s:
configs:
default:
limitForPeriod: 5
limitRefreshPeriod: 20s
timeoutDuration: 5s
instances:
backendA:
baseConfig: default
limitForPeriod: 1
使用另一个远程服务接口的实现,不抛出异常,当做正常服务进行,为了让结果明显一些,让方法sleep 5秒:
public class RemoteServiceImpl implements RemoteService {
private static AtomicInteger count = new AtomicInteger(0);
public List<User> process() throws InterruptedException {
int num = count.getAndIncrement();
log.info("count的值 = " + num);
Thread.sleep(5000);
log.info("服务正常运行,获取用户列表");
// 模拟数据库正常查询
return repository.findAll();
}
}
用线程池调5个线程去请求服务:
public class RateLimiterServiceImplTest{
@Autowired
private RateLimiterServiceImpl rateLimiterService;
@Autowired
private RateLimiterRegistry rateLimiterRegistry;
@Test
public void rateLimiterTest() {
RateLimiterUtil.addRateLimiterListener(rateLimiterRegistry.rateLimiter("backendA"));
ExecutorService pool = Executors.newCachedThreadPool();
for (int i=0; i<5; i++){
pool.submit(() -> {
// rateLimiterService.rateLimiterAOP();
rateLimiterService.rateLimiterNotAOP();
});
}
pool.shutdown();
while (!pool.isTerminated());
}
}
}
看一下测试结果:
开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:15.735+08:00[Asia/Shanghai]}
count的值 = 0
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.737+08:00[Asia/Shanghai]}
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.739+08:00[Asia/Shanghai]}
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.740+08:00[Asia/Shanghai]}
服务失败: RateLimiter 'backendA' does not permit further calls
服务失败: RateLimiter 'backendA' does not permit further calls
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=1 ]
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=1 ]
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.745+08:00[Asia/Shanghai]}
服务正常运行,获取用户列表
服务失败: RateLimiter 'backendA' does not permit further calls
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
服务失败: RateLimiter 'backendA' does not permit further calls
执行结束: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
执行结束: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
只有一个服务调用成功,其他都执行失败了。现在我们把刷新时间调成1s:
configs:
default:
limitForPeriod: 5
limitRefreshPeriod: 1s
timeoutDuration: 5s
instances:
backendA:
baseConfig: default
limitForPeriod: 1
重新执行,结果如下:
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
开始执行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:18.894+08:00[Asia/Shanghai]}
count的值 = 0
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:18.894+08:00[Asia/Shanghai]}
count的值 = 1
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:19.706+08:00[Asia/Shanghai]}
count的值 = 2
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:19.706+08:00[Asia/Shanghai]}
count的值 = 3
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:20.703+08:00[Asia/Shanghai]}
count的值 = 4
服务正常运行,获取用户列表
服务正常运行,获取用户列表
服务正常运行,获取用户列表
服务正常运行,获取用户列表
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
服务正常运行,获取用户列表
执行结束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
可以看出,几个服务都被放入并正常执行了,即使上个服务还没完成,依然可以放入,只与时间有关,而与线程无关。
作者:I讨厌鬼I
链接:https://www.jianshu.com/p/5531b66b777a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
更新时间:2024-10-26 16:30