本文介绍: MQ 全称为,即消息队列。“消息队列” 是在消息传输过程保存消息容器。它是典型的:生产者消费者模型生产者不断向消息列中生产消息消费者不断的从队列中取消息。因为消息的生产消费都是异步的,而且只关心消息的发送接收没有业务逻辑的侵入,这样就实现生产者消费者的解耦。消息,两台计算机间传送的数据单位可以非常简单,也可以复杂。队列,数据结构概念。在队列中数据先进先出,后进后出。


MQ 简介

1、简介

MQ 全称为 Message Queue,即消息队列。“消息队列” 是在消息的传输过程保存消息的容器
它是典型的:生产者消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中取消息。因为消息的生产消费都是异步的,而且只关心消息的发送接收没有业务逻辑的侵入,这样就实现生产者消费者的解耦

消息,两台计算机间传送的数据单位。可以非常简单,也可以复杂

队列,数据结构概念。在队列中数据先进先出,后进后出。

2、MQ优缺点

优点

缺点

3、MQ应用场景

应用解耦

电商平台中,用户订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题

如果在系统中引入 MQ,即订单系统将消息先发送到 MQ 中,MQ 再转发到其他系统,则会解决以下问题

异步提速

如果订单系统同步访问每个系统,则用户下单等待时长为920;如果引入 MQ,则用户下单等待时长为25
在这里插入图片描述
削峰限流

假设我们的系统每秒只能承载 1000 请求,如果请求瞬间增多到每秒 5000,则会造成系统崩溃。此时引入 MQ 即可解决问题
在这里插入图片描述
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被 “削” 掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做限流。

4、AMQP 和 JMS

AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务应用层标准 高级消息队列协议二进制应用层协议),是应用协议的一个开放标准,为面向消息的中间件设计基于协议客户端消息中间件传递消息,并不受中间件产品开发语言条件限制。类比 HTTP 协议

JMS,即 Java Message Service,是 Java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输JMS API 是一个消息服务标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建发送接收和读取消息。一种规范,和 JDBC、Jedis 担任的角色类似。

区别

5、常见的 MQ 产品

ActiveMQ RabbitMQ RocketMQ Kafka
开发语言 Java Erlang Java Scala & Java
协议支持 AMQP,REST,XMPP,STOMP AMQP,XMPP,SMTP,STOMP 自定义 自定义协议,社区封装了HTTP协议支持
客户端支持语言 Java,C,C++,Python,PHP等 官方支持 Erlang,Java等,社区产出多种 API,几乎支持所有语言 Java,C++ 官方支持 Java,社区产出多种 API,如 PHP,Python
单机吞吐量 万级 万级 十万级 十万级
消息延迟 毫秒 微秒级 毫秒 毫秒以内
可用 主从架构 镜像集群模式 分布式架构 分布式架构
消息可靠性 较低概率丢失数据 丢失数据 保证数据不丢失
功能特性 老牌产品,成熟度高,文档较丰富 并发能力强,性能极其好,支持一些消息中间件高级功能延时低,社区活跃,管理界面较为丰富 MQ性能比较完备,扩展性强,支持大量的消息中间件高级功能 只支持主要的 MQ 功能接收与发送),主要应用于大数据领域

RabbitMQ 工作原理

RabbitMQ 是由 Erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

官网地址https://www.rabbitmq.com/which-erlang.html
在这里插入图片描述

rabbitMQ 交换机类型有4种:

直连交换机dirext exchange)为 RabbitMQ 默认交换机

拓展:RabbitMQ 为什么使用信道而不直接使用 TCP 连接通信

TCP 连接的创建销毁开销特别大。创建需要 3 次握手,销毁需要 4 次挥手。高峰时每秒成千上万条 TCP 连接的创建会造成资源巨大的浪费。而且操作系统每秒处理 TCP 连接数也是有限制的,会造成性能瓶颈。而如果一条线程使用一条信道,一条 TCP 链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

Linux 环境安装 RabbitMQ

1、rmp安装

1.1 安装

# 安装erlang所需要的依赖
yum install -y epel-release

# 安装 Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

# 查看Erlang 是否安装成功
erl -version

# 安装 socat
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm

# 安装 RabbitMQ
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

1.2 开启管理界面

rabbitmq-plugins enable rabbitmq_management

1.3 启动停止

service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务

# 如果这里访问不到: (http://ip:15672/ )关闭防火墙,并重新启动guest / guest
service iptables stop

