本文介绍: RabbitMq保证消息不丢失

1.生产者可靠性消息投递

简单操作参考———打开主页上篇博客

https://blog.csdn.net/weixin_45810161/article/details/135906602?spm=1001.2014.3001.5501

在使用RabbitMQ的时候,怎么保证保证消息不丢失,RabbitMQ提供了两种不同的方式来控制消息的可靠性投递
1.confirm模式,生产者发送到交换机
2.return模式,交换机发送到队列

2.搭建生产者项目

2.1添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rabbitmq</groupId>
    <artifactId>springboot-rabbitmq-demo01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

2.2配置文件

配置文件开启confirm老版本可能不一样,如图所示,配置为true

在这里插入图片描述

server:
  port: 19991
spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    host: 192.168.3.123
    port: 5672
    virtual-host: /mqname1
    username: admin
    password: admin
    #开启confirm模式
    publisher-confirm-type: correlated

2.3新建启动类

package com.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Author: albc
 * @Date: 2024/01/30/10:06
 * @Description: good good study,day day up
 */
@SpringBootApplication
public class RabbitMqApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMqApplication.class, args);
        System.out.println("启动成功");
    }
}

2.4新建配置类

package com.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Rabbitmq配置类
 * @Author: albc
 * @Date: 2024/01/30/11:09
 * @Description: good good study,day day up
 */
@Configuration
public class RabbitMqConfig {

    //创建队列
    @Bean
    public Queue createqueue(){
        return new Queue("springboot_queue");
    }

    //创建交换机
    @Bean
    public DirectExchange createExchange(){
        return new DirectExchange("springboot_exchange");
    }

    //创建绑定
    @Bean
    public Binding createBinding(){
        return BindingBuilder.bind(createqueue()).to(createExchange()).with("user.insert");
    }
}

2.5新建回调函数

package com.rabbitmq.confirm;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

/**
 * 发送交换机回调函数
 * @Author: albc
 * @Date: 2024/01/30/11:55
 * @Description: good good study,day day up
 */
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {

    /**
     *
     * @param correlationData 发送消息信息
     * @param ack  确认标识:true,MQ服务器exchange表示已经确认收到消息 false 表示没有收到消息
     * @param cause  如果没有收到消息,则指定为MQ服务器exchange消息没有收到的原因,如果已经收到则指定为null
     */
    @Override
    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
        if(ack){
            System.out.println("发送消息到交换机成功:"+cause);
        }else{
            System.out.println("发送消息到交换机失败,原因是:"+cause);
        }
    }

}

2.6测试

备注:如果发送失败,查看是否创建原来的队列导致的,进入rabbitmq客户端,删除交换机和队列
在这里插入图片描述
在这里插入图片描述

package com.rabbitmq.controller;

import com.rabbitmq.confirm.MyConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: albc
 * @Date: 2024/01/30/10:14
 * @Description: good good study,day day up
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MyConfirmCallback myConfirmCallback;

    @RequestMapping("/test1")
    public String test1(){
        //设置回调函数
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
        //发送消息
        rabbitTemplate.convertAndSend("springboot_exchange", "user.insert", "测试user.insert消息发送");
        System.out.println("测试");
        return "发送成功";
    }

}

项目结构如下
在这里插入图片描述

发送消息
在这里插入图片描述
发送成功返回null
在这里插入图片描述
消费者收到消息

在这里插入图片描述
当发送一个不存在的交换机时
在这里插入图片描述
返回失败
在这里插入图片描述
消费者未收到消息
在这里插入图片描述

3.搭建消费者服务

3.1添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rabbitmq</groupId>
    <artifactId>springboot-rabbitmq-demo02</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

3.2 创建启动类

package com.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 消费者消费消息
 * @Author: albc
 * @Date: 2024/01/30/13:41
 * @Description: good good study,day day up
 */
@SpringBootApplication
public class ConsumberApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumberApplication.class, args);
        System.out.println("启动成功");
    }
}

3.3 创建监听类

package com.rabbitmq.common;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 * @Author: albc
 * @Date: 2024/01/30/13:45
 * @Description: good good study,day day up
 */
@Component
public class RabbitMqConsumber {

