本文介绍: 从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

概念

计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

RabbitMQ有那些基本概念

1、 Broker:简单来说就是消息队列服务器实体

2、 Exchange:消息交换机,它指定消息按什么规则路由到哪个队列

3、 Queue消息队列载体,每个消息都会被投入到一个或多个队列

4、 Binding绑定,它的作用就是把exchangequeue按照路由规则绑定起来

5、 Routing Key:路由关键字,exchange根据这个关键字进行消息投递

6、 VHost:vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queueexchange 和 binding 等,但最最重要的是,其拥有独立的权限系统可以做到 vhost 范围用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

7、 Producer:消息生产者,就是投递消息程序

8、 Consumer:消息消费者,就是接受消息的程序

9、 Channel:消息通道,在客户端每个连接里,可建立多个channel,每个channel代表一个会话任务

由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

在这里插入图片描述

Queue 队列

Queue队列,是RabbitMQ 的内部对象用于存储消息。
RabbitMQ 中消息只能存储在队列中,这一点和Kafka相反。Kafka将消息存储topic主题)这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。

RabbitMQ不支持队列层面的广播消费。

交换器、路由键、绑定

交换器

Exchange :交换器,生产者将消息发送到Exchange,由交换器将消息路由到一个或者多个队列中,如果路由不到,则会返回给生产者或者丢弃消息。交换器

类型

RabbitMQ 中的交换器有四种类型

direct(径直的)

它会把消息路由到那些 BindingKeyRoutingKey 完全匹配的队列中。

fanout(扇出)

它会把所有发送到该交换器的消息路由所有与该路由器绑定的队列中

topic(模糊匹配

它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RouingKey 相匹配的队列中,但这里匹配规则有些不同:

举个栗子

topic 类型的交换器

headers

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容headers 属性进行匹配headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

开发向导

RabbitMQ使用过程

生产者发送消息

  1. 连接 RabbitMQ Broker,建立一个 Connection ,开启 一个信道 Channel
  2. 声明交换器,并设置相关属性;比如交换机类型、是否持久化等;
  3. 声明队列,并设置相关属性;如果是否排他、是否持久化、是否自动删除等;
  4. 通过 RoutingKey 将 Exchange 和 Queue 绑定
  5. 发送消息到 RabbitMQ,其中包括路由键、交换器等信息
  6. 交换器根据路由键查找匹配的队列;
  7. 如果找到,则将从生产者发送过的消息存入队列中;
  8. 如果没有找到,则根据生产者配置属性选择丢弃或者回退给生产者;
  9. 关闭信道;
    10.关闭连接

消费者接收消息

  1. 连接到 RabbitMq Broker,建立一个 Connection,开启一个信道 Channel
  2. 向 RabbitMq Broker 请求消费对应队列的消息,可能会设置想要的回调函数,以及做一些准备工作;
  3. 接收消息;
  4. 消费者确认(ack)接收到的消息;
  5. RabbitMQ 从队列中删除已被确认消费的消息;
  6. 关闭信道;
  7. 关闭连接

使用过程发现,无论生产者或者消费者都需要和 RabbitMQ Broker 建立连接,这个连接(Connection)就是一条 TCP 连接,一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一ID。

信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

Connection 与 Channel

使用交换器和队列

代码清单

channel.exchangeDeclare(exchangeName,"direct",true);
String queueName = channel.queueDeclare().getQueue(); 
// or 声明队列
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);

exchangeDeclare 方法详解

exchangeDeclare 有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

Exchange.DeclareOk exchangeDeclare(String exchange,
                                   String type,
                                   boolean durable,
                                   boolean autoDelete,
                                   boolean internal,
                                   Map<String, Object> arguments) throws IOException;
参数说明
  1. exchange:交换器名称;
  2. type:交换器类型;如:direct、fanout、topic、headers
  3. durable:是否持久化;持久化可以将交换器存盘,在服务器重启的时候不会丢失相关消息;
  4. autoDelete:是否自动删除自动删除前提:只有有一个队列或交换器与这个交换器绑定了,之后所有与这个交换器绑定的都解绑。(只有绑过到解绑的过程,交换器才会删除,没有绑过则不会删除);
  5. internal:是否内置的;如果是内置交换器,则客户端无法直接发送消息到这个交换器,只能通过交换器路由到交换器这种方式;
  6. argument: 结构化参数;比如: alternate-exchange等;

queueDeclare 方法详解

Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                              Map<String, Object> arguments) throws IOException;
参数说明
  1. queue:队列名称;
  2. durable:是否持久化;持久化的队列会存盘,服务器重启的时候可以保证不丢失相关信息
  3. exclusive:是否排他;如果一个队列设置为排他,则该队列仅对首次声明它的连接可见,连接断开自动删除;排他是基于连接(Connection)可见的。其他Connection 不可声明同名排他队列,而且即使排他队列设置了持久化,一旦连接关闭或者客户端退出,排他队列都会自动删除。
  4. autoDelete:是否自动删除;自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开后,才会自动删除;如果没有消费者连接过,则不会删除;
  5. arguments:队列的其他一些参数;如:x-message-ttlx-expiresx-max-lengthx-max-length-bytesx-dead-letter-exchangex-dead-letter-routing-keyx-max-priority等;

注意要点
生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为“传输模式,之后才能声明队列。

queueBind 方法详解

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

参数说明
  1. queue:队列名称;
  2. exchange:交换器名称;
  3. routingKey:用来绑定队列和交换器的路由键;
  4. argument:定义绑定的一些参数;

不仅可以将队列和交换器绑定起来,也可以将已经绑定的关系解除;

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

exchangeBind 方法详解

不仅可以将交换器与队列绑定,也可以将交换器与交换器进行绑定,用法如出一辙。

Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

原文地址:https://blog.csdn.net/chenyao1994/article/details/134540623

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_3618.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注