1.4 创建新用户

# 创建账户
rabbitmqctl add_user admin 123
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 查看所有用户
rabbitmqctl list_users

2、docker安装法

2.1 安装

docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

2.2 下载rabbitmq_delayed_message_exchange插件

官网下载地址:https://www.rabbitmq.com/community-plugins.html

注意:具体下载哪个版本插件,可进入容器内部查看。一定要大于等于当前版本。

[root@iZ2zeffygi8nlek3pfjco8Z ~]# docker exec -it e91 /bin/bash
root@e91d8abdddb4:/# rabbitmq-plugins list
# 将插件拷贝docker容器
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez e91:/opt/rabbitmq/plugins

# 开启管理界面
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

执行命令之后,重新启动 RabbitMQ 容器然后登录RabbitMQ 的Web端界面查看插件是否启动成功
在这里插入图片描述

以下即为管控台界面
在这里插入图片描述

RabbitMQ 工作模式

RabbitMQ 共有六种工作模式:

1、简单模式(Hello Wold)

在这里插入图片描述
特点:

1.1 引入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>

1.2 生产代码

package com.sea.rabbitmq.queue.simplequeue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 简单模式生产者
 *
 * @author sea
 * @date 2023-12-01
 */
public class Producer {

    //队列名
    public static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.94.151.26");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        //2、创建连接
        Connection connection = factory.newConnection();
        //3、建立信道
        Channel channel = connection.createChannel();
        //4、创建队列,如果队列存在,则直接使用
        /**
         * 方法参数:
         * 参数1: 队列名称
         * 参数2: 是否持久化。true表示队列会保存磁盘,MQ重启后队列还在
         * 参数3: 是否私有化true表示只有第一次拥有它的消费者才能访问false表示所有消费者都能访问
         * 参数4: 是否自动删除true表示不再使用队列时自动删除队列
         * 参数5: 队列其他参数。如:x-message-ttl,x-expires等
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //5、发送消息
        String message = "Hello rabbitmq";
        /**
         * 方法参数:
         * 参数1: 交换机名,""表示默认交换机
         * 参数2: 路由key简单模式为队列名
         * 参数3: 消息的其他属性,如路由头等
         * 参数4: 消息体(字节数组格式)
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        //6、关闭信道和连接
        channel.close();
        connection.close();
        System.out.println("生产消息发送成功。");
    }

}

运行生产者后,我们可以看到在 RabbitMQ 中创建了队列,队列中已经有了消息,具体详情如下
在这里插入图片描述
1.3 消费者代码

package com.sea.rabbitmq.queue.simplequeue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 简单模式消费者
 *
 * @author sea
 * @date 2023-12-01
 */
public class Consumer {

    //队列名
    public static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.94.151.26");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        //2、创建连接
        Connection connection = factory.newConnection();
        //3、建立信道
        Channel channel = connection.createChannel();
        //4、监听队列
        /**
         * 方法参数:
         * 参数1: 监听的队列名
         * 参数2: 是否自动签收。如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3: Consumer实现类,重写该类方法表示接收到消息后如何消费
         */
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("接收到消息为:" + message);
            }
        });

    }
}

2、工作队列模式(Work Queue)

在这里插入图片描述
简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用 direct 交换机应用于处理消息较多的情况。特点如下

2.1 封装工具

由于连接 RabbitMQ 的操作都一样,所以这里我们代码取出来进行封装

注意没有关闭资源连接。

package com.sea.rabbitmq.queue.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Rabbitmq连接工具类
 *
 * @author sea
 * @date 2023-12-01
 */
public class RabbitMqUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.94.151.26");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        // 2.创建连接
        Connection connection = factory.newConnection();
        // 3.建立信道
        return connection.createChannel();
    }
}

2.2 生产者代码

package com.sea.rabbitmq.queue.workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列生产者
 *
 * @author sea
 * @date 2023-12-01
 */
public class Producer {

    //队列名称
    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、通过工具类创建连接
        Channel channel = RabbitMqUtils.getChannel();
        //2、创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //3、发送批量消息,参数3表示该消息为持久化消息,即除了保存内存还会保存磁盘
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    ("发送成功,这是第 " + i + " 条消息").getBytes());
        }
        //4、关闭资源
        channel.close();
    }
}

2.3 消费者代码
这里我们编写两个消费者去消费生产出来的消息,两个消费者的代码大体一致,不同的是输出内容,所以此处展示消费者01的代码