    /**
     * 消费者监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "springboot_queue")
    public void myListener1(String message){
        System.out.println("消费者接收到的消息为:" + message);
    }
}

3.4配置文件

server:
  port: 19992
spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    host: 192.168.3.123
    port: 5672
    virtual-host: /mqname1
    username: admin
    password: admin

4.开启return模式

上面已经开启了confirm模式,可以保证消息发送交换机,但是如果交换机发送成功,消息发送队列的时候发送错误,消息还是会丢,现在需要开启return模式

4.1配置文件开启return模式

    #开启return模式
    publisher-returns: true

在这里插入图片描述

4.2 设置回调函数

发送消息正常不会回调

package com.rabbitmq.returns;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * 开启return模式回调函数
 * @Author: albc
 * @Date: 2024/01/30/14:31
 * @Description: good good study,day day up
 */
@Component
public class MyReturnsCallback implements RabbitTemplate.ReturnCallback {


    /**
     *
     * @param message 退回的消息信息
     * @param replyCode 退回的状态码,对应消息信息
     * @param replyText 退回的信息
     * @param exchange 交换机
     * @param routingKey 路由key
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("开启Return模式退回的消息是:"+new String(message.getBody()));
        System.out.println("开启Return模式退回的replyCode是:"+replyCode);
        System.out.println("开启Return模式退回的replyText是:"+replyText);
        System.out.println("开启Return模式退回的exchange是:"+exchange);
        System.out.println("开启Return模式退回的routingKey是:"+routingKey);
    }



}

4.3测试消息发送

package com.rabbitmq.controller;

import com.rabbitmq.confirm.MyConfirmCallback;
import com.rabbitmq.returns.MyReturnsCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: albc
 * @Date: 2024/01/30/10:14
 * @Description: good good study,day day up
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MyConfirmCallback myConfirmCallback;

    @Autowired
    private MyReturnsCallback myReturnCallback;

    @RequestMapping("/test1")
    public String test1(){
        //设置回调函数
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
        //发送消息
        rabbitTemplate.convertAndSend("springboot_exchange", "user.insert", "测试user.insert消息发送");
        System.out.println("测试");
        return "发送成功";
    }

    @RequestMapping("/test2")
    public String test2(){
        //设置回调函数
        //rabbitTemplate.setConfirmCallback(myConfirmCallback);
        rabbitTemplate.setReturnCallback(myReturnCallback);
        //发送消息
        rabbitTemplate.convertAndSend("springboot_exchange", "user.insert", "测试user.insert-Returns消息发送");
        System.out.println("测试");
        return "发送成功";
    }


}

在这里插入图片描述

发送成功
在这里插入图片描述
消费者收到消息

在这里插入图片描述

测试发送不存在的队列

在这里插入图片描述

回调报错

在这里插入图片描述
同时开启confirm模式,return模式

在这里插入图片描述
发送交换机成功,发送到队列失败
在这里插入图片描述

正常消息发送,修改成正确的交换机和队列
在这里插入图片描述

发送成功
在这里插入图片描述

消费者收到消息

在这里插入图片描述

5.手动确认消息ACK

前面实现了发送的消息不丢失,但是消费者可能会丢消息,例如消息者没有接受消息,或者消费者收到消息后出现异常,这种情况下消息会丢失,RabbitMQ提供了一种进行手动确认的消息,ACK机制
ACK有三种模式
自动确认
当消息被消费者收到以后,会自动确认收到消息,消息会从队列中移除,实际业务中,消息可能收到了,但是业务执行过程中出现了异常,这种情况消息就会丢失

手动确认
当收到消息以后,需要手动确认消息收到,调用channel.basicAck手动签收,如果出现异常,调用channel.basicNack按照功能业务处理,例如:重新发送,拒绝签收进入死信队列等

根据异常情况来确认(基本不用)
RabbitMQ判断异常肯定不如我们判断的异常靠谱,基本不用

5.1开启ack模式

修改配置文件

消费者配置文件
不是生产者开启
在这里插入图片描述

    #开启手动确认
    listener:
      simple:
        acknowledge-mode: manual

生产者发送消息测试

http://127.0.0.1:19991/test/test2

测试接受消息

在这里插入图片描述

消费以后发现消息还在,重启以后又再次收到了消息
在这里插入图片描述

在这里插入图片描述

5.2手动确认消息

修改监听类RabbitMqConsumber

第一种:签收
channel.basicAck()
第二种:拒绝签收 批量处理
channel.basicNack()
第三种:拒绝签收 不批量处理
channel.basicReject()

自动确认代码修改前

package com.rabbitmq.common;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 * @Author: albc
 * @Date: 2024/01/30/13:45
 * @Description: good good study,day day up
 */
@Component
public class RabbitMqConsumber {

