本文介绍: 代码参数说明在 第一小节的代码中,如果需要可移步到第一节查看

代码参数说明在 第一小节的代码中,如果需要可移步到第一节查看

工作队列

在这里插入图片描述
工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源时间操作。当我们任务(Task)当作消息发送队列中,一个运行后台的工作者worker进程就会取出任务然后处理。当你运行多个作者workers),任务就会在它们之间共享
使用工作队列一个好处就是它能够并行的处理队列。如果堆积了很多任务我们需要添加更多的工作者workers)就可以了,扩展简单
当我们在 n个Terminal 窗口中,运行消费者程序,就可以多个消费者处理生产者生产的消息了 当队列中的消息发送消费者1的时候,就不会再发送给消费者2了。

消息确认

当我们处理消息的时候, 我们想知道,消费者在处理的过程是否已经处理完成,没有出现消费者挂机的状态这里需要消息确认了, 不然队列中的消息是否处理完成,不能明确, 有可能会丢失重要的数据
消息响应默认开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候设置的第四个参数basic_consume为false (true 意味着不响应ack) ,当工作者worker)完成了任务,就发送一个响应

公平调度

RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。 不会等侍是否处理完成
我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个作者worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

$channel->basic_qos(null, 1, null);

生产者

使用了 第一小节中的生产者一次生成了20个消息

消费者

<?php
declare (strict_types = 1);

namespace appcommand;

use Exception;
use PhpAmqpLibConnectionAMQPStreamConnection;
use thinkconsoleCommand;
use thinkconsoleInput;
use thinkconsoleinputArgument;
use thinkconsoleinputOption;
use thinkconsoleOutput;

//工作队列带消息确认的 开几个终端,就会有几个消息者来消费。
//生产者可以使用 SimpleMQProduce 中的生产者
class WorkerWithAck extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('workerwithack')
            ->setDescription('这是一个工作队列,带应答的');
    }

    protected function execute(Input $input, Output $output)
    {
        //获取连接
        $connection = $this->getConnection();
        //获取通道
        $channel = $connection->channel();
        $channel->queue_declare("hello",false,false,false,false,false);
        $callback = function($msg){
          $msgbody = $msg->body;
          $msgbydyArr = json_decode($msgbody,true);
          echo $msgbydyArr["name"]."--".$msgbydyArr["age"]."--".$msgbydyArr["sex"].PHP_EOL;
          $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了
        };

        //如果我们要让消费者一条一条的处理消费,也就是说 只有consumer已经处理并确认了上一条message时queue才分派新的message给它
        //我们可以加上下面的这个代码, 注意,是可以加,也可以不加。因为我们有了应答机制,消息是不会丢失
        //$channel->basic_qos(null,1,null);  //这句可加可不加

        $channel->basic_consume("hello","",false,false,false,false,$callback);

        while(count($channel->callbacks)){
            $channel->wait();
        }

    }

    protected function getConnection(){
        try{
            return new AMQPStreamConnection("192.168.3.228",5672,"admin","123456");
        }catch(Exception $e){
            throw new Exception("创建队列连接失败");
        }
    }

}

测试结果两个 工作队列分别处理了,同一个生产者数据,并且没有重复
在这里插入图片描述
在这里插入图片描述

原文地址:https://blog.csdn.net/hjh15827475896/article/details/134575841

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_14069.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注