package com.sea.rabbitmq.queue.workqueue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列消费者1
 *
 * @author sea
 * @date 2023-12-01
 */
public class Consumer1 {

    //队列名称
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者01消费成功,内容为: " + message);
            }
        });
    }
}

2.4 结果
消费者01消费结果如下
在这里插入图片描述
消费者02消费结果如下
在这里插入图片描述

3、发布订阅模式(Publish/Subscribe)

在这里插入图片描述
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe),特点如下

3.1 生产者代码

package com.sea.rabbitmq.queue.publishqueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发布订阅模式生产者
 *
 * @author sea
 * @date 2023-12-01
 */
public class Producer {

    //交换机名称
    public static final String EXCHANGE_NAME = "fanout_exchange";
    //队列名称,分别为:邮件队列、信息队列、站内队列
    public static final String EMAIL_QUEUE_NAME = "send_email_queue";
    private static final String MESSAGE_QUEUE_NAME = "send_message_queue";
    private static final String STATION_QUEUE_NAME = "send_station_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //创建fanout交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机是否持久化
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
        //创建队列
        channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null);
        channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null);
        channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null);
        //交换机绑定队列
        /**
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "");
        channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "");
        channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "");
        //发送消息
        String message = "京东618活动马上就要开始啦! 欢迎您登录京东参与。";
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        //关闭资源
        channel.close();
    }
}

3.1 消费者代码

这里我们编写三个消费者,分别为短信消费者、邮件消费者、站内信消费者,三者的代码大体都一致,不同的是监听对列名以及输出内容,所以此处只展示短信消费者的代码:

package com.sea.rabbitmq.queue.publishqueue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发布订阅模式:短信消费者
 *
 * @author sea
 * @date 2023-12-01
 */
public class MessageConsumer {

    private static final String MESSAGE_QUEUE_NAME = "send_message_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.basicConsume(MESSAGE_QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("发送短信成功,短信内容为: " + message);
            }
        });
    }
}

注意:也可以使用 工作队列 + 发布订阅 模式同时使用,即两个消费者同时监听一个队列

4、路由模式(Routing)

在这里插入图片描述
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,618大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用 路由模式 (Routing) 完成这一需求。特点如下

3.1 生产者代码

由于此模式是在发布订阅模式基础上新增了路由绑定规则,进而实现将消息发送给指定队列的功能,所以代码与上面代码大致相同,不过添加了路由key,具体代码如下

package com.sea.rabbitmq.queue.routingqueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 路由模式 生产者
 *
 * @author sea
 * @date 2023-12-01
 */
public class Producer {
    //交换机名称
    private static final String EXCHANGE_NAME = "routing_exchange";
    //队列名称,分别为: 邮箱队列、信息队列、站内信队列
    private static final String EMAIL_QUEUE_NAME = "send_email_queue";
    private static final String MESSAGE_QUEUE_NAME = "send_message_queue";
    private static final String STATION_QUEUE_NAME = "send_station_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //创建direct交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        //创建队列
        channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null);
        channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null);
        channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null);
        //交换机绑定队列(重要的、普通的)
        channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "important");
        channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "important");
        channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "important");
        channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "common");
        //发送消息(重要消息全部发送,否则只发送站内消息)
        String importMessage = "京东618活动马上就要开始啦! 欢迎您登录京东参与";
        String commonMessage = "京东满减优惠开始啦,诚邀您参与";
        channel.basicPublish(EXCHANGE_NAME, "important", null, importMessage.getBytes());
        channel.basicPublish(EXCHANGE_NAME, "common", null, commonMessage.getBytes());
        //关闭资源
        channel.close();
    }
}

3.1 消费者代码

消费者可以直接使用上面发布订阅模式的三个消费者,故此处就不再展示。

下面为站内信消费者的消费结果,其他两个消费者的结果为一条,站内信消费者的结果为两条:
在这里插入图片描述

5、通配符模式(Topics)

在这里插入图片描述
通配符模式(Topics)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的 RoutingKey 能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,通配符模式使用 topic 交换机。 能按照通配符规则将消息发送给指定队列。

通配符规则如下

3.1 生产者代码

由于此模式是在路由模式基础上给队列绑定了带通配符的路由关键字,进而实现将消息按照通配符规则发送给指定队列,所以代码与上面代码大致相同,具体代码如下:

package com.sea.rabbitmq.queue.topicsqueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 通配符模式 生产者
 *
 * @author sea
 * @date 2023-12-01
 */