    /**
     * 消费者监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "springboot_queue")
    public void myListener1(String message){
        System.out.println("消费者接收到的消息为:" + message);
    }
}

修改后,手动确认消息

package com.rabbitmq.common;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消费者
 * @Author: albc
 * @Date: 2024/01/30/13:45
 * @Description: good good study,day day up
 */
@Component
public class RabbitMqConsumber {

    /**
     * 消费者监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "springboot_queue")
    public void myListener1(Message message, Channel channel, String msg){
        System.out.println("消费者收到了消息:"+msg);
        //消息的属性
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            /**
             * 手动确认消息
             * 1.签收消息的编号
             * 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
             * 一般不用
             */
            channel.basicAck(messageProperties.getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

当前消息还在

在这里插入图片描述

重启后消费,已经确认消息不在了
在这里插入图片描述

5.3异常处理

package com.rabbitmq.common;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消费者
 * @Author: albc
 * @Date: 2024/01/30/13:45
 * @Description: good good study,day day up
 */
@Component
public class RabbitMqConsumber {

    /**
     * 消费者监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "springboot_queue")
    public void myListener1(Message message, Channel channel, String msg){
        System.out.println("消费者收到了消息:"+msg);
        //消息的属性
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            int i = 1/0;
            /**
             * 手动确认消息
             * 1.签收消息的编号
             * 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
             * 一般不用
             */
            channel.basicAck(messageProperties.getDeliveryTag(),false);
        } catch (Exception e) {
            System.out.println("进入异常方法.............");
            //保存数据库等
            try {
                /**
                 * 1.消息的编号
                 * 2.是否将消息放回到队列,false:不放回,true:放回队列
                 */
                channel.basicReject(messageProperties.getDeliveryTag(),false);
            } catch (IOException e1) {
                e1.printStackTrace();
            }


        }
    }
}

在这里插入图片描述
此时没有数据

在这里插入图片描述
发送测试
在这里插入图片描述

进入异常方法
在这里插入图片描述

因为没有放回队列,队列无数据
在这里插入图片描述

修改放回队列

在这里插入图片描述

会出现一直报错一直放回队列

在这里插入图片描述

放回队列代码

package com.rabbitmq.common;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消费者
 * @Author: albc
 * @Date: 2024/01/30/13:45
 * @Description: good good study,day day up
 */
@Component
public class RabbitMqConsumber {

    /**
     * 消费者监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "springboot_queue")
    public void myListener1(Message message, Channel channel, String msg){
        System.out.println("消费者收到了消息:"+msg);
        //消息的属性
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            int i = 1/0;
            /**
             * 手动确认消息
             * 1.签收消息的编号
             * 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
             * 一般不用
             */
            channel.basicAck(messageProperties.getDeliveryTag(),false);
        } catch (Exception e) {
            System.out.println("进入异常方法.............");
            //保存数据库等
            try {
                /**
                 * 1.消息的编号
                 * 2.是否将消息放回到队列,false:不放回,true:放回队列
                 */
                channel.basicReject(messageProperties.getDeliveryTag(),true);
            } catch (IOException e1) {
                e1.printStackTrace();
            }


        }
    }
}

批量处理

package com.rabbitmq.common;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消费者
 * @Author: albc
 * @Date: 2024/01/30/13:45
 * @Description: good good study,day day up
 */
@Component
public class RabbitMqConsumber {

    /**
     * 消费者监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "springboot_queue")
    public void myListener1(Message message, Channel channel, String msg){
        System.out.println("消费者收到了消息:"+msg);
        //消息的属性
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            int i = 1/0;
            /**
             * 手动确认消息
             * 1.签收消息的编号
             * 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
             * 一般不用
             */
            channel.basicAck(messageProperties.getDeliveryTag(),false);
        } catch (Exception e) {
            System.out.println("进入异常方法.............");
            //保存数据库等
            try {
                /**
                 * 1.消息的编号
                 * 2.是否将消息放回到队列,false:不放回,true:放回队列
                 */
                //channel.basicReject(messageProperties.getDeliveryTag(),true);
                /**
                 * 批量处理
                 * 1.消息的编号
                 * 2.是否批量处理
                 * 3.是否放回到队列
                 */
                channel.basicNack(messageProperties.getDeliveryTag(),false,false);
            } catch (IOException e1) {
                e1.printStackTrace();
            }


        }
    }
}

在这里插入图片描述

原文地址:https://blog.csdn.net/weixin_45810161/article/details/135931632

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_63379.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注