pika相关内容

RabbitMQ 中的密钥感知消费者

让我们考虑一个系统,其中将数千个客户端数据发布到 RabbitMQ 交换(在此阶段 client_id 已知).Exchange 将它们路由到单个队列.最后,消息由单个应用程序使用.效果很好. 然而,随着时间的推移,消费应用程序成为瓶颈,需要水平扩展.问题是系统要求考虑特定客户端的消息由应用程序的同一实例使用. 我可以创建很多队列:每个客户端一个,或者使用主题交换并根据某些 clien ..
发布时间:2021-07-02 18:39:49 其他开发

rabbitmq 中的多个消费者用于多个队列

我有 2 个队列,比如说 q1 和 q2,它们对应于 e1 和 e2 交换,绑定密钥 b1 和 b2.我想并行运行消费者函数,比如 c1 和 c2,它们将分别监听 q1 和 q2.我尝试了以下方法: def c1():connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))通 ..
发布时间:2021-07-02 18:39:27 Python

鼠兔:写入缓冲区超出警告

我们的软件程序运行良好 5 个月,现在突然我们开始收到 Pika 警告,最终导致异常. Pika 0.9.5 UserWarning:写入缓冲区超出警告阈值. 我搜索了很多论坛,但都不太满意.描述的一种解决方案是完全忽略这些警告,但我对此有点怀疑.在这方面的任何帮助将不胜感激.很紧急. 谢谢 解决方案 这个:http://lists.rabbitmq.com/piperm ..
发布时间:2021-07-02 18:38:16 Python

Python Pika - 消费者进入线程

我正在开发一个带有后台线程的 Python 应用程序,用于使用来自 RabbitMQ 队列的消息(主题场景). 我在按钮的 on_click 事件上启动线程.这是我的代码,请注意“#self.receive_command()". def on_click_start_call(self,widget):t_msg = threading.Thread(target=self.receiv ..
发布时间:2021-07-02 18:36:05 Python

为 Pika ioloop async (RabbitMQ) 设置超时

我需要能够优雅地停止在 Pika ioloop 中工作的消费者(工人).工作人员应在 60 秒后停止.当前处理的消息应该已完成. 我试图在回调函数中放入一个 connection.close() 但这只会停止当前线程而不是完整的 ioloop.它给出了一个可怕的错误输出. 请参阅我的代码中的第 16 行及以下内容:我使用了(关于 Pika ioloop 的基本示例 http://pik ..
发布时间:2021-07-02 18:35:47 Python

如何在python中使用pika(RabbitMQ)向消费者添加多处理

我在 python 中使用 pika 框架编写了非常基本的生产者-消费者代码.问题是 - 消费者端在队列中的消息上运行速度太慢.我进行了一些测试,发现通过多处理可以将工作流程加快多达 27 倍.问题是 - 我不知道向我的代码添加多处理功能的正确方法是什么. 导入鼠兔导入json从日期时间导入日期时间从函数导入download_xmlsdef回调(ch,方法,属性,主体):print('有东西') ..
发布时间:2021-07-02 18:35:05 Python

如何在python中做一个简单的Pika SelectConnection来发送消息?

我正在尝试将我的代码转换为通过 Pika 发送 rabbitmq 消息.我在理解如何使用异步连接(例如 SelectConnection)发送简单消息时遇到了很多麻烦. 在我使用 amqp 库的旧代码中,我只是声明了一个这样的类: 将 amqp 导入为 amqp类 MQ():mqConn = 无频道 = 无def __init__(self):self.connect()定义连接(自我): ..
发布时间:2021-07-02 18:33:56 Python

在 python/pika 中使用多个队列

我正在尝试创建一个订阅多个队列的消费者,然后在消息到达时对其进行处理. 问题在于,当第一个队列中已经存在一些数据时,它会消耗第一个队列,而永远不会去消耗第二个队列.但是,当第一个队列为空时,它确实会转到下一个队列,然后同时消耗两个队列. 我首先实现了线程,但想避开它,当 pika 库为我完成它时没有太多复杂性.下面是我的代码: 导入鼠兔mq_connection = pika.Blo ..
发布时间:2021-07-02 18:33:35 Python

与鼠兔使用哪种连接形式

我一直试图弄清楚在使用鼠兔时应该使用哪种连接形式,据我所知,我有两种选择. BlockingConnection 或 SelectConnection,但是我不太确定这两者之间的区别(即 BlockingConnection 阻塞是什么?等等) pika 的文档说SelectConnection 是连接rabbit 的首选方式,因为它提供了“多种事件通知方法,包括select、epoll、 ..
发布时间:2021-07-02 18:33:20 Python

Pika BlockingConnection &RabbitMQ:连接已关闭

我在由 Azure 内部负载平衡器平衡的集群负载中有 2 个 rabbitmq.客户端使用 BlockingConnection 连接到 LB. 当客户端交换消息时,一切正常.但是当没有活动时,我的客户端似乎已断开连接并且无法再接收消息. 我想知道这个问题是否有解决方案?我假设负载均衡器或 rabbitmq 由于不活动而关闭连接.我想让 pika 触发一些对 rabbitmq 的心跳( ..
发布时间:2021-07-02 18:32:54 Python

Pika blocks_connection.py 随机超时连接到RabbitMQ

我有一个兔子 mq 在机器上运行 客户端和rabbitMQ都在同一个网络上运行 rabbitMQ 有很多客户端 我可以从rabbitMQ ping 客户端并返回 在机器之间测得的最长延迟为 12.1 毫秒 网络详细信息:标准交换机网络(在单个物理机上运行的虚拟机网络 - 使用 vmware VC) 初始化 RPC 连接时随机超时 /usr/lib/python2.6/si ..
发布时间:2021-07-02 18:32:02 Python

RabbitMQ pika.exceptions.ConnectionClosed

我尝试使用 RabbitMQ 发送消息和接收消息.我没有计算机科学背景,我使用的术语可能不太准确. 我尝试复制教程文件:提交我的 html 表单时,我的 python 脚本 (cgi) 消息正在提交到队列 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))通道 = connect ..
发布时间:2021-07-02 18:31:36 Python