public class Producer {

    //交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";
    //队列名称,分别为: 邮箱队列、信息队列、站内信队列
    private static final String EMAIL_QUEUE_NAME = "send_email_queue";
    private static final String MESSAGE_QUEUE_NAME = "send_message_queue";
    private static final String STATION_QUEUE_NAME = "send_station_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //创建direct交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        //创建队列
        channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null);
        channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null);
        channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null);
        //交换机绑定队列
        channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "#.email.#");
        channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "#.message.#");
        channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "#.station.#");
        //发送消息
        String importMessage = "京东618活动马上就要开始啦! 欢迎您登录京东参与";
        String commonMessage = "京东满减优惠开始啦,诚邀您参与";
        //类似于模糊匹配
        channel.basicPublish(EXCHANGE_NAME, "email.message.station", null, importMessage.getBytes());
        channel.basicPublish(EXCHANGE_NAME, "station", null, commonMessage.getBytes());
        //关闭资源
        channel.close();
    }
}

3.1 消费者代码
消费者可以直接使用上面发布订阅模式的三个消费者,故此处就不再展示。

下面为站内信消费者的消费结果,其他两个消费者的结果为一条,站内信消费者的结果为两条:
在这里插入图片描述

6、远程调用模式(RPC, 不常用

在这里插入图片描述

Springboot 整合 RabbitMQ

1、添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2、配置文件

server:
  port: 8081

spring:
  rabbitmq:
    host: 47.94.151.26
    port: 5672
    username: guest
    password: guest
    virtual-host: /

3、配置

SpringBoot 整合 RabbitMQ 时,需要在配置类中创建队列和交换机。

package com.sea.rabbitmq.boot;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMq配置类
 *
 * @author sea
 * @date 2023-12-01
 */
@Configuration
public class RabbitMqConfig {

    private static final String TOPIC_EXCHANGE_NAME = "topicExchange";
    private static final String QUEUE_NAME = "bootQueue";

    /**
     * 创建topic类型交换机
     */
    @Bean("topicExchange")
    public Exchange getExchange() {
        return ExchangeBuilder
                //交换机类型
                .topicExchange(TOPIC_EXCHANGE_NAME)
                //是否持久
                .durable(true)
                .build();
    }

    /**
     * 创建队列
     */
    @Bean("bootQueue")
    public Queue getQueue() {
        return QueueBuilder
                //持久化队列
                .durable(QUEUE_NAME)
                .build();
    }

    /**
     * 交换机与队列绑定
     */
    @Bean
    public Binding bingMessageQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                //路由规则
                .with("#.message.#")
                //没有其他参数
                .noargs();
    }
}

4、生产者
SpringBoot 整合 RabbitMQ 时,提供了工具类 RabbitTemplate 发送消息,编写生产者时只需要注入 RabbitTemplate 即可发送消息。

package com.sea.rabbitmq.boot;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

/**
 * 消费者
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
public class Consumer {

    /**
     * 监听队列
     */
    @RabbitListener(queues = "bootQueue")
    public void listenMessage(String message) {
        System.out.println("整合boot,接收消息:" + message);
    }
}

5、消费者

此处需要编写另一个 SpringBoot 项目作为 RabbitMQ 的消费者,因为如果在一个项目中可以通过方法调用就行了,没有必要通过 RabbitMQ 来进行通信

搭建 RabbitMQ 的 SpringBoot 项目步骤前面相同这里直接展示消费者代码:

@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * SpringBoot 整合 RabbitMQ 生产者
 */
@Test
public void testSendMessage() {
    /**
     * 发送消息
     * 1.exchange: 交换机
     * 2.routingKey: 路由key
     * 3.object: 发送的消息
     */
    rabbitTemplate.convertAndSend("topicExchange", "message", "快来参加京东618活动哦,整合boot。");
}

@RabbitListener 注解用来监听队列,放在具体的方法上面。

消息的可靠投递

RabbitMQ 消息的投递路径为:

​ 生产者 ——> 交换机 ——> 队列 ——> 消费者

在 RabbitMQ 工作的过程中,每个环节消息都有可能传递失败,RabbitMQ 可以通过以下三种模式来监听消息时候投递成功

  • 确认模式(Confirm):可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(Return):可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack):可以监听消费者是否成功处理消息。

1、确认模式

1.1 生产者模块配置文件

