RabbitMQ 和 PHP 中如何将任务返回到队列中? [英] How in RabbitMQ and PHP return task back to the queue?

查看:100
本文介绍了RabbitMQ 和 PHP 中如何将任务返回到队列中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果处理结果不适合我,我如何将消息返回到队列.仅找到有关消息确认的信息,但我认为它不适合我.我需要,如果作为处理的结果我得到参数 RETRY 消息被添加回队列.然后这个工人或另一个工人又把它捡起来并试图处理它.

How can I return message back to the queue if processing result did not suit me. Found only information about message acknowledgments but I think that it does not suit me. I need that if as a result of processing I get the parameter RETRY message is added back to the queue. And then this worker or another one picks it up again and tries to process it.

例如:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
    $condition = json_decode($msg->body);

    if (!$condition) {
        # return to the queue
    }
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

推荐答案

set auto no_ack flag to false

set auto no_ack flag to false

queue:从哪里获取消息的队列
消费者标签:消费者标识符
no_local:不接收此消费者发布的消息.
no_ack:告诉服务器消费者是否会确认消息.
独占:请求独占消费者访问,意味着只有这个消费者可以访问队列
等待:
回调:一个 PHP 回调

queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback

$channel->basic_consume('test', '', false, false, false, false, $callback);

$channel->basic_consume('test', '', false, false, false, false, $callback);

您必须使用确认,如果您的过程不起作用,您可以忽略确认

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($message) {
    $condition = json_decode($message->body);
     
    if (!$condition) {
        // return to the queue 
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }else{
        // send ack , remove from queue
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
};

$channel->basic_consume('test', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

当然用这种方法你会面临消息总是在队列的头部,还有另一种可能性,如果您真的想跟踪重试,可以按照以下方法

Of course with this approach you will face with the message always in the head of the queue, there is also another possibility, if you really want to have a track of retry you can follow the below approach

定义一个重试队列,最好是你的队列名称-retry,最好定义一个死信队列:-dlq

defining a queue for retry, preferably your queue-name -retry and define a dead-letter queue preferably: -dlq

然后您可以执行以下操作:如何设置 -retry 队列:这是其中最重要的部分.您需要声明具有以下功能的队列:

Then you can do something like below: How to set up -retry queue: this is the most important part of it. you need to declare queue with the following features:

x-dead-letter-exchange:应该和你的主队列路由键相同
x-dead-letter-routing-key:应该和你的主队列路由键相同
x-message-ttl:重试之间的延迟

x-dead-letter-exchange: should be same as your main queue routing key
x-dead-letter-routing-key: should be same as your main queue routing key
x-message-ttl: the delay between retries

代码为sudo代码,请勿复制粘贴,仅供参考

the codes are sudo code, please do not copy-paste, this is just a hint to give you the idea about it

$maximumRetry = 5;
$callback = function($message) {
    $body = json_decode($message->body);
    try { 
        // process result is your condition
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } catch(Exception $e) {
        // return to the queue 
        $body['try_attempt'] = !empty($body['try_attempt'])? int($body['try_attempt']) + 1: 1 
        if ($body['try_attempt'] >= $maximumRetry ){
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
            return
        }
        $msg = new AMQPMessage(json_encode($message));

        $channel->basic_publish($msg, '', 'test-retry');
    }
};

我们需要 3 个队列来重新绑定.

We gonna need 3 queues for retying.

  • queue.example

  • queue.example

  • 绑定:
    • 交换:queue.exchange
    • 路由:queue.example
    • x-dead-letter-exchange:queue.exchange
    • x-dead-letter-routing-key: queue.example-dlq

    queue.example-dlq

    queue.example-dlq

    • 绑定:
      • 交换:queue.exchange
      • 路由:queue.example-dlq

      queue.example-retry

      queue.example-retry

      • 绑定:
        • 交换:queue.exchange
        • 路由:queue.example-retry
        • x-dead-letter-exchange:queue.exchange
        • x-dead-letter-routing-key: queue.example- added
        • x-message-ttl:10000

        这篇关于RabbitMQ 和 PHP 中如何将任务返回到队列中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