https://blog.csdn.net/WoodYangOY/article/details/124627533
在分区和分组的基础上,有些时候我们的消息需要自己手动进行ACK操作,这个时候就需要配置Spring Cloud Stream 的ack
消息生产者
消息确认
PublisherConfirmsProducer
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Slf4j
@Component
public class PublisherConfirmsProducer {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
/*
* 回调函数
* @param correlationData 消息内容
* @param ack 是否成功
* @param cause 失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("已经收到消息,Id为 : {}", correlationData.getId());
} else {
log.info("未接收到消息:{},原因:{}", correlationData.getId(), cause);
}
});
/*
* 消息退回,指消息未达到目的地被回退
*/
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
log.info("【消息被退回】交换机:{},消息:{},退回原因:{}", exchange, new String(message.getBody()), replyText);
});
}
}
配置
spring:
# Rabbitmq 配置:RabbitProperties
rabbitmq:
# RabbitMQ主机地址。如果设置了address属性,则忽略。
host: 192.168.31.100
# RabbitMQ端口。如果设置了address属性,则忽略。默认为 5672,如果启用了SSL,则默认为5671。
port: 5672
# 访问broker进行身份验证的账号。默认为“guest”。
username: admin
# 访问broker进行身份验证的密码。默认为“guest”。
password: admin
# 连接到broker时要使用的虚拟主机。
virtual-host: /
# 消息发布确认方式,有三个可选值:SIMPLE、CORRELATED、NONE。默认为NONE。
publisher-confirm-type: simple
# 启动消息失败返回,只有设置为true时,消息路由失败才会触发Return回调
publisher-returns: true
# RabbitProperties.Template
template:
# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
mandatory: true
# RabbitProperties.Listener
listener:
simple:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: manual
direct:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: manual
cloud:
stream:
# 消息中间件绑定配置:BindingProperties
bindings:
sms-output:
# 指定要使用的 Exchange 名称
destination: ramq.sms.topic
#设置消息类型
content-type: application/json
#设置消息的组名称(同名组中的多个消费者,只会有一个去消费消息.)
group: group1
# 生产者配置:ProducerProperties
producer:
#分区关键字 对象中的id,对象
partition-key-expression: payload
#分区大小
partition-count: 2
# 消息中间件配置:BinderProperties
binders:
#配置绑定器
defaultRabbit:
type: rabbit
消息消费者
spring:
# Rabbitmq 配置:RabbitProperties
rabbitmq:
# RabbitMQ主机地址。如果设置了address属性,则忽略。
host: 192.168.31.100
# RabbitMQ端口。如果设置了address属性,则忽略。默认为 5672,如果启用了SSL,则默认为5671。
port: 5672
# 访问broker进行身份验证的账号。默认为“guest”。
username: admin
# 访问broker进行身份验证的密码。默认为“guest”。
password: admin
# 连接到broker时要使用的虚拟主机。
virtual-host: /
# 消息发布确认方式,有三个可选值:SIMPLE、CORRELATED、NONE。默认为NONE。
publisher-confirm-type: simple
# 启动消息失败返回,只有设置为true时,消息路由失败才会触发Return回调
publisher-returns: true
# RabbitProperties.Template
template:
# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
mandatory: true
# RabbitProperties.Listener
listener:
simple:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: manual
direct:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: manual
cloud:
stream:
#instanceCount: 2 #消费者总数
#instanceIndex: 0 #当前消费者的索引
# 消息中间件绑定配置:BindingProperties
bindings:
sms-input:
# 指定要使用的 Exchange 名称
destination: ramq.sms.topic
#设置消息类型
content-type: application/json
#设置消息的组名称(同名组中的多个消费者,只会有一个去消费消息.)
group: group1
# 消费者配置:ConsumerProperties
consumer:
#开启分区支持
partitioned: true
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: manual
# 消息中间件配置:BinderProperties
binders:
defaultRabbit:
type: rabbit
# Rabbit 消息中间件专属配置:RabbitExtendedBindingProperties
rabbit:
# Rabbit消息中间件绑定配置:RabbitBindingProperties
bindings:
sms-input:
# 消费者配置:RabbitCommonProperties,RabbitConsumerProperties
consumer:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: manual
basicReject / basicNack / basicRecover区别
channel.basicReject(deliveryTag, true);
basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
该方法reject后,该消费者还是会消费到该条被reject的消息。channel.basicNack(deliveryTag, false, true);
basic.nack 方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue
,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。channel.basicRecover(true);
basic.recover 是否恢复消息到队列,参数是是否requeue,true则重新入队列,
并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
作者:Jeebiz 创建时间:2023-04-03 15:07
最后编辑:Jeebiz 更新时间:2024-11-01 10:06
最后编辑:Jeebiz 更新时间:2024-11-01 10:06