spring:
  rabbitmq:
  	# 开启确认模式
	publisher-confirm-type: correlated

spring.rabbitmq.publisherconfirmtype 属性有三个值:

CORRELATED 和 SIMPLE 的区别:

1.2 回调接口

package com.sea.rabbitmq.confirm;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 确定模式的回调方法,消息向交换机发送后调用confirm()方法
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
@Slf4j
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setConfirmCallback() {
        //将当前实现注入到rabbitTemplate的确认回调
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调接口
     *
     * @param correlationData 存储消息的ID和自己存储的关于该条消息的信息
     * @param ack             交换机是否接收成功
     * @param cause           异常原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = null != correlationData ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机接收到id为:{}的消息", id);
        } else {
            log.info("交换机未接收到id为:{}的消息,原因为{}", id, cause);
            //TODO 交换机未收到消息,可以进行对应的业务处理
        }
    }
}

1.3 控制

package com.sea.rabbitmq.confirm;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author sea
 * @date 2023-12-01
 */
@RestController
@Slf4j
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/confirm/{msg}")
    public void sendMsgWithConfirm(@PathVariable String msg) {
        rabbitTemplate.convertAndSend("topicExchange", "message", msg, new CorrelationData());
        log.info("生产者发送消息:{}", msg);
    }
}

1.4 使用 Postman 进行测试

正常发送,控制台输出如下:
在这里插入图片描述
发送失败的情况,将交换机的名称改为存在的,控制台输出如下:
在这里插入图片描述

2、回退模式

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的,此时通过开始消息回退消息传递过程中不可达目的地时将消息返回给生产者。

退回模式可以监听消息是否从交换机成功传递到队列,具体使用如下:

2.1 添加配置

spring:
  rabbitmq:
  	# 开启回退模式
	publisher-returns: true

2.2 回调接口

package com.sea.rabbitmq.confirm;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回退模式的回调方法,交换机发送到队列失败后才会执行 returnedMessage()方法
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
@Slf4j
public class MyReturnCallback implements RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setReturnsCallback() {
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 消息回退接口,只有当消息无法传递到目的地时才进行回退
     *
     * @param returned 失败后将失败信息封装到参数中
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息 {}, 被交换机 {} 退回, 应答代码 {}, 原因 {}, 路由 {}",
                new String(returned.getMessage().getBody()),
                returned.getExchange(),
                returned.getReplyCode(),
                returned.getReplyText(),
                returned.getRoutingKey());
        //TODO 消息回退,可以进行对应的业务处理
    }
}

1.4 使用 Postman 进行测试

发送成功不会打印对应日志,只有当消息发送失败时才进行回退,才会打印对应日志

发送失败的情况,将路由的名称改为不存在的,控制输出如下:
在这里插入图片描述

3、消费者消费确认模式

在 RabbitMQ 中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列

这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。消息分为 自动确认手动确认

自动确认:spring.rabbitmq.listener.simple.acknowledge-mode = none
手动确认:spring.rabbitmq.listener.simple.acknowledge-mode = manual

注意:此模式的配置配置在消费端

3.1 消费者添加配置

spring:
  rabbitmq:
    listener:
      simple:
      	# 开启手动确认模式
        acknowledge-mode: manual

3.2 手动确认模式的消费者

package com.sea.rabbitmq.confirm;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**
 * 手动确认模式的消费者
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
public class AckConsumer {

