Python 和 RabbitMQ - 侦听来自多个渠道的消费事件的最佳方式? [英] Python and RabbitMQ - Best way to listen to consume events from multiple channels?

查看:29
本文介绍了Python 和 RabbitMQ - 侦听来自多个渠道的消费事件的最佳方式?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个独立的 RabbitMQ 实例.我正在努力寻找听取双方事件的最佳方式.

I have two, separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.

例如,我可以使用以下方式使用事件:

For example, I can consume events on one with the following:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()

我有第二位主持人host2",我也想听听.我想过创建两个单独的线程来执行此操作,但从我读过的内容来看,pika 不是线程安全的.有没有更好的办法?还是创建两个单独的线程,每个线程监听不同的 Rabbit 实例(host1 和 host2)就足够了?

I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1, and host2) be sufficient?

推荐答案

什么是最佳方式"的答案在很大程度上取决于您的队列使用模式以及最佳"的含义.由于我还不能对问题发表评论,我将尝试提出一些可能的解决方案.

The answer to "what is the best way" depends heavily on your usage pattern of queues and what you mean by "best". Since I can't comment on questions yet, I'll just try to suggest some possible solutions.

在每个示例中,我都假设已经声明了 exchange.

In each example I'm going to assume exchange is already declared.

您可以使用 pika.

You can consume messages from two queues on separate hosts in single process using pika.

你是对的 - 正如 它自己的常见问题解答pika 不是线程安全的,但它可以通过为每个线程创建到 RabbitMQ 主机的连接以多线程方式使用.使用 threading 模块让这个例子在线程中运行如下:

You are right - as its own FAQ states, pika is not thread safe, but it can be used in multi-threaded manner by creating connections to RabbitMQ hosts per thread. Making this example run in threads using threading module looks as follows:

import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)

        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))

        channel = connection.channel()

        result = channel.queue_declare(exclusive=True)

        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")

        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)

        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()

我已将 callback_func 声明为一种纯粹用于在打印消息正文时使用 ConsumerThread.name 的方法.它也可能是 ConsumerThread 类之外的函数.

I've declared callback_func as a method purely to use ConsumerThread.name while printing message body. It might as well be a function outside the ConsumerThread class.

或者,您始终可以在每个要使用事件的队列中使用使用者代码运行一个进程.

Alternatively, you can always just run one process with consumer code per queue you want to consume events.

import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()

然后运行:

$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console

如果您对来自队列的消息所做的工作是CPU-heavy 并且只要您的 CPU 中的内核数量 >= 消费者数量,通常最好使用这种方法 - 除非您的队列大部分时间都是空的并且消费者不会利用此 CPU 时间*.

If the work you're doing on messages from queues is CPU-heavy and as long as number of cores in your CPU >= number of consumers, it is generally better to use this approach - unless your queues are empty most of the time and consumers won't utilize this CPU time*.

另一种选择是涉及一些异步框架(例如 Twisted)并运行整个单线程中的东西.

Another alternative is to involve some asynchronous framework (for example Twisted) and running whole thing in single thread.

不能再在异步代码中使用BlockingConnection;幸运的是,pika 有用于 Twisted 的适配器:

You can no longer use BlockingConnection in asynchronous code; fortunately, pika has adapter for Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel

        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue

        self.channel.queue_bind(queue=self._queue_name,
                                exchange="my-exchange",
                                routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()

        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()

这种方法会更好,您使用的队列越多,消费者执行的工作对 CPU 的限制就越少*.

This approach would be even better, the more queues you would consume from and the less CPU-bound the work performing by consumers is*.

既然你提到了 pika,我就限制自己使用基于 Python 2.x 的解决方案,因为 pika 尚未移植.

Since you've mentioned pika, I've restricted myself to Python 2.x-based solutions, because pika is not yet ported.

但如果您想移动到 >=3.3,一种可能的选择是使用 asyncio 使用 AMQP 协议之一(您与 RabbitMQ 使用的协议),例如asynqpaioamqp.

But in case you would want to move to >=3.3, one possible option is to use asyncio with one of AMQP protocol (the protocol you speak in with RabbitMQ) , e.g. asynqp or aioamqp.

* - 请注意,这些都是非常浅薄的提示 - 在大多数情况下,选择并不那么明显;什么对你来说是最好的取决于队列的饱和度"(消息/时间),你在收到这些消息后做了什么工作,你在什么环境中运行你的消费者等等;除了对所有实现进行基准测试之外,没有办法确定

* - please note that these are very shallow tips - in most cases choice is not that obvious; what will be the best for you depends on queues "saturation" (messages/time), what work do you do upon receiving these messages, what environment you run your consumers in etc.; there's no way to be sure other than to benchmark all implementations

这篇关于Python 和 RabbitMQ - 侦听来自多个渠道的消费事件的最佳方式?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