Bulkhead
简介
Resilence4j的Bulkhead提供两种实现,一种是基于信号量的,另一种是基于有等待队列的固定大小的线程池的,由于基于信号量的Bulkhead能很好地在多线程和I/O模型下工作,所以选择介绍基于信号量的Bulkhead的使用。
可配置参数
| 配置参数 | 默认值 | 描述 | 
|---|---|---|
| maxConcurrentCalls | 25 | 可允许的最大并发线程数 | 
| maxWaitDuration | 0 | 尝试进入饱和舱壁时应阻止线程的最大时间 | 
测试前准备
首先引入maven依赖:
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.6.1</version>
</dependency>resilience4j-spring-boot2 集成了circuitbeaker、retry、bulkhead、ratelimiter、timelimiter 几个模块。
application.yml 配置
resilience4j:
  bulkhead:
    configs:
      default:
        max-concurrent-calls: 10
        max-wait-duration: 1s
    instances:
      backendA:
        base-config: default
        max-concurrent-calls: 3
      backendB:
        base-config: default
        max-wait-duration: 100和CircuitBreaker差不多,都是可以通过继承覆盖配置设定实例的。
用于监控Bulkhead状态及事件的工具类
同样为了监控Bulkhead组件,写一个工具类:
import io.github.resilience4j.bulkhead.Bulkhead;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class BulkhdadUtil {
    /**
     * @Description: 获取bulkhead的状态
     */
    public static void getBulkheadStatus(String time, Bulkhead bulkhead){
        Bulkhead.Metrics metrics = bulkhead.getMetrics();
        // Returns the number of parallel executions this bulkhead can support at this point in time.
        int availableConcurrentCalls =  metrics.getAvailableConcurrentCalls();
        // Returns the configured max amount of concurrent calls
        int maxAllowedConcurrentCalls = metrics.getMaxAllowedConcurrentCalls();
        log.info(time  + ", metrics[ availableConcurrentCalls=" + availableConcurrentCalls +
                ", maxAllowedConcurrentCalls=" + maxAllowedConcurrentCalls + " ]");
    }
    /**
     * @Description: 监听bulkhead事件
     */
    public static void addBulkheadListener(Bulkhead bulkhead){
        bulkhead.getEventPublisher()
                .onCallFinished(event -> log.info(event.toString()))
                .onCallPermitted(event -> log.info(event.toString()))
                .onCallRejected(event -> log.info(event.toString()));
    }
}调用方法
还是以之前查询用户列表的服务为例。Bulkhead支持AOP和程序式两种方式的调用。
程序式的调用方法
调用方法都类似,装饰方法之后用Try.of().recover()来执行:
public class BulkheadServiceImpl {
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    @Autowired
    private BulkheadRegistry bulkheadRegistry;
    public List<User> bulkheadNotAOP(){
        // 通过注册器获得Bulkhead实例
        Bulkhead bulkhead = bulkheadRegistry.bulkhead("backendA");
        BulkhdadUtil.getBulkheadStatus("开始执行前: ", bulkhead);
        // 通过Try.of().recover()调用装饰后的服务
        Try<List<User>> result = Try.of(
            Bulkhead.decorateCheckedSupplier(bulkhead, remoteServiceConnector::process))
            .recover(BulkheadFullException.class, throwable -> {
                log.info("服务失败: " + throwable.getLocalizedMessage());
                return new ArrayList();
            });
        BulkhdadUtil.getBulkheadStatus("执行结束: ", bulkhead);
        return result.get();
    }
}
AOP式的调用方法
首先在连接器方法上使用@Bulkhead(name=””, fallbackMethod=””, type=””)注解,其中name是要使用的Bulkhead实例的名称,fallbackMethod是要使用的降级方法,type是选择信号量或线程池的Bulkhead:
public RemoteServiceConnector{
    @Bulkhead(name = "backendA", fallbackMethod = "fallback", type = Bulkhead.Type.SEMAPHORE)
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteServic.process();
        return users;
    }
    private List<User> fallback(BulkheadFullException e){
        log.info("服务失败: " + e.getLocalizedMessage());
        return new ArrayList();
    }
} 
如果Retry、CircuitBreaker、Bulkhead同时注解在方法上,默认的顺序是Retry>CircuitBreaker>Bulkhead,即先控制并发再熔断最后重试,之后直接调用方法:
public class BulkheadServiceImpl {
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    @Autowired
    private BulkheadRegistry bulkheadRegistry;
    public List<User> bulkheadAOP() throws TimeoutException, InterruptedException {
        List<User> result = remoteServiceConnector.process();
        BulkheadUtil.getBulkheadStatus("执行结束:", bulkheadRegistry.retry("backendA"));
        return result;
    }
}
使用测试
在application.yml文件中将backenA线程数限制为1,便于观察,最大等待时间为1s,超过1s的会走降级方法:
instances:
    backendA:
        baseConfig: default
        maxConcurrentCalls: 1使用另一个远程服务接口的实现,不抛出异常,当做正常服务进行:
public class RemoteServiceImpl implements RemoteService {
    private static AtomicInteger count = new AtomicInteger(0);
    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        log.info("服务正常运行,获取用户列表");
        // 模拟数据库正常查询
        return repository.findAll();
    }
}
用线程池调5个线程去请求服务:
public class BulkheadServiceImplTest{
    @Autowired
    private BulkheadServiceImpl bulkheadService;
    @Autowired
    private BulkheadRegistry bulkheadRegistry;
    @Test
    public void bulkheadTest() {
        BulkhdadUtil.addBulkheadListener(bulkheadRegistry.bulkhead("backendA"));
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i=0; i<5; i++){
            pool.submit(() -> {
                // bulkheadService.bulkheadAOP();
                bulkheadService.bulkheadNotAOP();
            });
        }
        pool.shutdown();
        while (!pool.isTerminated());
        }
    }
}
看一下运行结果:
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
count的值 = 0
服务正常运行,获取用户列表
开始执行前: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服务失败: Bulkhead 'backendA' is full and does not permit further calls
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' has finished a call.
执行结束: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
由上可以看出,5个请求只有一个进入,其余触发rejected事件,然后自动进入降级方法。接下来我们把等待时间稍微加长一些:
instances:
    backendA:
        baseConfig: default
        maxConcurrentCalls: 1
        maxWaitDuration: 5000
再运行一次:
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
开始执行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
count的值 = 0
服务正常运行,获取用户列表
Bulkhead 'backendA' permitted a call.
count的值 = 1
Bulkhead 'backendA' has finished a call.
服务正常运行,获取用户列表
执行结束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' has finished a call.
执行结束: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
前面的线程没有马上被拒绝,而是等待了一段时间再执行。
作者:I讨厌鬼I
链接:https://www.jianshu.com/p/5531b66b777a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
更新时间:2025-07-19 11:45
