使用来自 RabbitMq 的不确认消息 [英] Consuming not acknowledge messages from RabbitMq

查看:57
本文介绍了使用来自 RabbitMq 的不确认消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我创建了一个简单的发布者和一个使用 basic.consume 订阅队列的消费者.

I have create a simple publisher and a consumer which subscribes on the queue using basic.consume.

我的消费者在作业无异常运行时确认消息.每当我遇到异常时,我都不会确认消息并提前返回.只有已确认的消息会从队列中消失,因此可以正常工作.
现在我希望消费者再次获取失败的消息,但重新使用这些消息的唯一方法是重新启动消费者.

My consumer acknowledges the messages when the job runs without an exception. Whenever I run into an exception I don´t ack the message and return early. Only the acknowledged messages disappear from the queue, so that´s working correctly.
Now I want the consumer to pick up the failed messages again, but the only way to reconsume those messages is by restarting the consumer.

我需要如何处理这个用例?

How do I need to approach this use case?

设置代码

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

消费者代码

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

生产者代码

$exchange->publish('message');

推荐答案

如果消息没有被确认并且应用程序失败,它会自动重新发送,并且信封上的 redelivered 属性将设置为 true(除非您使用 no-ack = true 标志使用它们).

If message was not acknowledged and application fails, it will be redelivered automatically and redelivered property on envelope will be set to true (unless you consume them with no-ack = true flag).

UPD:

您必须在 catch 块中nack 带有重新传递标志的消息

You have to nack message with redelivery flag in your catch block

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

当心无限接收的消息,而在 RabbitMQ 和 AMQP 协议中根本没有实现重新传递计数.

Beware infinitely nacked messages while redelivery count doesn't implemented in RabbitMQ and in AMQP protocol at all.

如果您不想弄乱这些消息而只想添加一些延迟,您可能需要在 sleep()usleep() 之前添加一些 sleep()code>nack 方法调用,但这根本不是一个好主意.

If you doesn't want to mess with such messages and simply want to add some delay you may want to add some sleep() or usleep() before nack method call, but it is not a good idea at all.

有多种技术可以处理周期重新交付问题:

There are multiple techniques to deal with cycle redeliver problem:

1.依靠死信交换

  • 优点:可靠、标准、清晰
  • 缺点:需要额外的逻辑

2.使用每个消息或每个队列的TTL

  • 优点:易于实施、标准、清晰
  • 缺点:排长队可能会丢失一些信息

示例(注意,对于队列 ttl,我们只传递数字,对于消息 ttl - 任何将是数字字符串的内容):

Examples (note, that for queue ttl we pass only number and for message ttl - anything that will be numeric string):

2.1 每条消息 ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2.每个队列 ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3.在邮件正文或标题中保留重新发送计数或保留重新发送数(又名跳数限制或 IP 堆栈中的 ttl)

  • 优点:让您在应用程序级别
  • 对消息生命周期进行额外控制
  • 缺点:大量开销,而您必须修改消息并再次发布它,特定于应用程序,不清楚

代码:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

可能还有其他一些方法可以更好地控制消息重新传送流.

There are may be some other ways to better control message redelivers flow.

结论:没有灵丹妙药.您必须决定哪种解决方案最适合您的需求或找出其他解决方案,但不要忘记在此处分享;)

Conclusion: there are no silver bullet solution. You have to decide what solution fit your need the best or find out something other, but don't forget to share it here ;)

这篇关于使用来自 RabbitMq 的不确认消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