本文介绍: 前面我们介绍了RabbitMQ的安装、各大消息中间件的对比、AMQP核心概念、管控台的使用快速入门RabbitMQ。本章将介绍RabbitMQ的高级特性。分两篇(上/下)进行介绍。消息如何保障100%的投递成功?幂等性概念详解海量订单产生的业务高峰期,如何避免消息重复消费的问题?Confirm确认消息、Return返回消息保障消息的成功发出保障MQ节点的成功接收发送收到MQ节点(Broker确认应答完善的消息进行补偿机制前三步不一定能保障消息能够100%投递成功。

前言

前面我们介绍了RabbitMQ的安装、各大消息中间件的对比、AMQP核心概念、管控台的使用快速入门RabbitMQ。本章将介绍RabbitMQ的高级特性。分两篇(上/下)进行介绍。

1 消息如何保障100%的投递成功?

1.1 什么生产端的可靠性投递

前三步不一定能保障消息能够100%投递成功。因此要加上第四步

BAT/TMD 互联网大厂解决方案
– 消息落库,对消息状态进行打标
发送消息的时候需要将消息持久化到数据库中,并给这个消息设置一个状态(未发送发送中、到达)。当消息状态发生了变化,需要对消息做一个变更。针对没有到达的消息做一个轮训操作,重新发送。对轮训次数也需要做一个限制3-5次。确保消息能够成功的发送.

具体采用哪种方案,还需要根据业务与消息的并发量而定。

1.2 第一种方案

生产端-可靠性投递

图解:

蓝色部分表示生产者负责发送消息发送至Broker
Biz DB:订单数据库 MSG DB: 消息数据
面对小规模的应用可以采用加事务方式,保证事务一致性。但在大厂中面对高并发,并没有事务,事务的性能拼接非常严重,而是做补偿。

比如如下一条订单消息。

step1:存储订单消息(创建订单),业务数据入库,消息也入库。缺点:需要持久两次。(status:0)
step2:在step1成功的前提下,发送消息
step3:Broker收到消息后,confirm我们生产端。Confirm Listener异步监听Broker回送的消息。
step4:抓取指定的消息,更新status=1),表示消息已经投递成功。

step5:分布式定时任务获取消息状态,如果等于0则抓取数据出来。
step6:重新发送消息
step7:重试限制设置3次。如果消息重试了3次还是失败,那么(status=2),认为这个消息就是失败的。

查询这些消息为什么失败可能需要人工去查询

假设step2执行成功,step3由于网络闪断。那么confirm将永远收不到消息,那么我们需要设定一个规则
例如:在消息入库的时候设置一个临界值 timeout=5min,当超过5min之后,就将这条数抓取出来。
或者写一个定时任务每隔5分钟就将status=0的消息抓取出来。可能存在问题:消息发送出去,定时任务又正好刚执行,Confirm还未收到定时任务就会执行,会导致消息执行两次
更精细化操作:消息超时容忍限制confirm在2-3分钟内未收到消息,则重新发送。


第一种方案数据两次入库,一次业务数据入库,一次消息入库。这样对数据的入库是一个瓶颈。
其实我们只需要对业务进行入库。

这种方式并不一定能保证100%成功,但是也能保证99.99%的消息成功。如果遇到特别极端的情况,那么就只能需要人工去补偿,或者定时任务去做。
第二种方式主要是为了减少对数据库操作

看下第二种方式

图解:

Upstream service:生产端
DownStream service:消费端
Callback service:回调服务

step1:业务消息入库成功后,第一次消息发送。
step2:同样在消息入库成功后,发送第二次消息,这两条消息是同时发送的。第二条消息是延迟检查可以设置2min、5min 延迟发送。
step3:消费端监听指定队列
step4:消费端处理完消息后,内部生成新的消息send confirm。投递到MQ Broker。
step5: Callback Service 回调服务监听MQ Broker,如果收到Downstream service发送的消息,则可以确定消息发送成功,执行消息存储到MSG DB。
step6:Check Detail检查监听step2延迟投递的消息。此时两个监听的队列不是同一个,5分钟后,Callback service收到消息,检查MSG DB。如果发现之前的消息已经投递成功,则不需要做其他事情。如果检查发现失败,则Callback 进行补偿,主动发送RPC 通信通知上游生产端重新发送消息。

这样做的目的:少做了一次DB存储关注点并不是百分百的投递成功,而是性能

2. 幂等性概念

2.1 幂等性是什么

