第十章 RabbitMQ
一、初识 MQ
1. 同步和异步通讯
1.1 同步通讯
1.2 异步通讯
2. 技术对比
2.1 比较常见的 MQ 实现
2.2 几种常见 MQ 的对比
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
二、快速入门
1. 安装 RabbitMQ
1.1 单机部署
1.1.1 下载镜像
docker pull rabbitmq:3-management
docker load -i mq.tar
1.1.2 安装 MQ
docker run
-e RABBITMQ_DEFAULT_USER=alex
-e RABBITMQ_DEFAULT_PASS=123321
--name mq
--hostname mq1
-p 15672:15672
-p 5672:5672
-d
rabbitmq:3-management
1.2 集群部署
集群分类
1.3 MQ 的基本结构
2. RabbitMQ 消息模型
3. 导入 Demo 工程
4. 入门案例
4.1 publisher 实现
package com.alex.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("alex");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
4.2 consumer 实现
package com.alex.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("alex");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
5. 总结
原文地址:https://blog.csdn.net/sgsgkxkx/article/details/134173661
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_41510.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。