消费未确认的RabbitMq消息 [英] Consuming not acknowledge messages from RabbitMq

查看:701
本文介绍了消费未确认的RabbitMq消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我创建了一个简单的发布者和一个使用basic.consume在队列上进行订阅的使用者.

I have create a simple publisher and a consumer which subscribes on the queue using basic.consume.

我的使用者在作业正常运行时会确认消息.每当遇到异常时,我都不会确认该消息并提早返回.仅已确认的消息从队列中消失,因此可以正常工作.
现在,我希望使用者重新获取失败的消息,但是重新使用这些消息的唯一方法是重新启动使用者.

My consumer acknowledges the messages when the job runs without an exception. Whenever I run into an exception I don´t ack the message and return early. Only the acknowledged messages disappear from the queue, so that´s working correctly.
Now I want the consumer to pick up the failed messages again, but the only way to reconsume those messages is by restarting the consumer.

我该如何处理这个用例?

How do I need to approach this use case?

设置代码

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

消费者代码

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

生产者代码

$exchange->publish('message');

推荐答案

如果未确认消息且应用程序失败,它将自动重新发送,并且信封上的redelivered属性将设置为true(除非您使用它们)带有no-ack = true标志).

If message was not acknowledged and application fails, it will be redelivered automatically and redelivered property on envelope will be set to true (unless you consume them with no-ack = true flag).

UPD:

您必须nack在catch块中带有重新交付标志的消息

You have to nack message with redelivery flag in your catch block

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

提防无穷无尽的邮件,而重新交付计数在RabbitMQ和AMQP协议中根本没有实现.

Beware infinitely nacked messages while redelivery count doesn't implemented in RabbitMQ and in AMQP protocol at all.

如果您不想弄乱此类消息,而只是想增加一些延迟,则可能需要在nack方法调用之前添加一些sleep()usleep(),但这根本不是一个好主意.

If you doesn't want to mess with such messages and simply want to add some delay you may want to add some sleep() or usleep() before nack method call, but it is not a good idea at all.

有多种技术可以解决周期重新交付问题:

There are multiple techniques to deal with cycle redeliver problem:

1.依靠死信交换

1. Rely on Dead Letter Exchanges

  • 优点:可靠,标准,清晰
  • 缺点:需要其他逻辑

2.每个邮件或每个队列TTL使用

2. Use per message or per queue TTL

  • 优点:易于实施,也很标准,清晰
  • 缺点:如果队列较长,您可能会丢失一些消息

示例(请注意,对于队列ttl,我们仅传递数字,对于消息ttl,则仅传递数字字符串):

Examples (note, that for queue ttl we pass only number and for message ttl - anything that will be numeric string):

2.1每封邮件ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2.每个队列ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3.在邮件正文或标头中保留重发次数或剩余的重发次数(即IP堆栈中的aka跳数限制或ttl)

  • 优点:在应用程序级别
  • 上,您可以进一步控制消息的生存时间
  • 缺点:您必须修改消息并再次发布消息(特定于应用程序,不清楚)时开销很大
  • pros: give you extra control on messages lifetime on application level
  • cons: significant overhead while you have to modify message and publish it again, application specific, not clear

代码:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

可能还有其他一些方法可以更好地控制消息重新传送流.

There are may be some other ways to better control message redelivers flow.

结论:没有解决方案.您必须决定哪种解决方案最适合您的需求,或者找出其他解决方案,但不要忘了在这里分享它;)

Conclusion: there are no silver bullet solution. You have to decide what solution fit your need the best or find out something other, but don't forget to share it here ;)

这篇关于消费未确认的RabbitMq消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