使用 ExternalCredentials 对 rabbitmq 进行身份验证

我有一个 rabbitmq 服务器并使用带有 Python 的 pika 库来生成/使用消息.出于开发目的,我只是使用 credentials = pika.PlainCredentials(, ) 我想将其更改为使用 pika.ExternalCredentials 或 TLS. 我已设置我的 rabbitmq 服务器以在端口 5671 上侦听 TLS,并已正确配置它.我能够从 ..
发布时间:2021-07-02 18:31:07 Python

在 pika/RabbitMQ 中处理长时间运行的任务

我们正在尝试建立一个基本的定向队列系统,其中一个生产者将生成多个任务,一个或多个消费者将一次获取一个任务、处理它并确认消息. 问题是,处理可能需要 10-20 分钟,而且我们当时没有响应消息,导致服务器与我们断开连接. 这是我们的消费者的一些伪代码: #!/usr/bin/env python进口鼠兔导入时间connection = pika.BlockingConnection(p ..
发布时间:2021-07-02 18:30:25 其他开发

在不禁用心跳的情况下保持鼠兔 BlockingConnection 处于活动状态

我正在使用 pika 0.10.0 和 python 2.7 版本开发 RabbitMQ 消费者.在我的消费者客户端中,我有一个进程根据输入消息运行一段时间.它可以从 3 到 40 分钟不等.我不想禁用心跳.相反,我正在寻找一些可以使连接保持活动状态的回退机制,直到将 delivery_tag 发回.这可能吗? 我得到的链接很少,所有人都建议禁用心跳作为解决方法.但我不想禁用它. 参考 ..
发布时间:2021-06-26 19:49:48 其他开发

rabbitmq 使用带有 pika 的线程

我正在尝试使用rabbitmq 获得一个基本的队列系统,但是当我尝试使用线程时,它似乎只运行了 1 个线程. 我的代码: 导入鼠兔进口螺纹rabbit_url = "amqp://user:pass!@127.0.0.1:5672/%2f"定义开始(max_threads):对于 xrange(max_threads) 中的 i:t = threading.Thread(目标=运行)t.s ..
发布时间:2021-06-04 20:30:19 Python