本文介绍: 1、发布订阅模式同样是一个生产者生产消息多个消费者来消费,与上面的工作模式区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者可以消费到消息。1、工作模式是指一个生产者多个消费者,在简单模式扩展多个消费者,每个消费者只能交替来消费消息。4、生产者需要消息发送交换机里面可以,交换机会根据绑定队列推送消息。1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加路由。3、消费者只需要绑定队列来消费消息可以

一、安装rabbitmq

二、go语言rabbitmq基本操作

三、简单模式

四、工作模式

五、发布订阅模式

六、路由模式

  • 1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加路由key

  • 2、使用go-api创建交换机和队列,并且对其绑定

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil)
    	// 2.创建一个交换
    	err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	if err != nil {
    		fmt.Println(err)
    	}
    	// 3.队列和交换绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil)
    }
    
  • 3、定义消费者

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、定义生产者

    func main() {
        // 消费者会根据绑定的路由key来获取消息
    	_ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    

七、主题模式

八、简单对其封装

  • 1、封装代码

    package utils
    
    import (
    	"errors"
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    // MQURL url格式 amqp://账号:密码@rabbitmq服务地址:端口号/vhost
    const MQURL = "amqp://admin:123456@localhost:5672//"
    
    type RabbitMQ struct {
    	conn    *amqp.Connection
    	channel *amqp.Channel
    	MQUrl   string
    }
    
    // NewRabbitMQ 创建RabbitMQ的结构实例
    func NewRabbitMQ() *RabbitMQ {
    	rabbitMQ := &RabbitMQ{
    		MQUrl: MQURL,
    	}
    	var err error
    	// 创建rabbitMQ连接
    	rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
    	if err != nil {
    		rabbitMQ.failOnErr(err, "创建连接错误")
    	}
    	rabbitMQ.channel, err = rabbitMQ.conn.Channel()
    	if err != nil {
    		rabbitMQ.failOnErr(err, "获取channel失败")
    	}
    	return rabbitMQ
    }
    
    // Binding 创建交换机和队列并且绑定在一起
    func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) {
    	// 1.创建1个队列
    	queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil)
    	if err != nil {
    		r.failOnErr(err, "创建队列失败")
    	}
    	if exchange != "" && key == "" {
    		r.failOnErr(errors.New("错误"), "请传递交换机链接类型")
    	}
    	if exchange != "" {
    		// 2.创建一个交换机
    		err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil)
    		if err1 != nil {
    			r.failOnErr(err, "创建交换机失败")
    		}
    		// 3.队列和交换机绑定在一起
    		if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil {
    			fmt.Println("1111")
    			r.failOnErr(err, "交换机和队列绑定失败")
    		}
    	}
    	fmt.Println("创建成功")
    }
    
    // failOnErr 定义内部错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {
    	if err != nil {
    		log.Fatalf("%s:%s", message, err)
    		panic(fmt.Sprintf("%s:%s", message, err))
    	}
    }
    
    func (r *RabbitMQ) Close() {
    	defer func(Conn *amqp.Connection) {
    		err := Conn.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭链接失败")
    		}
    	}(r.conn)
    	defer func(Channel *amqp.Channel) {
    		err := Channel.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭通道失败")
    		}
    	}(r.channel)
    }
    
    func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) {
    	err := r.channel.Qos(prefetchCount, prefetchSize, global)
    	if err != nil {
    		r.failOnErr(err, "限流失败")
    	}
    }
    
    // Publish 发布者
    func (r *RabbitMQ) Publish(exchange, routerKey, message string) {
    	// 2.发送数据到队列中
    	if err := r.channel.Publish(
    		exchange,
    		routerKey,
    		false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者
    		false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者
    		amqp.Publishing{
    			Body: []byte(message),
    		},
    	); err != nil {
    		r.failOnErr(err, "发送消息失败")
    	}
    	fmt.Println("恭喜你,消息发送成功")
    }
    
    // Consumer 消费者
    func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) {
    	// 2.接收消息
    	message, err := r.channel.Consume(
    		queueName,
    		"",    // 区分多个消费者
    		true,  // 是否自动应答
    		false, // 是否具有排他性
    		false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递connection中的消费者
    		false, // 队列消费是否阻塞
    		nil,
    	)
    	if err != nil {
    		r.failOnErr(err, "接收消息失败")
    	}
    	fmt.Println("消费者等待消费...")
    	forever := make(chan bool)
    	// 使用协程处理消息
    	go func() {
    		for d := range message {
    			log.Printf("接收到的消息:%s", d.Body)
    			callback(d.Body)
    		}
    	}()
    	<-forever
    }
    
  • 2、简单模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("simple_queue1", func(message []byte) {
    		fmt.Println(string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("simple_queue1", "", "", "")
    	defer mq.Close()
    	mq.Publish("", "simple_queue1", "你好水痕")
    }
    
  • 3、工作模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("work_queue1", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	for i := 0; i < 10; i++ {
    		mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i))
    	}
    }
    
  • 4、交换机带路由的时候

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info")
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error")
    	mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info")
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("first_queue2", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	mq.Publish("first_exchange1", "error", "你好水痕")
    }
    

原文地址:https://blog.csdn.net/kuangshp128/article/details/134624620

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_14901.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注