幂等(idempotent、idempotence)是一个数学与计算机概念常见抽象数中,即f(f(x)) = f(x)。简单的来说就是一个操作多次执行产生的结果一次执行产生的结果一致

利用版本号Version方式来保证幂等性。

2.2 消费端-幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题

在高并发的情况下,会有大量的消息到达MQ,消费端需要监听大量的消息。这样的情况下,难免会出现消息的重复投递,网络闪断等等。如果不去做幂等,则会出现消息的重复消费。
-消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即使我们收到了多条一样的消息,也只会执行一次

看下互联网大厂主流的幂等性操作
唯一ID+指纹吗机制,利用数据库主键去重。
利用Redis的原子实现
-其他的技术实现幂等性

2.2.1 唯一ID+指纹码机制
2.2.2 Redis 原子特性实现

简单使用Redis的自增

3. Confirm 确认消息

理解Confirm 消息确认机制:

蓝色:producer 生产者 红色:MQ Broker 服务器

生产者把消息发送到Broker端,Broker收到消息之后回送给producer。Confirm Listener 监听应答。

操作异步操作,当生产者发送完消息之后,就不需要管了。Confirm Listener 监听MQ Broker的应答。

3.1 如何实现Confirm确认消息?

一步:在channel开启确认模式channel.confirmSelect()
第二步;在chanel添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理

3.2 代码编写

生产者:

public class Producer {
    public static void main(String[] args) throws Exception {

        //1 创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();

        //2 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();

        //3 指定我们的消息投递模式: 消息的确认模式 
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";

        //4 发送一条消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        //5 添加一个确认监听  用于发送消息到Broker端之后,回送消息的监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });
    }
}

消费者

public class Consumer {
	
	public static void main(String[] args) throws Exception {		
		
		//1 获取一个连接 
        Connection connection = ConnectionUtils.getConnection();
		
		//2通过Connection创建一个新的Channel
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.#";
		String queueName = "test_confirm_queue";
		
		//3 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
		channel.exchangeDeclare(exchangeName, "topic", true);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//4 创建消费者 
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true){
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());			
			System.err.println("消费端: " + msg);
		}
		
	}
}


工具类:

public class ConnectionUtils {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);//amqp协议 端口 类似与mysql的3306
        //设置账号信息用户名密码vhost
        factory.setVirtualHost("/vhost_cp");
        factory.setUsername("user_cp");
        factory.setPassword("123456");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

启动消费端=》再启动生产端

3.3 查看管控台:

3.4 打印结果:

可以观察到消费端先接收到消息,之后生产端再接收回调信息。如果出现磁盘已满、RabbitMQ出现异常、queue容量到达上限都可能接收no ack

如果ackno ack消息都未接收到,这就是之前所说的。RabbitMQ出现网络闪断,可以采用上面所说的消息补偿

4. Return消息机制

在基础API中有一个关键的配置项:

4.1 Return消息机制流程

Producer生产端将消息发送到MQ Broker端,但是出现NotFind Exchange,发送的消息的Exchange,在Broker端未能找到。或者找到了,但是路由key路由不到指定的队列。因此是一个错误的消息。
这个时候,生产端应该知道发送的这条消息,并不会被处理。因此MQ Broker提供了这种Return机制,将这些不可达的消息发送给生产端,这时候生产端就需要设置Return Listener去接收这些不可达的消息。然后及时记录日志,去处理这些消息。

4.2 代码演示

生产者:

public class Producer {

    public static void main(String[] args) throws Exception {

        //1 创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        String exchange = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "abc.save";

        String msg = "Hello RabbitMQ Return Message";


        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties,     
                                     byte[] body) throws IOException {

                System.err.println("---------handle  return----------");
                //响应码
                System.err.println("replyCode: " + replyCode);
                //响应文本
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });

        //第三参数mandatory=true,意味着路由不到的话mq也不会删除消息,false则会自动删除
        channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        //修改routingkey测试是否能够收到消息
        //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
    }
}

消费者

public class Consumer {
	
	public static void main(String[] args) throws Exception {		
		
		//1 创建ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_return_exchange";
		String routingKey = "return.#";
		String queueName = "test_return_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true){
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.err.println("消费者: " + msg);
		}
	}
}

ConnectionUtils 工具代码在上面。

启动消费端,并查看管控台。

4.3 查看管控台

4.4 查看打印结果

放开消费端代码:channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
消费端打印结果:

可以看到打印结果正常,此时再改代码为:
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

可以看到生产端接收到了不可达的消息。

原文地址:https://blog.csdn.net/vc33569/article/details/134693566

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_37246.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注