RabbitMQ pika.exceptions.ConnectionClosed [英] RabbitMQ pika.exceptions.ConnectionClosed

查看:26
本文介绍了RabbitMQ pika.exceptions.ConnectionClosed的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试使用 RabbitMQ 发送消息和接收消息.我没有计算机科学背景,我使用的术语可能不太准确.

I tried to send message and receive message using RabbitMQ. I dont have computer science background, the terms I used could not be very accurate.

我尝试复制教程文件:提交我的 html 表单时,我的 python 脚本 (cgi) 消息正在提交到队列

I try to copy the tutorial file: When submitting my html form, my python script (cgi) the message is submitting to the queue

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='task_queue', durable=True)
        message = PN
        channel.basic_publish(exchange='',
                              routing_key='task_queue',
                              body=message,
                              properties=pika.BasicProperties(
                                 delivery_mode = 2, # make message persistent
                              ))
        connection.close()

我的接收器正在运行:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received Project %r" % body)
    #ch.basic_ack(delivery_tag = method.delivery_tag) 
    if not (os.path.isfile(js_path)):
        print (' [*] ERROR files missing ')
        #ch.basic_ack(delivery_tag = method.delivery_tag)
        return
    p= subprocess.Popen(run a subprocess here)
    p.wait()

    print (' [*] Temporary Files removed')
    print(" [*] Waiting for messages. To exit press CTRL+C")

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='task_queue',no_ack=True)
channel.start_consuming()

它大部分时间都可以管理,但随机崩溃并出现以下错误:

It manages most of the time but randomly crash with the following error:

回溯(最近一次调用最后一次):文件Receive5.py",第 139 行,在channel.start_sumption() 文件C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py",第 1681 行,在 start_sumption 中self.connection.process_data_events(time_limit=None) 文件C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py",第 647 行,在 process_data_events 中self._flush_output(common_terminator) 文件C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py",第 426 行,在 _flush_output 中引发 exceptions.ConnectionClosed() pika.exceptions.ConnectionClosed

Traceback (most recent call last): File "Receive5.py", line 139, in channel.start_consuming() File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 1681, in start_consuming self.connection.process_data_events(time_limit=None) File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 647, in process_data_events self._flush_output(common_terminator) File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 426, in _flush_output raise exceptions.ConnectionClosed() pika.exceptions.ConnectionClosed

推荐答案

这是因为你让主线程一直在等待,因此 pika 无法处理传入的消息;在这种情况下,它无法响应心跳,直到子进程完成.这会导致 RabbitMQ 认为客户端已死并强制断开连接.

This is because you are keeping the main thread waiting, and because of this pika cannot handle incoming messages; in this case it cannot respond to the heartbeat until the subprocess is done. This causes RabbitMQ to think that the client is dead and forces a disconnection.

如果您希望它与心跳一起工作(推荐),您需要定期调用 connection.process_data_events.这可以通过添加一个循环来检查线程是否完成,并且每 30 秒左右调用一次 process_data_events 直到线程完成.

If you want this to work with heartbeats (which is recommend) you need to periodically call connection.process_data_events. This can be done by adding a loop that checks if the thread is done, and every 30s or so call process_data_events until the thread is done.

这篇关于RabbitMQ pika.exceptions.ConnectionClosed的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