spring:
  # Rabbitmq 配置:RabbitProperties
  rabbitmq:
    # RabbitMQ主机地址。如果设置了address属性,则忽略。
    host: 192.168.3.44
    # RabbitMQ端口。如果设置了address属性,则忽略。默认为 5672,如果启用了SSL,则默认为5671。
    port: 5672
    # 访问broker进行身份验证的账号。默认为“guest”。
    username: admin
    # 访问broker进行身份验证的密码。默认为“guest”。
    password: admin
    # 连接到broker时要使用的虚拟主机。
    virtual-host: /
    # 消息发布确认方式,有三个可选值:SIMPLE、CORRELATED、NONE。默认为NONE。
    publisher-confirm-type: simple
    # 启动消息失败返回,只有设置为true时,消息路由失败才会触发Return回调
    publisher-returns: true
    # RabbitProperties.Template
    template:
      # 当exchange无法找到任何一个合适的queue时,将消息return给生产者
      mandatory: true
    # RabbitProperties.Listener
    listener:
      simple:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: manual
        # 限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
        prefetch: 100
        # 开启批量消费
        consumer-batch-enabled: true
        # 每次批量消费大小
        batch-size: 16
      direct:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: manual

批量消费的类

@Component
public class BatchQueueListener implements ChannelAwareMessageListener {

    // 批量接收处理
    @RabbitListener(queues = "ramq.batch-test" , containerFactory = "batchQueueRabbitListenerContainerFactory")
    @Override
    public void onMessageBatch(List<Message> messages, Channel channel) {
        try {
            log.info("batch.queue.consumer 收到{}条message", messages.size());
            if (messages.size() > 0) {
                log.info("第一条数据是: {}", new String(messages.get(0).getBody()));
            }

            channel.basicAck(messages.size(), true);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // TODO Auto-generated method stub

    }

}
作者:Jeebiz  创建时间:2023-05-12 17:55
最后编辑:Jeebiz  更新时间:2024-11-01 10:06