本文介绍: 1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息。1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息。4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息。1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由。3、消费者只需要绑定队列来消费消息就可以。
一、安装rabbitmq
-
docker pull rabbitmq:3.8
-
docker run -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name rabbit01 --hostname rabbit01 --restart=always -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8
-
docker exec -it rabbit01 /bin/bash
-
rabbitmq-plugins enable rabbitmq_management
-
7、或者直接用别人搞好的镜像
docker run -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name rabbit02 --hostname rabbit02 --restart=always -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8-management
二、go
语言对rabbitmq
基本操作
-
go get github.com/streadway/amqp
-
package main import ( "fmt" "github.com/streadway/amqp" ) func main() { // 连接rabbitmq // conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称") // 端口号:5672 conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672 defer conn.Close() // 打开通道 ch, err := conn.Channel() fmt.Println(err) defer ch.Close() }
-
package utils import ( "fmt" "github.com/streadway/amqp" ) func RabbitmqUtils() *amqp.Channel { // 连接rabbitmq conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672 //defer conn.Close() // 打开通道 ch, err := conn.Channel() fmt.Println(err) //defer ch.Close() return ch }
-
func main() { // 创建一个队列 // durable, autoDelete, exclusive, noWait bool queue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil) fmt.Println(queue.Name, err) }
三、简单模式
-
func main() { /** 第一个参数是交换机名称 第二个参数是队列名称 第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃 第四个参数是 路由的时候 第五个参数是消息体 */ err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{ Body: []byte("hello word"), }) fmt.Println(err) }
-
/** 第一个参数是队列名称 第二个参数自己给当前消费者命名 第三个参数是否自动应答 第三个参数队列是否可以被其他队列访问 第四个参数 第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度 */ msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil) fmt.Println(err) for msg := range msgChan { fmt.Println(string(msg.Body)) }
四、工作模式
-
2、定义2个消费者来消费消息
func main() { msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil) fmt.Println(err) for msg := range msgChan { fmt.Println("消费者1:", string(msg.Body)) } }
-
3、生产多条消息
func main() { for i := 0; i < 10; i++ { _ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{ Body: []byte(fmt.Sprintf("hello word %d", i)), }) } }
-
4、消费结果
五、发布订阅模式
-
1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息
-
func main() { // 1.创建2个队列 queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil) queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil) // 2.创建一个交换机 _ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil) // 3.队列和交换机绑定在一起 _ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil) _ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil) }
-
func main() { msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil) fmt.Println(err) for msg := range msgChan { fmt.Println("消费者1:", string(msg.Body)) } }
-
4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息
func main() { _ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{ Body: []byte("hello word"), }) }
六、路由模式
-
func main() { // 1.创建2个队列 queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil) queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil) // 2.创建一个交换机 err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil) if err != nil { fmt.Println(err) } // 3.队列和交换机绑定在一起 _ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil) _ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil) _ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil) }
-
3、定义消费者
func main() { msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil) fmt.Println(err) for msg := range msgChan { fmt.Println("消费者1:", string(msg.Body)) } }
-
4、定义生产者
func main() { // 消费者会根据绑定的路由key来获取消息 _ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{ Body: []byte("hello word"), }) }
七、主题模式
八、简单对其封装
-
package utils import ( "errors" "fmt" "github.com/streadway/amqp" "log" ) // MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost const MQURL = "amqp://admin:123456@localhost:5672//" type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel MQUrl string } // NewRabbitMQ 创建RabbitMQ的结构体实例 func NewRabbitMQ() *RabbitMQ { rabbitMQ := &RabbitMQ{ MQUrl: MQURL, } var err error // 创建rabbitMQ连接 rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl) if err != nil { rabbitMQ.failOnErr(err, "创建连接错误") } rabbitMQ.channel, err = rabbitMQ.conn.Channel() if err != nil { rabbitMQ.failOnErr(err, "获取channel失败") } return rabbitMQ } // Binding 创建交换机和队列并且绑定在一起 func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) { // 1.创建1个队列 queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil) if err != nil { r.failOnErr(err, "创建队列失败") } if exchange != "" && key == "" { r.failOnErr(errors.New("错误"), "请传递交换机链接类型") } if exchange != "" { // 2.创建一个交换机 err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil) if err1 != nil { r.failOnErr(err, "创建交换机失败") } // 3.队列和交换机绑定在一起 if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil { fmt.Println("1111") r.failOnErr(err, "交换机和队列绑定失败") } } fmt.Println("创建成功") } // failOnErr 定义内部错误处理 func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { log.Fatalf("%s:%s", message, err) panic(fmt.Sprintf("%s:%s", message, err)) } } func (r *RabbitMQ) Close() { defer func(Conn *amqp.Connection) { err := Conn.Close() if err != nil { r.failOnErr(err, "关闭链接失败") } }(r.conn) defer func(Channel *amqp.Channel) { err := Channel.Close() if err != nil { r.failOnErr(err, "关闭通道失败") } }(r.channel) } func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) { err := r.channel.Qos(prefetchCount, prefetchSize, global) if err != nil { r.failOnErr(err, "限流失败") } } // Publish 发布者 func (r *RabbitMQ) Publish(exchange, routerKey, message string) { // 2.发送数据到队列中 if err := r.channel.Publish( exchange, routerKey, false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者 false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者 amqp.Publishing{ Body: []byte(message), }, ); err != nil { r.failOnErr(err, "发送消息失败") } fmt.Println("恭喜你,消息发送成功") } // Consumer 消费者 func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) { // 2.接收消息 message, err := r.channel.Consume( queueName, "", // 区分多个消费者 true, // 是否自动应答 false, // 是否具有排他性 false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者 false, // 队列消费是否阻塞 nil, ) if err != nil { r.failOnErr(err, "接收消息失败") } fmt.Println("消费者等待消费...") forever := make(chan bool) // 使用协程处理消息 go func() { for d := range message { log.Printf("接收到的消息:%s", d.Body) callback(d.Body) } }() <-forever }
-
2、简单模式的使用
func main() { mq := utils.NewRabbitMQ() mq.Consumer("simple_queue1", func(message []byte) { fmt.Println(string(message)) }) defer mq.Close() } func main() { mq := utils.NewRabbitMQ() mq.Binding("simple_queue1", "", "", "") defer mq.Close() mq.Publish("", "simple_queue1", "你好水痕") }
-
3、工作模式的使用
func main() { mq := utils.NewRabbitMQ() mq.Consumer("work_queue1", func(message []byte) { fmt.Println("消费者2", string(message)) }) defer mq.Close() } func main() { mq := utils.NewRabbitMQ() defer mq.Close() for i := 0; i < 10; i++ { mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i)) } }
-
4、交换机带路由的时候
func main() { mq := utils.NewRabbitMQ() mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info") mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error") mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info") defer mq.Close() } func main() { mq := utils.NewRabbitMQ() mq.Consumer("first_queue2", func(message []byte) { fmt.Println("消费者2", string(message)) }) defer mq.Close() } func main() { mq := utils.NewRabbitMQ() defer mq.Close() mq.Publish("first_exchange1", "error", "你好水痕") }
原文地址:https://blog.csdn.net/kuangshp128/article/details/134624620
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_14901.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。