1.1消息从生产者到交换机有可能会丢失。这里可以通过confirm机制来解决
1.2交换机到队列也有可能会丢失。这里可以通过return机制来解决
1.33、从队列到消费者也有可能会丢失。这里可以通过手动ACK解决。
一、为什么要用到RabbitMq?
因为像我们之前的项目,代码之间的执行都是同步的,一个业务的处理必须等待上一个业务的完成,这样就比较耗费时间,比如我们的用户查询数据的时候,对于用户而言他只需要查寻数据这一个操作,对于我们服务端而言可能还需要做一些处理,像存入缓存、删除缓存,只有做完这些操作我们服务端才会把数据传给用户,但是这些是我们业务的处理,不应该让用户来承担这样的一个时间成本,并且用户等待数据的时间过长,给用户也会带来了很不好的体验感,同时模块之前的耦合性很高,一个模块宕机后,全部模块都不能用了。所以要中间件RabbitMQ
二、RabbitMq有什么作用?
1.解耦
就是rabbitMq有一个生产者模块负责发送消息到队列中,一个消费者模块负责从队列中拿到数据进行消费,模块与模块间分离,通过RabbitMq进行数据通信
2.异步
三、RabbitMq的模型
1.helloword模型
2.Work模型
3.发布订阅模型
4.路由键模型
5.主题模型
四、RabbitMq跟SpringBoot的整合
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.yml配置
spring:
rabbitmq:
host: 192.168.107.123 #虚拟主机的ip地址
port: 5672 #RabbitMq的端口号
username: guest #匿名用户
password: guest
virtual-host: / # 虚拟机主机,队列就是保存在虚拟主机中
3.创建队列、创建交换机、将队列与交换机绑定并设置路由键
// 交换机的类型是路由键
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public Queue directQueue1() {
return new Queue("direct-queue1");
}
@Bean
public Queue directQueue2() {
return new Queue("direct-queue2");
}
// 绑定
@Bean
public Binding directQueue1Bind() {
// 给direct-queue1绑定了两个队列
BindingBuilder.DirectExchangeRoutingKeyConfigurer to = BindingBuilder.bind(directQueue1()).to(directExchange());
// 绑定了两个路由键
to.with("error");
Binding warn = to.with("info");
return warn;
}
@Bean
public Binding directQueue2Bind() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");
}
4.生产者发送消息
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("direct-exchange", "error", "toString");
System.out.println("生产者消息发送完成");
}
5.消费者消费消息
@Component
public class HelloQueueListener {
@RabbitListener(queues = "direct-queue1")
public void cosnuermMsg(String msg) {
System.out.println("消费者拿到的数是:" + msg);
}
}
五、ACK机制
ACK机制是消费端的一个消息确认机制
1.什么是消息确认机制?
MQServer把消息推送给消费者后,消费者开始消费,消费完成后需要把结果给MQServer应答一下,消费结果有两种情况:失败、成功
消费成功:应答ACK,MQServer手动ACK后就明白这个消息已经被成功的消费了,可以从队列中删除了。
消费失败:应答NACK。MQServer收到NACK后知道了消费者无法消费这个消息,发送给其他的消费者进行消费。如果其他的消费者也是无法消费,此时需要这类消息全部的收集起来入库,通知相关人员来检查。
消费者默认自动应答,不出异常自动应答,出了异常应答NACK,并且把这个消息压入到队列
ready:待分配(消费者)的消息的数量。
unackded:待应答的消息数量。
total:总消息的数量。
当出现异常没有处理时候,那么被认为应答nACK,消息回到队列的待应答状态,关掉消费者,则进入待分配状态
2.手动开启ACK
1.ylm配置文件
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手动ACK
2.消费者应答
@RabbitListener(queues = "hello-queue")
public void cosnuermMsg(String msg, Channel channel, Message message) {
System.out.println("消费者拿到的数是:" + msg);
// 每个消息的标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 开始消费消息
Boolean flag = customerData(msg);
if (flag) {
// 确定消息消费成功,应答ACK
// 第一个参数:消息的唯一标识
// 第二个参数:是否批量应答,一般都是false
channel.basicAck(deliveryTag, false);
System.out.println("消息成功,应答ACK");
return;
}
System.out.println("消息过程中没有出现异常,但是消息没有消费成功,应答NACK");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消费过程中出现了异常,应答NACK");
}
// 消息消费失败,应答NACK
// 第三个参数是:是否压入队列,如果设置为false该消息就丢弃了
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException e) {
e.printStackTrace();
}
}
// 完成消息的消费
private Boolean customerData(String msg) {
Integer i = Integer.parseInt(msg);
if (i == 0) { // 没有插入成功,但是也没有出现异常
return false;
}
return true;
}
六、消息的可靠性
1.消息可靠性讲的不能丢失,MQ是如何保证消息可靠性的?
1.1消息从生产者到交换机有可能会丢失。这里可以通过confirm机制来解决
1.2交换机到队列也有可能会丢失。这里可以通过return机制来解决
reutrn机制是RabbitMQ自己提供的一个机制,用来确认消息是否到了队列了。
1.33、从队列到消费者也有可能会丢失。这里可以通过手动ACK解决。
手动ACK后就明白这个消息已经被成功的消费了,可以从队列中删除了
2.confirm和return机制的实现
2.1yml配置
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
publisher-returns: true #开启return
publisher-confirm-type: simple # 开启confirm
/**
* confirm机制和return机制
*/
@Component
public class MsgConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//这里要对mq的return和confirm进行覆盖
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
// confirm机制确认消息是否到了交换机
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息已经到了交换机");
} else {
System.out.println("消息没到了交换机");
}
}
// 确认消息是否到了队列
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息没有到队列," + replyText + "," + exchange + "," + routingKey);
}
3.MQ是如何实现消息确认机制的?
生产者发送消息的时候除了交换机,数据,路由键把回调函数也发送过去了,因为消息是否到了交换机时MQServer确定的,如果到了就调用ACK
4.消息补偿机制
但是这只是通知消息是否正常到达,并没有对消息没有达到的情况进行一个处理,所以我们要进行消息补偿,生产者对未发送成功的消息进行一个消息补偿,特别是对业务很重要的数据必须补偿,无关紧要的倒可以不必因为消息补偿需要成本
也就是mq发送数据出现故障的时候,就可以考虑别的方式发送数据了,这就是消息补偿,如:RPC远程调用直接由生产者发送给消费者,不通过mq
5.服务端实现远程调用(RPC)
5.1通过java网络编程包
5.2RestTemplate
@Test
void contextLoads2() throws Exception {
// 这个类可以发送一个请求过去
RestTemplate restTemplate = new RestTemplate();
String info = restTemplate.getForObject("http://localhost:8080/send?msg=HTTP", String.class);
System.out.println("远程调用的返回的结果:"+info);
}
5.3ApacheHttpClient:通过工具类即可
七、消息的重复消费
消息的重复消费指的是一个消息被同一个消费者消费了多次。
1.消息被消费多次的后果
消费者拿到消息后干的事情是扣款或者是发送短信等待。
2.怎么解决消息被重复消费
3.为什么消息会被重复消费
消费者拿到数据开始消费,并且也消费成功了,在做ACK应答之前网络出现了闪断,消费者和MQServer断开了连接。MQServer中的待应答就会变成待分配,此时消息已经成功消费了,因为是闪断,所以又再次的连接成功,MQServer在再次的把消息推送给了消费者,消费者再次拿到数据再次进行消费,这里就出现了重复的消费。
4.幂等性解决方案
Token机制:关于解决表单的重复提交就是服务端生成一个token带给表单,表单中隐藏这个token,多次提交携带token过去,第一次token有效进行操作,然后把token设为失效了,然后后面的提交携带的token就是失效的了,就不会操作了
CAS保证接口幂等性
乐观锁实现幂等性
防重表
缓存队列
八、死信队列
1.什么是死信?
非正常的消息就是死信
2.什么是非正常的消息
3.什么是死信队列
保存死信的消息的队列就是死信队列
一般无法处理的消息都是死信,会把这些死信消息全部的转到同一个队列来做特殊的处理,这个队列就是死信队列。
九、延迟队列
原文地址:https://blog.csdn.net/qq_66013742/article/details/134428056
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_7527.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!