MQ高级特性
1.削峰
拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费
TTL
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
1.图形化操作
添加队列
添加交换机
2.代码实现
@Configuration public class TopicMqTtlConfig { @Value("${mq.exchange.name}") private String EXCHANGENAME; @Value("${mq.queue.name1}") private String QUEUENAME1; @Value("${mq.queue.name2}") private String QUEUENAME2; // 1 // . 交换机 @Bean("ex1") public Exchange getExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build(); return exchange; } // 2。 队列 @Bean("queue1") public Queue getQueue1(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME1) .withArgument("x-message-ttl",30000)//过期时间30秒 .withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃 .build(); return queue; } @Bean("queue2") public Queue getQueue2(){ Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2) .withArgument("x-message-ttl",300000000)//过期时间30秒 .build(); return queue2; } // 3. 交换机和队列进行绑定 @Bean("binding1") public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){ Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs(); return binding1; } @Bean("binding2") public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){ Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs(); return binding2; } }
添加成功 ttl1只接收10条
时间过期
死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)
给队列设置参数: x-dead–letter–exchange 和 x-dead–letter-routing-key
// 1. 交换机 :正常的交换机 死信交换机
// 2.队列 :正常的 死信
//3.绑定 正常ex – 正常的que
正常的que和死信交换机
死信ex-死信queue
2.代码实现
@Configuration public class TopicMqDeadConfig { @Value("${mq1.exchange.name1}") private String EXCHANGENAME; @Value("${mq1.exchange.name2}") private String DEADEXCHANGE; @Value("${mq1.queue.name1}") private String QUEUENAME1; @Value("${mq1.queue.name2}") private String QUEUENAME2; // 声明正常交换机 @Bean("ex1") public Exchange getExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build(); return exchange; } // 正常队列 @Bean("queue1") public Queue getQueue1(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME1) .withArgument("x-message-ttl",30000)//过期时间30秒 .withArgument("x-dead-letter-exchange",DEADEXCHANGE) .withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定 //.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃 .build(); return queue; } // 交换机和队列进行绑定 @Bean("binding1") public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){ Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs(); return binding1; } // 声明死信交换机 @Bean("ex2") public Exchange getDeadExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build(); return exchange; } //死信队列 @Bean("queue2") public Queue getQueue2(){ Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2) .build(); return queue2; } // 死信交换机和死信队列进行绑定 @Bean("binding2") public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){ Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs(); return binding2; } }
监听正常队列
总结:
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
2. 消费者拒接消费消息,并且不重回队列;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1. 定时器
2. 死信队列
在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列
组合实现延迟队列的效果。
1.配置
添加依赖
<!--2. rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--nacos 配置中心--> <!--配置中心--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <!-- application bootstrap --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> </dependency> <!-- nacos--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>com.example</groupId> <artifactId>sys-comm</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
修改配置
2.代码实现
发送消息 测试
过期后放入死信队列
添加依赖
<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.16</version> </dependency>
获取成功
3.连接数据库
创建表
创建测试类
@RestController @RequestMapping("order") public class OrderController { @Value("${mq1.exchange.name1}") private String EXCHANGENAME; // @Resource private RabbitTemplate rabbitTemplate; @GetMapping public Result aaa(TabOrder order){ //1. 消息 存放到mq里面 String s = JSONUtil.toJsonStr(order); // openfeign -- 数据添加到数据库里面 rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s); return Result.success(s); } }
监听normal
import javax.annotation.Resource; @Component public class XiaoFeng implements ChannelAwareMessageListener { @Resource private TabOrderMapper orderMapper; @Override @RabbitListener(queues = "test_queue_normal") public void onMessage(Message message, Channel channel) throws Exception { //Thread.sleep(2000);// 20s byte[] body = message.getBody(); String s = new String(body); System.out.println(s); // 将字符串转化为 对象 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ TabOrder order = JSONUtil.toBean(s, TabOrder.class); // 将订单的信息 报讯到数据库里面 int insert = orderMapper.insert(order); channel.basicAck(deliveryTag,true); // }catch(Exception e){ //long deliveryTag, boolean multiple, boolean requeue System.out.println("拒绝签收消息"); channel.basicNack(deliveryTag,true,false);// 死信消息 } } }
监听dead
@Component public class YanChi implements ChannelAwareMessageListener { @Resource private TabOrderMapper orderMapper; @Override @RabbitListener(queues = "test_queue_dead") public void onMessage(Message message, Channel channel) throws Exception { //Thread.sleep(2000);// 20s byte[] body = message.getBody(); String s = new String(body); System.out.println(s); // 将字符串转化为 对象 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ TabOrder order = JSONUtil.toBean(s, TabOrder.class); // order 的状态 TabOrder tabOrder = orderMapper.selectById(order.getId()); if(tabOrder.getStatus()==1){ // 取消 tabOrder.setStatus(3); } orderMapper.updateById(tabOrder); channel.basicAck(deliveryTag,true); // }catch(Exception e){ //long deliveryTag, boolean multiple, boolean requeue System.out.println("拒绝签收消息"); channel.basicNack(deliveryTag,true,false);// 死信消息 } } }
成功
原文地址:https://blog.csdn.net/wql56789/article/details/134676807
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_28276.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!