https://blog.csdn.net/WoodYangOY/article/details/124627533

在分区和分组的基础上,有些时候我们的消息需要自己手动进行ACK操作,这个时候就需要配置Spring Cloud Stream 的ack

消息生产者

消息确认

PublisherConfirmsProducer

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Slf4j
@Component
public class PublisherConfirmsProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {

        /*
         * 回调函数
         * @param correlationData 消息内容
         * @param ack 是否成功
         * @param cause 失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("已经收到消息,Id为 : {}", correlationData.getId());
            } else {
                log.info("未接收到消息:{},原因:{}", correlationData.getId(), cause);
            }
        });

        /*
         * 消息退回,指消息未达到目的地被回退
         */
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
            log.info("【消息被退回】交换机:{},消息:{},退回原因:{}", exchange, new String(message.getBody()), replyText);
        });

    }

}

配置

spring:
  # Rabbitmq 配置:RabbitProperties
  rabbitmq:
    # RabbitMQ主机地址。如果设置了address属性,则忽略。
    host: 192.168.31.100
    # RabbitMQ端口。如果设置了address属性,则忽略。默认为 5672,如果启用了SSL,则默认为5671。
    port: 5672
    # 访问broker进行身份验证的账号。默认为“guest”。
    username: admin
    # 访问broker进行身份验证的密码。默认为“guest”。
    password: admin
    # 连接到broker时要使用的虚拟主机。
    virtual-host: /
    # 消息发布确认方式,有三个可选值:SIMPLE、CORRELATED、NONE。默认为NONE。
    publisher-confirm-type: simple
    # 启动消息失败返回,只有设置为true时,消息路由失败才会触发Return回调
    publisher-returns: true
    # RabbitProperties.Template
    template:
      # 当exchange无法找到任何一个合适的queue时,将消息return给生产者
      mandatory: true
    # RabbitProperties.Listener
    listener:
      simple:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: manual
      direct:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: manual
  cloud:
    stream:
      # 消息中间件绑定配置:BindingProperties
      bindings:
        sms-output:
          # 指定要使用的 Exchange 名称
          destination: ramq.sms.topic
          #设置消息类型
          content-type: application/json
          #设置消息的组名称(同名组中的多个消费者,只会有一个去消费消息.)
          group: group1
          # 生产者配置:ProducerProperties
          producer:
            #分区关键字 对象中的id,对象
            partition-key-expression: payload
            #分区大小
            partition-count: 2
      # 消息中间件配置:BinderProperties
      binders:
        #配置绑定器
        defaultRabbit:
          type: rabbit

消息消费者

spring:
  # Rabbitmq 配置:RabbitProperties
  rabbitmq:
    # RabbitMQ主机地址。如果设置了address属性,则忽略。
    host: 192.168.31.100
    # RabbitMQ端口。如果设置了address属性,则忽略。默认为 5672,如果启用了SSL,则默认为5671。
    port: 5672
    # 访问broker进行身份验证的账号。默认为“guest”。
    username: admin
    # 访问broker进行身份验证的密码。默认为“guest”。
    password: admin
    # 连接到broker时要使用的虚拟主机。
    virtual-host: /
    # 消息发布确认方式,有三个可选值:SIMPLE、CORRELATED、NONE。默认为NONE。
    publisher-confirm-type: simple
    # 启动消息失败返回,只有设置为true时,消息路由失败才会触发Return回调
    publisher-returns: true
    # RabbitProperties.Template
    template:
      # 当exchange无法找到任何一个合适的queue时,将消息return给生产者
      mandatory: true
    # RabbitProperties.Listener
    listener:
      simple:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: manual
      direct:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: manual
  cloud:
    stream:
      #instanceCount: 2  #消费者总数
      #instanceIndex: 0  #当前消费者的索引
      # 消息中间件绑定配置:BindingProperties
      bindings:
        sms-input:
          # 指定要使用的 Exchange 名称
          destination: ramq.sms.topic
          #设置消息类型
          content-type: application/json
          #设置消息的组名称(同名组中的多个消费者,只会有一个去消费消息.)
          group: group1
          # 消费者配置:ConsumerProperties
          consumer:
            #开启分区支持
            partitioned: true
            #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
            acknowledge-mode: manual
      # 消息中间件配置:BinderProperties
      binders:
        defaultRabbit:
          type: rabbit
      # Rabbit 消息中间件专属配置:RabbitExtendedBindingProperties
      rabbit:
        # Rabbit消息中间件绑定配置:RabbitBindingProperties
        bindings:
          sms-input:
            # 消费者配置:RabbitCommonProperties,RabbitConsumerProperties
            consumer:
              #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
              acknowledge-mode: manual

basicReject / basicNack / basicRecover区别

  • channel.basicReject(deliveryTag, true);

    basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
    该方法reject后,该消费者还是会消费到该条被reject的消息。

  • channel.basicNack(deliveryTag, false, true);

    basic.nack 方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue
    ,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

  • channel.basicRecover(true);

    basic.recover 是否恢复消息到队列,参数是是否requeue,true则重新入队列,
    并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。

作者:Jeebiz  创建时间:2023-04-03 15:07
最后编辑:Jeebiz  更新时间:2024-11-01 10:06