前言

死信交换机、延迟消息


一、死信交换

二、延迟消息

上面讲诉的两种作用可以用来实现消费者重试处理,即将处理失败溢出的消息放在特定队列由人工处理,与消费者重试时讲的RepublishMessageRecoverer作用类似(在RabbitMQ之消费者的可靠性里讲过RepublishMessageRecoverer):

收集因TTL(有效期)到期的消息这个作用可以用来实现延迟消息

死信交换机实现延迟消息

图解流程

在这里插入图片描述

注意:

DelayExchange插件实现延迟消息

安装插件

插件下载地址

在这里插入图片描述

下载文件放到了/mnt目录下,然后输入sudo docker ps命令查看自己rabbitmq是否正在运行,如果不在运行输入sudo docker start id这里填你自己容器id,如果不知道自己id的,输入sudo docker pa -a查看。

在这里插入图片描述

当容器运行起来后,输入sudo docker cp /mnt/rabbitmq_delayed_message_exchange-3.12.0.ez rabbit:/plugins命令,将刚插件拷贝到容器内plugins目录下。

在这里插入图片描述

拷贝完成后,输入sudo docker exec -it rabbit /bin/bash命令进入容器。
进入plugins文件夹

在这里插入图片描述

在容器内plugins目录下,查看插件是否上传成功ls -l|grep delay

在这里插入图片描述

然后启动插件,在当前目录输入rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令

在这里插入图片描述

这里插件安装完成了,接下来我们需要重启RabbitMQ容器。执行exit命令退出RabbitMQ容器内部然后执行docker restart 容器名命令重启RabbitMQ容器

声明延迟交换机

两种方式,自行选择
基于注解

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于bean

package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送延迟消息

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

注意:
延迟消息插件内部维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息。


总结

以上就是延迟消息的详细讲解了。

原文地址:https://blog.csdn.net/weixin_62951900/article/details/134637473

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_40344.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注