    @RabbitListener(queues = "bootQueue")
    public void listenMsg(Message message, Channel channel) throws IOException {
        //消息投递号,每次投递消息该值都会加1
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
//            int i = 1/0;
            System.out.println("成功接收到消息:" + message);
            /**
             * 手动签收消息
             * deliveryTag: 消息投递号
             * true: 是否可以一次签收多条消息
             */
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            System.out.println("消息接收失败!");
            /**
             * 拒签消息,设置消息重回队列中
             * deliveryTag: 消息投递号
             * true: 是否可以一次拒签多条消息
             * true: 拒签后消息是否重回队列,不放回消息则会放到死信队列
             */
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

注意:上面代码由于模拟运行异常,拒签后重新放回队列中,然后重新执行,所以会一直输出“消息接收失败!”

其他高级特性

1、消费端限流

假想我们 RabbtiMQ 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,巨量的消息瞬间全部喷涌推动过来,但是单个客户端无法同时处理这么多条数据,就会被压垮崩溃

RabbitMQ 提供了一种 Qos(Quality Of Service,服务质量服务质量保证功能。即在非自动确认消息的前提下,如果一定数目的消息未被确认之前,不再进行消费新的消息。

我们可以通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

拓展:预取值(Prefetch
在 RabbitMQ 中,预取值(Prefetch Count)是指消费者从队列中预取的消息数量。当一个消费者连接到一个队列并开始消费消息时,它可以通过设置预取值控制一次从队列中获取的消息数量。预取值可以在消费者创建时进行设置,也可以在运行时进行更改

预取值的主要作用:是控制消费者的负载,避免一个消费者在处理消息时占用过多的资源,导致其他消费者无法获得足够的资源。通过限制每次预取的消息数量,可以控制消费者的处理速度,避免过度消费队列中的消息。

预取值的设置方式有两种:

  • 全局设置:通过 channel.basicQos(prefetchCount) 方法设置全局预取值。在这种情况下,所有的消费者都将使用相同的预取值。
  • 单独设置:通过 channel.basicConsume(queue, consumer) 方法的 prefetchCount 参数设置单独的预取值。在这种情况下,每个消费者都可以使用不同的预取值。

需要注意的是,预取值并不是绝对的,它只是一个提示值。当消费者处理完预取值数量的消息后,它可以继续从队列中获取更多的消息,不受预取值的限制。同时,当队列中的消息数量少于预取值时,消费者将无法获取更多的消息,直到队列中有新的消息可用。因此,预取值的设置应该根据实际情况进行调整,以保证消费者的负载均衡和队列的稳定

1.1 消费端配置

spring:
  rabbitmq:
    listener:
      simple:
        # 开启手动确认模式(消费端限流必须开启)
        acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消
        prefetch: 5

1.2 生产者发送多条消息

/**
 * 消费端限流、不公平分发 生产者
 */
@Test
public void testSendBatchMsg() {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("topicExchange", "message", i + " 快来参加京东618活动哦,消费端限流。");
    }
}

1.3 消费者监听队列

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**
 * 消费端限流 消费者
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
@Slf4j
public class LimitConsumer {

    @RabbitListener(queues = "bootQueue")
    public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        log.info("接收到的消息为: {}", msg);
        // 2.模拟业务处理
        Thread.sleep(3000);
        // 3.手动签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);
    }
}

1.4 消费过程中管控台如下
在这里插入图片描述

1.5 消费结果如下
在这里插入图片描述

2、不公平分发

在 RabbitMQ 中,多个消费者监听同一条队列,则队列默认采用轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1 处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1 有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

2.1 消费端配置

spring:
  rabbitmq:
    listener:
      simple:
        # 开启手动确认模式(消费端限流必须开启)
        acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息
        # prefetch: 5
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
        prefetch: 1

2.2 生产者发送多条消息

与上面一致即可

2.3 添加两个效率不一样的消费者

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**
 * 不公平分发 消费者
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
@Slf4j
public class UnfairConsumer {

    @RabbitListener(queues = "bootQueue")
    public void listenMsgOne(Message message, Channel channel) throws InterruptedException, IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        log.info("消费者 1 接收到的消息为: {}", msg);
        // 2.模拟业务处理, 快
        Thread.sleep(500);
        // 3.手动签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);
    }

    @RabbitListener(queues = "bootQueue")
    public void listenMsgTwo(Message message, Channel channel) throws InterruptedException, IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        log.info("消费者 2 接收到的消息为: {}", msg);
        // 2.模拟业务处理, 慢
        Thread.sleep(3000);
        // 3.手动签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);
    }
}

2.4 消费端实现了不公平分发效果
在这里插入图片描述

3、消息存活时间

RabbitMQ 可以设置消息的存活时间(Time To Live,简称TTL),单位是毫秒,当消息到达存活时间后还没有被消费,会被移出队列。

RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。具体使用如下:

消息到达存活时间未被消费时,消息会被放入死信队列。

3.1 对队列的所有消息设置存活时间

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

/**
     * 创建有存活时间的消息队列
     */
    @Bean("ttlQueue")
    public Queue getTtlQueue() {
        return QueueBuilder
                .durable("ttlQueue")
                //10s消息过期
                .ttl(10000)
                .build();
    }

    /**
     * 绑定带有存活时间的队列
     */
    @Bean
    public Binding bingTtlQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("ttlQueue") Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("ttl")
                .noargs();
    }

3.2 生产者代码

