一、安装rabbitMQ
-
docker pull rabbitmq:3.8
-
docker run -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name rabbit01 --hostname rabbit01 --restart=always -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8
-
docker exec -it rabbit01 /bin/bash
-
rabbitmq-plugins enable rabbitmq_management
-
7、或者直接用别人搞好的镜像
docker run -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name rabbit02 --hostname rabbit02 --restart=always -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8-management
二、在spring-boot
中整合
-
<!-- spring boot和junit整合单元测试包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- rabbitmq的包引入 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
server.port=8000 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/
三、简单模式
-
2、生产者代码,运行下面的代码,查看可视化界面,并不存在消息,原因是因为需要手动创建
simple_queue
这个队列@SpringBootTest(classes = ProducerApplication.class) public class ProducerTest01 { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test01() { /** * 第一个参数:表示队列的名称 * 第二个参数:表示要发送的数据 */ rabbitTemplate.convertAndSend("simple_queue", "hello world"); } }
-
package com.example.listener01; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ConsumerListener01 { @RabbitListener(queues = "simple_queue") public void listener01(Message message) { String msg = new String(message.getBody()); System.out.println("接收到的消息:" + msg); } }
四、work
工作模式
-
1、简单的来理解,就是在上面简单模式下增加几个消费者,如同搬砖一样的,一个搬运工搬不过来,多叫几个人来干活的性质,避免消息堆积
-
@Test public void test02() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work_queue", "hello world"); } }
-
3、定义2个消费者来一起消费消息
@Component public class ConsumerListener01 { @RabbitListener(queues = "work_queue") public void listener01(Message message) { String msg = new String(message.getBody()); System.out.println("消费者1接收到的消息:" + msg); } }
五、发布模式
- 1、发布模式是指发送一个消息,希望在几个消费者那边都能接收到,上面的工作模式,一条消息被一个消费者消费了,另外一个消费者就接收不到消息,在一些场景需要给每个消费者就要用发布者模式
- 2、根据交换机的模式可以分为以下几种
一、Fanout
模式
-
1、使用配置文件的方式创建交换机和队列,并且将他们绑定在一起
package com.example.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitFanoutExchangeConfiguration { // 交换机 @Bean public Exchange fanoutExchange() { return ExchangeBuilder.fanoutExchange("fanout_exchange").durable(true).build(); } // 创建一个队列 @Bean public Queue fanoutQueue1() { return QueueBuilder.durable("fanout_queue1").build(); } // 创建一个队列 @Bean public Queue fanoutQueue2() { return QueueBuilder.durable("fanout_queue2").build(); } // 队列和交换机绑定 @Bean public Binding fanoutExchangeQueue01() { // with表示路由key return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()).with("").noargs(); } @Bean public Binding fanoutExchangeQueue02() { // with表示路由key return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()).with("").noargs(); } }
-
2、生产者发送消息
@Test public void test03() { rabbitTemplate.convertAndSend("fanout_exchange", "", "hello world"); }
-
4、定义消费者
@Component public class ConsumerListener01 { @RabbitListener(queues = "fanout_queue1") public void listener01(Message message) { String msg = new String(message.getBody()); System.out.println("消费者1接收到的消息:" + msg); } }
二、Direct
模式
三、Topic
模式
六、直接在监听上使用注解的方式来创建交换机等
-
package com.example.listener04; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ConsumerListener01 { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue_01", durable = "true"), exchange = @Exchange(value = "direct_rabbit_exchange", type = ExchangeTypes.DIRECT), key = {"info", "error"} )) public void listener01(Message message) { String msg = new String(message.getBody()); System.out.println("消费者1接收到的消息:" + msg); } }
-
3、运行后查看
rabbitmq
可视化界面 -
public void test04() { rabbitTemplate.convertAndSend("direct_rabbit_exchange","error","hello world"); }
七、消息丢失
一、开启生产者确认机制
-
server.port=9000 spring.rabbitmq.host=123.56.103.229 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ # 开启生产者确认机制 spring.rabbitmq.publisher-confirm-type=correlated
-
2、重写
RabbitTemplate
,只要我们在容器中有一个RabbitTemplate
,那么spring boot
就不会用对RabbitTemplate
自动化配置package com.example.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfiguration { /** * ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { if (ack) { System.out.println("消息正常投递到交换机中"); } else { System.out.println("消息投递到交换机失败:"+s); } } }); return rabbitTemplate; } }
-
3、发送消息的时候故意写错交换机的名字
@Test public void test04() throws InterruptedException { rabbitTemplate.convertAndSend("direct_rabbit_exchange_xx","error","hello world"); Thread.sleep(2000); }
-
5、在发送消息的时候传递当前唯一的识别
id
,这里使用uuid
@Test public void test04() throws InterruptedException { String msgUuid = UUID.randomUUID().toString().replace("-", ""); CorrelationData correlationData = new CorrelationData(msgUuid); rabbitTemplate.convertAndSend("direct_rabbit_exchange", "error", "hello world", correlationData); Thread.sleep(2000); }
-
@Configuration public class RabbitmqConfiguration { /** * ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { if (ack) { System.out.println("消息正常投递到交换机中"); } else { String mesId = correlationData.getId(); System.out.println(mesId); System.out.println("消息投递到交换机失败:"+s); } } }); return rabbitTemplate; } }
二、交换机到队列的时候出现问题
-
server.port=9000 spring.rabbitmq.host=123.56.103.229 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ # 开启生产者确认机制 spring.rabbitmq.publisher-confirm-type=correlated # 开启生产者回调机制 spring.rabbitmq.publisher-returns=true
-
package com.example.config; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfiguration { /** * ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); ... // 绑定回退机制的回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println(returnedMessage.getMessage()); System.out.println(returnedMessage.getReplyCode()); System.out.println(returnedMessage.getReplyText()); System.out.println(returnedMessage.getExchange()); System.out.println(returnedMessage.getRoutingKey()); } }); return rabbitTemplate; } }
三、消息持久化
-
1、
rabbitmq
默认是在内存中存储,当服务宕机后数据直接会丢失,消息在spring boot
中持久化是因为框架帮你处理了,修改消息是否持久化可以参考下面@Test public void test04() throws InterruptedException { String msgUuid = UUID.randomUUID().toString().replace("-", ""); CorrelationData correlationData = new CorrelationData(msgUuid); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); // 获取到消息属性对象 messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); // 设置消息不缓存 return message; } }; rabbitTemplate.convertAndSend("direct_rabbit_exchange", "error", "hello world",messagePostProcessor, correlationData); Thread.sleep(2000); }
四、消费者消费消息不丢失
-
2、配置应答模式
server.port=8000 spring.rabbitmq.host=123.56.103.229 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ # 配置应答模式 spring.rabbitmq.listener.simple.acknowledge-mode=auto
八、消费限流
-
1、在消费者端添加配置
server.port=8000 spring.rabbitmq.host=123.56.103.229 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.listener.simple.acknowledge-mode=auto # 每次处理10个 spring.rabbitmq.listener.simple.prefetch=10
九、死信队列
-
1、在下面几种情况下会产生死信队列
-
2、死信队列的架构图
-
3、创建死信队列
package com.example.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitDlxExchangeConfiguration { // 创建一个死信交换机 @Bean public Exchange dlxExchange() { return ExchangeBuilder.fanoutExchange("dlx_exchange").durable(true).build(); } // 创建一个死信队列 @Bean public Queue dlxQueue() { return QueueBuilder.durable("dlx_queue").maxLength(10).build(); } // 死信交换机和死信队列绑定 @Bean public Binding dlxQueueBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dead").noargs(); } // 创建一个正常的交换机 @Bean public Exchange orderExchange() { return ExchangeBuilder.directExchange("order_exchange").durable(true).build(); } // 创建一个正常队列 @Bean public Queue orderQueue() { return QueueBuilder.durable("order_queue"). maxLength(10). deadLetterExchange("dlx_exchange"). // 死信队列的交换机 deadLetterRoutingKey("dead"). // 死信队列的routingKey build(); } // 正常交换机和正常队列绑定 @Bean public Binding orderQueueBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("info").noargs(); } }
-
4、发送消息
@Test public void test05() throws InterruptedException { for (int i = 0; i < 15; i++) { rabbitTemplate.convertAndSend("order_exchange", "info", "hello world" + i); } Thread.sleep(2000); }
-
5、查看可视化界面,进入死信队列的是时间最早的(也就是最先发送的)
-
6、定义消费者
package com.example.listener05; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ConsumerListener01 { @RabbitListener(queues = "dlx_queue") public void listener01(Message message) { String msg = new String(message.getBody()); System.out.println("接收到的死信队列消息:" + msg); } @RabbitListener(queues = "order_queue") public void listener02(Message message) { String msg = new String(message.getBody()); System.out.println("接收到的订单队列消息:" + msg); } }
十、延迟任务
-
3、延迟任务的结构图
-
4、创建一个延迟任务的队列
@Configuration public class RabbitDlxExchangeConfiguration { ... @Bean public Queue orderQueue() { return QueueBuilder.durable("order_queue"). // maxLength(10). ttl(2000). // 过期时间 deadLetterExchange("dlx_exchange"). // 死信队列的交换机 deadLetterRoutingKey("dead"). // 死信队列的routingKey build(); } }
-
5、发送消息,观察可视化界面,时间到了就会进入到死信队列中
@Test public void test06() throws InterruptedException { rabbitTemplate.convertAndSend("order_exchange", "info", "hello world"); Thread.sleep(2000); }
原文地址:https://blog.csdn.net/kuangshp128/article/details/134676339
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_24900.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!