消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决方案:

1. 全局ID

MQ消费者的幂等性的解决一般使用全局ID 或者写个唯一标识比如时间戳 或者UUID 或者订单

消费者消费mq中的消息:也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。

给消息分配一个全局id,只要消费过该消息,将 < id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

(1)生产者代码

@Component
public class FanoutProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String queueName) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "xx@163.com");
        jsonObject.put("timestamp", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();
        System.out.println("jsonString:" + jsonString);
        // 设置消息唯一id 保证每次重试消息id唯一
        Message message = MessageBuilder.withBody(jsonString.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID() + "").build();
                //消息id设置在请求头里面 用UUID做全局ID
        amqpTemplate.convertAndSend(queueName, message);
    }
}

(2)消费者代码

@RabbitListener(queues = "fanout_email_queue")
public void process(Message message) throws Exception {
     // 获取消息Id
     String messageId = message.getMessageProperties().getMessageId();  
     //id获取之
     if (messageId == null) {
            return;
     }
     String redisMsgId = ...//根据messageId去redis中查找消息
     if(未找到reidsMsgId) {
        String msg = new String(message.getBody(), "UTF-8");
              //消息内容获取之
         System.out.println("-----邮件消费者获取生产者消息-----------------" + "messageId:" + messageId + ",消息内容:" + msg);

         JSONObject jsonObject = JSONObject.parseObject(msg);
         // 获取email参数
         String email = jsonObject.getString("email");
         // 请求地址
         String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
         JSONObject result = HttpClientUtils.httpGet(emailUrl);
         if (result == null) {
                 // 因为网络原因,造成无法访问,继续重试
                 throw new Exception("调用接口失败!");
         }
         System.out.println("执行结束....");
         //写入到redis中
     }

}

2. 利用Redis的原子性去实现

我们都知道redis是单线程的,并且性能也非常好,提供了很多原子性的命令。比如可以使用 setnx 命令。

在接收到消息后将消息ID作为key执行 setnx 命令,如果执行成功就表示没有处理过这条消息,可以进行消费了,执行失败表示消息已经被消费了。

Redis命令执行失败时,将消息落库,每日用定时器,对这种极特殊的消息进行处理。

原子性问题注意:

  • 第一:我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?

  • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略(同步到关系型数据库)?缓存又如何做到数据可靠性保障呢

关于不落库,定时同步的策略,目前主流方案有两种,第一种为双缓存模式,异步写入到缓存中,也可以异步写到数据库,但是最终会有一个回调函数检查,这样能保障最终一致性,不能保证100%的实时性。第二种是定时同步,比如databus同步。

————————————————
版权声明:本文为CSDN博主「有点热想开空调」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/m0_61635932/article/details/120566833

作者:Jeebiz  创建时间:2023-04-04 16:40
最后编辑:Jeebiz  更新时间:2024-11-01 10:06