/**
   * 消息存活时间 生产者
   */
  @Test
  public void testSendMsgWithTtl() {
      //设置消息属性
      MessageProperties messageProperties = new MessageProperties();
      messageProperties.setExpiration("10000");
      //创建消息对象
      Message message = new Message("测试发送带有过期时间的消息。".getBytes(StandardCharsets.UTF_8), messageProperties);
      //发送消息
      rabbitTemplate.convertAndSend("topicExchange", "ttl", message);
  }

3.3 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消息存活时间 消费者
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
@Slf4j
public class WithTtlConsumer {

    @RabbitListener(queues = "ttlQueue")
    public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        log.info("接收到的消息为: {}", msg);
        // 2.模拟业务处理
        Thread.sleep(2000);
        // 3.手动签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);
    }
}

3.4 测试结果
在这里插入图片描述
注意

  • 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
  • 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

4、优先级队列

RabbitMQ 优先级队列(Priority Queue)是一种特殊的队列,它根据消息的优先级将其放置在队列中。当消费者从队列中获取消息时,它将按照优先级从高到低的顺序获取消息。优先级队列可以用于处理一些需要按照优先级处理的消息,例如日志记录任务调度等。具体使用如下:

4.1 对队列的所有消息设置存活时间

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

/**
     * 创建有优先级的消息队列
     */
    @Bean("priorityQueue")
    public Queue getPriorityQueue() {
        return QueueBuilder
                .durable("priorityQueue")
                //设置队列的最大优先级最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
                .maxPriority(10)
                .build();
    }

    /**
     * 绑定带有优先级的队列
     */
    @Bean
    public Binding bingPriorityQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("priorityQueue") Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("priority")
                .noargs();
    }

4.2 生产者代码

/**
 * 优先级队列 生产者
   */
  @Test
  public void testPriority() {
      for (int i = 0; i < 50; i++) {
          //i为5的倍数优先级较高
          if (i % 5 == 0) {
              MessageProperties messageProperties = new MessageProperties();
              //设置优先
              messageProperties.setPriority(9);
              //创建消息对象
              Message message = new Message((i + " 测试发送带有优先级的消息。").getBytes(StandardCharsets.UTF_8), messageProperties);
              //发送消息
              rabbitTemplate.convertAndSend("topicExchange", "priority", message);
          } else {
              rabbitTemplate.convertAndSend("topicExchange", "priority", i + " 普通级别的消息");
          }
      }
  }

4.2 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**
 * 优先级队列 消费者
 *
 * @author sea
 * @date 2023-12-01
 */
@Component
@Slf4j
public class PriorityConsumer {
    @RabbitListener(queues = "priorityQueue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        // 1.获取消息
        String msg = new String(message.getBody());
        log.info("接收到的消息为: {}", msg);
        Thread.sleep(2000);
        // 2.手动签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);
    }
}

4.4 测试结果
在这里插入图片描述

死信队列

