Bulkhead
简介
Resilence4j的Bulkhead提供两种实现,一种是基于信号量的,另一种是基于有等待队列的固定大小的线程池的,由于基于信号量的Bulkhead能很好地在多线程和I/O模型下工作,所以选择介绍基于信号量的Bulkhead的使用。
可配置参数
配置参数 | 默认值 | 描述 |
---|---|---|
maxConcurrentCalls | 25 | 可允许的最大并发线程数 |
maxWaitDuration | 0 | 尝试进入饱和舱壁时应阻止线程的最大时间 |
测试前准备
首先引入maven依赖:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.6.1</version>
</dependency>
resilience4j-spring-boot2 集成了circuitbeaker、retry、bulkhead、ratelimiter、timelimiter 几个模块。
application.yml 配置
resilience4j:
bulkhead:
configs:
default:
max-concurrent-calls: 10
max-wait-duration: 1s
instances:
backendA:
base-config: default
max-concurrent-calls: 3
backendB:
base-config: default
max-wait-duration: 100
和CircuitBreaker差不多,都是可以通过继承覆盖配置设定实例的。
用于监控Bulkhead状态及事件的工具类
同样为了监控Bulkhead组件,写一个工具类:
import io.github.resilience4j.bulkhead.Bulkhead;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class BulkhdadUtil {
/**
* @Description: 获取bulkhead的状态
*/
public static void getBulkheadStatus(String time, Bulkhead bulkhead){
Bulkhead.Metrics metrics = bulkhead.getMetrics();
// Returns the number of parallel executions this bulkhead can support at this point in time.
int availableConcurrentCalls = metrics.getAvailableConcurrentCalls();
// Returns the configured max amount of concurrent calls
int maxAllowedConcurrentCalls = metrics.getMaxAllowedConcurrentCalls();
log.info(time + ", metrics[ availableConcurrentCalls=" + availableConcurrentCalls +
", maxAllowedConcurrentCalls=" + maxAllowedConcurrentCalls + " ]");
}
/**
* @Description: 监听bulkhead事件
*/
public static void addBulkheadListener(Bulkhead bulkhead){
bulkhead.getEventPublisher()
.onCallFinished(event -> log.info(event.toString()))
.onCallPermitted(event -> log.info(event.toString()))
.onCallRejected(event -> log.info(event.toString()));
}
}
调用方法
还是以之前查询用户列表的服务为例。Bulkhead支持AOP和程序式两种方式的调用。
程序式的调用方法
调用方法都类似,装饰方法之后用Try.of().recover()来执行:
public class BulkheadServiceImpl {
@Autowired
private RemoteServiceConnector remoteServiceConnector;
@Autowired
private BulkheadRegistry bulkheadRegistry;
public List<User> bulkheadNotAOP(){
// 通过注册器获得Bulkhead实例
Bulkhead bulkhead = bulkheadRegistry.bulkhead("backendA");
BulkhdadUtil.getBulkheadStatus("开始执行前: ", bulkhead);
// 通过Try.of().recover()调用装饰后的服务
Try<List<User>> result = Try.of(
Bulkhead.decorateCheckedSupplier(bulkhead, remoteServiceConnector::process))
.recover(BulkheadFullException.class, throwable -> {
log.info("服务失败: " + throwable.getLocalizedMessage());
return new ArrayList();
});
BulkhdadUtil.getBulkheadStatus("执行结束: ", bulkhead);
return result.get();
}
}
AOP式的调用方法
首先在连接器方法上使用@Bulkhead(name=””, fallbackMethod=””, type=””)注解,其中name是要使用的Bulkhead实例的名称,fallbackMethod是要使用的降级方法,type是选择信号量或线程池的Bulkhead:
public RemoteServiceConnector{
@Bulkhead(name = "backendA", fallbackMethod = "fallback", type = Bulkhead.Type.SEMAPHORE)
public List<User> process() throws TimeoutException, InterruptedException {
List<User> users;
users = remoteServic.process();
return users;
}
private List<User> fallback(BulkheadFullException e){
log.info("服务失败: " + e.getLocalizedMessage());
return new ArrayList();
}
}
如果Retry、CircuitBreaker、Bulkhead同时注解在方法上,默认的顺序是Retry>CircuitBreaker>Bulkhead,即先控制并发再熔断最后重试,之后直接调用方法:
public class BulkheadServiceImpl {
@Autowired
private RemoteServiceConnector remoteServiceConnector;
@Autowired
private BulkheadRegistry bulkheadRegistry;
public List<User> bulkheadAOP() throws TimeoutException, InterruptedException {
List<User> result = remoteServiceConnector.process();
BulkheadUtil.getBulkheadStatus("执行结束:", bulkheadRegistry.retry("backendA"));
return result;
}
}
使用测试
在application.yml文件中将backenA线程数限制为1,便于观察,最大等待时间为1s,超过1s的会走降级方法:
instances:
backendA:
baseConfig: default
maxConcurrentCalls: 1
使用另一个远程服务接口的实现,不抛出异常,当做正常服务进行:
public class RemoteServiceImpl implements RemoteService {
private static AtomicInteger count = new AtomicInteger(0);
public List<User> process() {
int num = count.getAndIncrement();
log.info("count的值 = " + num);
log.info("服务正常运行,获取用户列表");
// 模拟数据库正常查询
return repository.findAll();
}
}
用线程池调5个线程去请求服务:
public class BulkheadServiceImplTest{
@Autowired
private BulkheadServiceImpl bulkheadService;
@Autowired
private BulkheadRegistry bulkheadRegistry;
@Test
public void bulkheadTest() {
BulkhdadUtil.addBulkheadListener(bulkheadRegistry.bulkhead("backendA"));
ExecutorService pool = Executors.newCachedThreadPool();
for (int i=0; i<5; i++){
pool.submit(() -> {
// bulkheadService.bulkheadAOP();
bulkheadService.bulkheadNotAOP();
});
}
pool.shutdown();
while (!pool.isTerminated());
}
}
}
看一下运行结果:
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
count的值 = 0
服务正常运行,获取用户列表
开始执行前: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' has finished a call.
执行结束: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
由上可以看出,5个请求只有一个进入,其余触发rejected事件,然后自动进入降级方法。接下来我们把等待时间稍微加长一些:
instances:
backendA:
baseConfig: default
maxConcurrentCalls: 1
maxWaitDuration: 5000
再运行一次:
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
count的值 = 0
服务正常运行,获取用户列表
Bulkhead 'backendA' permitted a call.
count的值 = 1
Bulkhead 'backendA' has finished a call.
服务正常运行,获取用户列表
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' has finished a call.
执行结束: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
前面的线程没有马上被拒绝,而是等待了一段时间再执行。
作者:I讨厌鬼I
链接:https://www.jianshu.com/p/5531b66b777a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
更新时间:2024-10-26 16:30