如何在rabbitmq、pika python中优雅地暂停和恢复消费 [英] How to pause and resume consumption gracefully in rabbitmq, pika python

查看:406
本文介绍了如何在rabbitmq、pika python中优雅地暂停和恢复消费的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 basic_consume() 接收消息,使用 basic_cancel 取消消费,但有一个问题.

I'm using basic_consume() for receiving messages and basic_cancel for canceling consuming, but there is a problem.

这里是pika.channel的代码

Here is the code of pika.channel

 def basic_consume(self, consumer_callback, queue='', no_ack=False,
                      exclusive=False, consumer_tag=None):
        """Sends the AMQP command Basic.Consume to the broker and binds messages
        for the consumer_tag to the consumer callback. If you do not pass in
        a consumer_tag, one will be automatically generated for you. Returns
        the consumer tag.

        For more information on basic_consume, see:
        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume

        :param method consumer_callback: The method to callback when consuming
        :param queue: The queue to consume from
        :type queue: str or unicode
        :param bool no_ack: Tell the broker to not expect a response
        :param bool exclusive: Don't allow other consumers on the queue
        :param consumer_tag: Specify your own consumer tag
        :type consumer_tag: str or unicode
        :rtype: str

        """
        self._validate_channel_and_callback(consumer_callback)

        # If a consumer tag was not passed, create one
        consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_number,
                                                      uuid.uuid4().get_hex())

        if consumer_tag in self._consumers or consumer_tag in self._cancelled:
            raise exceptions.DuplicateConsumerTag(consumer_tag)

        self._consumers[consumer_tag] = consumer_callback
        self._pending[consumer_tag] = list()
        self._rpc(spec.Basic.Consume(queue=queue,
                                     consumer_tag=consumer_tag,
                                     no_ack=no_ack,
                                     exclusive=exclusive),
                           self._on_eventok,
                           [(spec.Basic.ConsumeOk,
                             {'consumer_tag': consumer_tag})])

        return consumer_tag

def basic_cancel(self, callback=None, consumer_tag='', nowait=False):
        """This method cancels a consumer. This does not affect already
        delivered messages, but it does mean the server will not send any more
        messages for that consumer. The client may receive an arbitrary number
        of messages in between sending the cancel method and receiving the
        cancel-ok reply. It may also be sent from the server to the client in
        the event of the consumer being unexpectedly cancelled (i.e. cancelled
        for any reason other than the server receiving the corresponding
        basic.cancel from the client). This allows clients to be notified of
        the loss of consumers due to events such as queue deletion.

        :param method callback: Method to call for a Basic.CancelOk response
        :param str consumer_tag: Identifier for the consumer
        :param bool nowait: Do not expect a Basic.CancelOk response
        :raises: ValueError

        """
        self._validate_channel_and_callback(callback)
        if consumer_tag not in self.consumer_tags:
            return
        if callback:
            if nowait is True:
                raise ValueError('Can not pass a callback if nowait is True')
            self.callbacks.add(self.channel_number,
                               spec.Basic.CancelOk,
                               callback)
        self._cancelled.append(consumer_tag)
        self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag,
                                    nowait=nowait),
                  self._on_cancelok,
                  [(spec.Basic.CancelOk,
                    {'consumer_tag': consumer_tag})] if nowait is False else [])

正如你所看到的,每次我取消消费时,consumer_tag 都会被添加到 _canceled 列表中.如果我再次在 basic_consume 中使用这个标签,将会引发duplicateConsumer 异常.好吧,我每次都可以使用新的 consumer_tag,但实际上我不是.因为迟早生成的标签会与之前的一些标签完全匹配.

As you can see every time I'm cancelling consumption consumer_tag is added to _canceled list. And if I would use this tag in basic_consume again the duplicateConsumer exception will be raised. Well, I could use a new consumer_tag every time, but in fact I'm not. Because sooner or later generated tag would be exactly match some of the previous ones.

我应该如何在鼠兔中优雅地暂停和恢复消费?

How should I pause and resume consumption gracefully in pika?

推荐答案

看起来 Pika 做得比它应该做的更多 - 如果没有提供消费者标签(服务器会),它不需要创建消费者标签并且它也不需要注意重复的消费者标签(服务器支持使用相同的标签恢复).

That looks like Pika is doing more than it should - it doesn't need to create a consumer tag if one is not supplied (the server will) and it also doesn't need to watch for duplicated consumer tags (resuming with the same tag is supported by the server).

所以我不知道如何用 Pika 做到这一点 - 我想提交一个错误.

So I'm not sure how to do this with Pika - file a bug I suppose.

这篇关于如何在rabbitmq、pika python中优雅地暂停和恢复消费的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