在 MQ 中,当消息在队列中由于某些原因没有被及时消费而变成死信(Dead Message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在 RabbitMQ 中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。

死信交换机 和 死信队列与普通的没有区别。死信队列只是一种特殊的队列,里面的消息仍然可以消费。

消息成为死信的情况:

  • 队列消息长度到达限制
  • 消费者拒签消息,并且不把消息重新放入原队列
  • 消息到达存活时间未被消费

实现流程如下以及具体代码如下:
在这里插入图片描述
1. 创建普通队列和死信队列配置

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

public static final String DEAD_EXCHANGE_NAME = "deadExchange";
public static finalString DEAD_QUEUE_NAME = "deadQueue";

public static final String NORMAL_EXCHANGE_NAME = "normalExchange";
public static final String NORMAL_QUEUE_NAME = "normalQueue";

/**
     * 创建死信交换机,与普通交换机一样
     */
    @Bean("deadExchange")
    public Exchange getDeadExchange() {
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    /**
     * 创建死信队列,与普通队列一样
     */
    @Bean("deadQueue")
    public Queue getDeadQueue() {
        return QueueBuilder
                .durable(DEAD_QUEUE_NAME)
                .build();
    }

    /**
     * 死信交换机绑定死信队列
     */
    @Bean
    public Binding bingDeadQueue(@Qualifier("deadExchange") Exchange exchange, @Qualifier("deadQueue") Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("dead_route")
                .noargs();
    }

    /**
     * 创建普通交换机
     */
    @Bean("normalExchange")
    public Exchange getNormalExchange() {
        return ExchangeBuilder
                .topicExchange(NORMAL_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    /**
     * 创建普通队列
     */
    @Bean("normalQueue")
    public Queue getNormalQueue() {
        return QueueBuilder
                .durable(NORMAL_QUEUE_NAME)
                // 绑定死信交换机
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                // 死信队列路由关键字
                .deadLetterRoutingKey("dead_route")
                // 消息存活10s(此队列消息过期会变成死信)
                .ttl(10000)
                // 队列最大长度为10(此队列的长度大于10会变成死信)
                .maxLength(10)
                .build();
    }

    /**
     * 普通交换机绑定普通队列
     */
    @Bean
    public Binding bingNormalQueue(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("normal_route")
                .noargs();
    }

2. 生产者代码

/**
 * 测试死信队列
 */
@Test
public void testDLX(){
    // 1.存活时间过期后变成死信
    rabbitTemplate.convertAndSend("normalExchange","normal_route","测试消息过期,消息会成为死信");
    // 2.超过队列长度后变成死信
    for (int i = 0; i < 15; i++) {
        rabbitTemplate.convertAndSend("normalExchange","normal_route","测试超过队列长度,消息会成为死信");
    }
    // 3.消息拒签但不返回原队列后变成死信
    rabbitTemplate.convertAndSend("normalExchange","normal_route","测试消费者拒签并且不放回队列,消息会成为死信");
}

3. 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;
import java.util.Date;

/**
 * 死信队列 消费者
 *
 * @author sea
 * @date 2023-12-04
 */
@Component
@Slf4j
public class DeadConsumer {

    @RabbitListener(queues ="normalQueue")
    public void listenMessage(Message message, Channel channel) throws IOException {
        // 拒签消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
    }

    @RabbitListener(queues = "deadQueue")
    public void receiveD(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间{},收到死信队列的消息:{}", new Date(), msg);
    }
}

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

但 RabbitMQ 中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果

延迟队列:TTL + 死信队列的合体

1. 创建延迟配置

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

public static final String DELAYED_EXCHANGE = "delayed_exchange";
    public static final String DELAYED_QUEUE = "delayed_queue";
    public static final String DELAYED_ROUTE_KEY = "delayed_route";

    /**
     * 创建延迟交换机
     */
    @Bean("delayed_exchange")
    public Exchange getDelayedExchange() {
        // 创建自定义交换机
        Map<String, Object> args = new HashMap<>(1);
        // topic类型的延迟交换机
        args.put("x-delayed-type", "topic");
        /**
         * 参数1: 交换机名称
         * 参数2: 交换机类型(x-delayed-message代表延迟交换机)
         * 参数3: 是否持久化
         * 参数4: 是否自动删除
         * 参数5: 额外参数
         */
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * 创建延迟队列
     */
    @Bean("delayed_queue")
    public Queue getDelayedQueue() {
        return QueueBuilder
                .durable(DELAYED_QUEUE)
                .build();
    }

    /**
     * 绑定延迟队列
     */
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayed_queue") Queue queue, @Qualifier("delayed_exchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DELAYED_ROUTE_KEY)
                .noargs();
    }

2. 生产者代码

package com.sea.rabbitmq.advanced;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 延迟队列 生产者
 *
 * @author sea
 * @date 2023-12-04
 */
@RestController
@Slf4j
public class DelayedController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendDelayed")
    public void sendDelayed(String message, Integer delayedTime) {
        log.info("当前时间:{},发送一条过期时间{}信息给delayed交换机:{}", new Date(), delayedTime, message);
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(delayedTime);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("delayed_exchange", "delayed_route", message, messagePostProcessor);

        //另一种写法
//        rabbitTemplate.convertAndSend("delayed_exchange", "delayed_route", message,
//                msg -> {
//                    //设置延迟时间
//                    msg.getMessageProperties().setDelay(delayedTime);
//                    return msg;
//                });
    }

}

3. 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.util.Date;

/**
 * 延迟队列 消费者
 *
 * @author sea
 * @date 2023-12-04
 */
@Component
@Slf4j
public class DelayedConsumer {

    @RabbitListener(queues = "delayed_queue")
    public void receive(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间{},收到延时交换机的消息:{}", new Date(), msg);
    }
}

如果有收获! 希望老铁们来个三连,点赞收藏转发

创作不易,别忘点个赞,可以让更多的人看到篇文章,顺便鼓励我写出更好博客

原文地址:https://blog.csdn.net/weixin_45606067/article/details/134734794

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_47364.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注