本文介绍: 幂等性是指无论操作执行多少次,都是得到相同结果,而不会产生其他副作用。在rabbitMQ中同一条消息在MQ中被消费多次

在Java中,可以使用消息队列实现消息异步处理,其中常用消息队列有 RabbitMQ、ActiveMQ、Kafka 等。

什么是幂等性?

幂等性是指无论操作执行多少次,都是得到相同结果,而不会产生其他副作用

rabbitMQ中

什么消息重复消费

一条消息在MQ中被消费多次

出现重复消费原因

生产者发送一条消息rabbitMQ,但rabbitMQ尚未收到消费者确认,会认为消息消费未被消费而重新发送

网络稳定消费者故障网络分区、消息重复传递策略消费者超时设置不当

为什么需要避免重复消费

业务错误:我本来写的业务逻辑就是只要执行一次

数据重复数据插入重复,破坏数据唯一

资源浪费占用系统资源,降低系统性能

如何避免消息重复消费
消息去重

通过记录已经消费过的消息,在消息到达时检查是否已经在记录存在,从而避免重复处理

  if (!processedMessages.contains(message)) {
                    processMessage(message);
                    processedMessages.add(message);
                }

消息幂等性
分布式锁(消息幂等性)

使用UUID生成唯一Id ,作为messageId

使用了唯一的消息ID来确保同一条消息只会被处理一次

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .messageId(UUID.randomUUID().toString()) // 唯一标识
                    .build();

        if (!isMessageProcessed(messageId)) {
            processMessage(message);
            saveProcessedMessage(messageId);
        }

消费者查询该消息是否已经被处理过,如果没有处理过,则调用processMessage()方法处理该消息,并使用 saveProcessedMessage()方法保存已经处理过的消息。

            //手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false

处理完消息后,还需要调用channel.basicAck(envelope.getDeliveryTag(), false)方法确认消息已经被消费。这是因为RabbitMQ是一个消息的投递机制,只有在消费者确认了消息已经被处理后,才会从消息队列删除该消息。

使用redis实现避免重复消费

生产者

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(1) // 指定消息是否需要持久化 1-需要 2-不需要
                    .messageId(UUID.randomUUID().toString()) // 唯一标识
                    .build();

消费者

 String result = jedis.set(messageId, "0", "NX", "EX", 10);
  
 if (result != null && result.equalsIgnoreCase("OK")){
                    System.out.println("接收到消息:"+ new String(body,"UTF-8"));

                    //消费成功 set messageId - 1
                    jedis.set(messageId,"1");
                    channel.basicAck(envelope.getDeliveryTag(),false);

                }else {
                    //如果1中的setnx失败获取key对应value,如果是1,设置ack 如果是0 return

                    String s = jedis.get(messageId);
                    if ("1".equalsIgnoreCase(s)){
                        //消费完了
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }

springboot

如果存在设置value为1;如果value是1,ack

事务性消费
消费状态追踪

原文地址:https://blog.csdn.net/LUOZONGW/article/details/134779553

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_41534.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注