本文介绍: 直连(路由)交换机,生产者将消息发送到交换机,并指定消息的Routing Key(路由键)。交换机会将Routing Key与队列绑定进行匹配,如果匹配成功,则将该消息路由到对应的队列中。如果没有匹配成功,该消息将被丢弃或返回给生产者。在Direct模式中,每个消息只能被一个消费者接收。通过使用Exchange和Routing Key来进行消息传输,Direct模式实现了消息的有选择性地路由,提高了消息传输的效率,减少了系统负载.
1.Direct
直连(路由)交换机,生产者将消息发送到交换机,并指定消息的Routing Key(路由键)。交换机会将Routing Key与队列绑定进行匹配,如果匹配成功,则将该消息路由到对应的队列中。如果没有匹配成功,该消息将被丢弃或返回给生产者。在Direct模式中,每个消息只能被一个消费者接收。
通过使用Exchange和Routing Key来进行消息传输,Direct模式实现了消息的有选择性地路由,提高了消息传输的效率,减少了系统负载.
如上图中的routingKey为error绑定队列disk,routingKey为info或warning绑定队列console。
2.生产者
package com.hong.rabbitmq7;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
/**
* @Description: 直连模式消息发送者
* @Author: hong
* @Date: 2024-01-15 22:24
* @Version: 1.0
**/
public class DirectSend {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String,String> map = new HashMap<>();
map.put("info","我是info信息");
map.put("debug","我是debug信息");
map.put("warning","我是warning信息");
map.put("error","我是error信息");
for(Map.Entry<String,String> bindingKeys : map.entrySet()){
String bindingKey = bindingKeys.getKey();
String message = bindingKeys.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完成------" + message);
}
}
}
3.消费者1-disk只接受error消息
package com.hong.rabbitmq7;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
/**
* @Description: 直连模式消息接收者1-disk接收error消息
* @Author: hong
* @Date: 2024-01-15 20:22
* @Version: 1.0
**/
public class Receiver1 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
/*
*声明交换机
*第1个参数:交换机名称
*第2个参数:交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
String queueName = "disk";
channel.queueDeclare(queueName,false,false,false,null);
/*
* 绑定队列与交换机
* 第1个参数:队列名称
* 第2个参数:交换机名称
* 第3个参数:routingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"error");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
System.out.println("disk中的:"+ new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = var -> {
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
4.消费2-console接收info和warning消息
package com.hong.rabbitmq7;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
/**
* @Description: 直连模式消息接收者2-console接收info和warning消息
* @Author: hong
* @Date: 2024-01-15 20:22
* @Version: 1.0
**/
public class Receiver2 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
/*
*声明交换机
*第1个参数:交换机名称
*第2个参数:交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
String queueName = "console";
channel.queueDeclare(queueName,false,false,false,null);
/*
* 绑定队列与交换机
* 第1个参数:队列名称
* 第2个参数:交换机名称
* 第3个参数:routingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"info");
channel.queueBind(queueName,EXCHANGE_NAME,"warning");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
System.out.println("console中的:"+ new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = var -> {
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
原文地址:https://blog.csdn.net/qq_41596346/article/details/135610043
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_59260.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。