Handling long running tasks in pika / RabbitMQ


Question

We're trying to set up a basic directed queue system where a producer generates several tasks and one or more consumers each grab one task at a time, process it, and acknowledge the message.

The problem is that processing can take 10-20 minutes, and we're not responding to messages during that time, which causes the server to disconnect us.

Here's some pseudo code for our consumer:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

After the first task completes, an exception is thrown somewhere deep inside BlockingConnection, complaining that the socket was reset. In addition, the RabbitMQ logs show that the consumer was disconnected for not responding in time (why it resets the connection rather than sending a FIN is strange, but we won't worry about that).

We searched around a lot because we believed this was the normal use case for RabbitMQ (having a lot of long running tasks that should be split up among many consumers), but it seems like nobody else really had this issue. Finally we stumbled upon a thread that recommended using heartbeats and spawning long_running_task() in a separate thread.

So the code became:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

This seems to work, but it's very messy. Are we sure that the ch object is thread safe? In addition, imagine that long_running_task() uses the connection parameter to add a task to a new queue (i.e. the first part of this long process is done, so let's send the task on to the second part). The thread is then using the connection object. Is that thread safe?

More to the point, what's the preferred way of doing this? I feel like this is very messy and possibly not thread safe, so maybe we're not doing it right. Thanks!

Recommended answer

For now, your best bet is to turn off heartbeats; this will keep RabbitMQ from closing the connection if you block for too long. I am experimenting with running pika's core connection management and IO loop in a background thread, but it's not stable enough to release.
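
A minimal sketch of a consumer with heartbeats turned off, assuming the pika 1.x API (where the keyword is heartbeat rather than the heartbeat_interval used in the question, and basic_consume takes on_message_callback). The long_running_task body and the on_message name are hypothetical placeholders, and the broker is assumed to be on localhost.

```python
import time


def long_running_task(body):
    """Hypothetical stand-in for the 10-20 minute job from the question."""
    time.sleep(1)


def on_message(ch, method, properties, body):
    # Blocking here is tolerated only because heartbeats are disabled,
    # so the broker does not expect heartbeat frames from this client.
    long_running_task(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    # Imported inside main() so the handlers above can be exercised
    # without a broker; pika is third-party (pip install pika).
    import pika

    # heartbeat=0 disables heartbeats (assumption: pika 1.x keyword name).
    params = pika.ConnectionParameters(host="localhost", heartbeat=0)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue="task_queue", durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue="task_queue", on_message_callback=on_message)
    channel.start_consuming()


if __name__ == "__main__":
    main()
```

The trade-off is that without heartbeats a dead TCP connection may go unnoticed for longer, so this fits best where the long blocking call is unavoidable.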

In pika v1.1.0, heartbeats are turned off with ConnectionParameters(heartbeat=0).
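
On the thread-safety part of the question: pika's documentation says a connection and its channels should only be used from the thread that created them, with one exception, add_callback_threadsafe, which schedules a callback to run on the connection's own thread. This is not what the answer above describes; it is a sketch of that alternative, assuming pika 1.x, which keeps heartbeats on and moves only the ack back to the connection thread. The names long_running_task, ack_message, do_work and on_message are hypothetical.

```python
import functools
import threading
import time


def long_running_task(body):
    """Hypothetical stand-in for the long job from the question."""
    time.sleep(1)


def ack_message(channel, delivery_tag):
    # Runs on the connection's own thread, so using the channel is safe.
    if channel.is_open:
        channel.basic_ack(delivery_tag=delivery_tag)


def do_work(connection, channel, delivery_tag, body):
    # Runs on a worker thread: it never calls channel methods directly,
    # it only asks the connection thread to perform the ack.
    long_running_task(body)
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))


def on_message(channel, method, properties, body, connection):
    # Hand the job to a worker so the connection thread stays free
    # to service heartbeats while the task runs.
    threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body)).start()


def main():
    import pika  # third-party; 1.x API assumed

    params = pika.ConnectionParameters(host="localhost", heartbeat=20)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue="task_queue", durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue="task_queue",
        on_message_callback=functools.partial(on_message,
                                              connection=connection))
    channel.start_consuming()


if __name__ == "__main__":
    main()
```

Publishing a follow-up task from inside the worker would need the same care: either schedule the basic_publish call through add_callback_threadsafe, or open a separate connection within the worker thread.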
