消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
解决方案:
1. 全局ID
MQ消费者的幂等性的解决一般使用全局ID 或者写个唯一标识比如时间戳 或者UUID 或者订单
消费者消费mq中的消息:也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。
给消息分配一个全局id,只要消费过该消息,将 < id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
(1)生产者代码
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String queueName) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("email", "xx@163.com");
jsonObject.put("timestamp", System.currentTimeMillis());
String jsonString = jsonObject.toJSONString();
System.out.println("jsonString:" + jsonString);
// 设置消息唯一id 保证每次重试消息id唯一
Message message = MessageBuilder.withBody(jsonString.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "").build();
//消息id设置在请求头里面 用UUID做全局ID
amqpTemplate.convertAndSend(queueName, message);
}
}
(2)消费者代码
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message) throws Exception {
// 获取消息Id
String messageId = message.getMessageProperties().getMessageId();
//id获取之
if (messageId == null) {
return;
}
String redisMsgId = ...//根据messageId去redis中查找消息
if(未找到reidsMsgId) {
String msg = new String(message.getBody(), "UTF-8");
//消息内容获取之
System.out.println("-----邮件消费者获取生产者消息-----------------" + "messageId:" + messageId + ",消息内容:" + msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
// 获取email参数
String email = jsonObject.getString("email");
// 请求地址
String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
JSONObject result = HttpClientUtils.httpGet(emailUrl);
if (result == null) {
// 因为网络原因,造成无法访问,继续重试
throw new Exception("调用接口失败!");
}
System.out.println("执行结束....");
//写入到redis中
}
}
2. 利用Redis的原子性去实现
我们都知道redis是单线程的,并且性能也非常好,提供了很多原子性的命令。比如可以使用 setnx 命令。
在接收到消息后将消息ID作为key执行 setnx 命令,如果执行成功就表示没有处理过这条消息,可以进行消费了,执行失败表示消息已经被消费了。
Redis命令执行失败时,将消息落库,每日用定时器,对这种极特殊的消息进行处理。
原子性问题注意:
第一:我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略(同步到关系型数据库)?缓存又如何做到数据可靠性保障呢
关于不落库,定时同步的策略,目前主流方案有两种,第一种为双缓存模式,异步写入到缓存中,也可以异步写到数据库,但是最终会有一个回调函数检查,这样能保障最终一致性,不能保证100%的实时性。第二种是定时同步,比如databus同步。
————————————————
版权声明:本文为CSDN博主「有点热想开空调」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/m0_61635932/article/details/120566833
最后编辑:Jeebiz 更新时间:2024-11-01 10:06