celery相关内容

芹菜python对象方法

我正在尝试围绕python对象方法获取celery任务包装器.喜欢: A类:@任务def test_task(self,args):打印"BLah..test"def main():a = A()args = {}a.test_task(args) 现在,此操作失败,并显示错误test_task接受至少2个参数(给定1个).我的理解是自我对象没有通过.为什么会这样呢?以及我该如何解决? ..
发布时间:2021-04-21 19:59:12 Python

使用Celery for Python,一旦具有特定任务ID的特定任务成功或失败,是否可以接收通知?

我想知道是否有一种方法可以立即使用python celery监视任务是否完成或失败.我有一个事件要根据特定任务的结果启动. 解决方案 您可以使用芹菜 @shared_task 来运行任务,并在其中使用 try 块,> @shared_taskdef my_task(input1,input2,...):配置...尝试:做东西fire_success_event() ..
发布时间:2021-04-21 19:59:09 Python

Celery Task中使用SocketIO进行服务器推送

我有一个 flask 应用程序,其中有许多长时间运行的异步任务(〜小时).将这些任务的状态与客户端进行通信很重要. 我使用 celery 来管理后台任务队列,并且我目前正尝试通过 socketIO 从每个后台线程向客户端广播更新.这可能吗?有没有更合适的策略可以实现我想要的目标? 解决方案 您没有说,但是我想您打算使用Flask-SocketIO处理服务器端SocketIO,而不是官 ..
发布时间:2021-04-21 19:59:00 Python

同一任务多次执行

我有一些ETA任务,这些任务已发送给Celery的Redis经纪人.这是一个芹菜和Redis实例,都在同一台机器上. 问题是,任务被多次执行.我已经看到任务执行了4到11次. 我将可见性超时设置为12小时,因为我的ETA在4-11小时之间(在运行时确定): BROKER_TRANSPORT_OPTIONS = {'visibility_timeout':12 * 60 * 60} ..
发布时间:2021-04-21 19:58:57 Python

Celery:在关联的主体之后启动和弦回调

当我启动包含一组任务和一个回调的 chord()列表时,仅在完成所有任务组之后,即使不在其中的任务,才调用回调当前的和弦. 下面是用于更好解释的代码: 导入时间从芹菜进口芹菜,组,和弦应用=芹菜('任务')app.config_from_object('celeryconfig')@ app.task(name ='SHORT_TASK')def short_task(t):时间.睡眠(t ..
发布时间:2021-04-21 19:58:54 Python

定期运行Celery任务(没有Django)

我正在尝试使用Celery定期运行一些功能(任务),例如每3秒运行一次. 我最近得到的就是只运行一次任务. 这是我的Celery配置文件: #celeryconfig.py从datetime导入timedeltaBROKER_URL ='amqp://guest @ localhost//'CELERY_RESULT_BACKEND ='rpc://'CELERYBEAT_SCHED ..
发布时间:2021-04-21 19:58:51 Python

启动celery worker并将其启用广播队列

我正在尝试启动celery worker,因此它仅侦听单个队列.这不是问题,我可以这样: python -m celery worker -A my_module -Q my_queue -c 1 但是现在我也希望这个 my_queue 队列成为广播队列,所以我在celeryconfig中做到这一点: 来自kombu.com的 共同导入广播CELERY_QUEUES =(广播('my_ ..
发布时间:2021-04-21 19:58:48 Python

如何将芹菜任务注册到特定工人?

我正在使用Python/Django开发Web应用程序,并且有一些任务正在芹菜中运行. 我必须一次运行一个任务A,所以我使用--concurrency = 1创建了工作程序,并使用以下命令将任务A路由到该工作程序. celery-项目工作者-Q A -c 1 -l INFO 此工作程序处理任务A并将其他任务路由到默认队列后,一切工作正常. 但是,当我使用 inspect 命令获 ..
发布时间:2021-04-21 19:58:42 其他开发

芹菜:通过任务ID获取函数名称?

我正在使用celery on_failure 处理程序,记录所有失败的任务以进行调试和分析.我想知道失败任务的任务名称(函数名称),我该如何获取? 芹菜导入任务中的 类DebugTask(Task):抽象=真def after_return(self,* args,** kwargs):print('任务返回:{0!r}'.format(self.request))def on_failure ..
发布时间:2021-04-21 19:58:39 Python

为什么Celery worker给出"OSError:套接字已关闭"?

我的与RabbitMQ一起工作的芹菜工人在工作了几分钟后,一直给我一个套接字错误-见下文.我想知道问题的主要原因是什么?我以为可能是防火墙.但是,禁用防火墙并不能解决问题.我正在Windows 10计算机上工作. C:\ Users \ user_ \ Desktop \ Aida> celery -A任务工作程序-l info -P eventlet-------------- celery ..
发布时间:2021-04-21 19:58:36 其他开发

在Celery Task Queue中,组中运行的任务与循环中的多个异步程序有什么不同吗?

假设我有一个非常简单的任务,像这样: @ celery.task(ignore_result = True)def print_page(page):与open('path/to/page','w')为f:f.write(页面) (请忽略上面代码中的潜在竞争条件,这是一个简化的示例) 我的问题是,以下两个代码示例是否会产生相同的结果,或者一个示例优于另一个示例: 选择A: ..
发布时间:2021-04-21 19:58:32 Python

根据Celery任务状态更新Django模型字段

在我的模型中,我有一个 status 字段,默认值为'Processing'.在Django管理界面中,用户单击“保存"按钮后,表单输入将传递到仅休眠30秒的芹菜任务. 那30秒之后,我该怎么做: 确定芹菜任务是否成功? 将模型的 status 字段从“处理中"更新为实际状态(例如:完成,失败? models.py django.db导入模型中的 类记分卡(models.M ..
发布时间:2021-04-21 19:58:29 其他开发

Celery-在task.py中导入模型

在访问我的task.py中的模型时遇到问题 我的目标是在应用程序的各个部分(用户注册,重置密码等)发送电子邮件.为此,我将用户ID传递给一个名为"send_email"的芹菜任务. @shared_task()def send_email(sender_id = None,receiver_id = None,type = None,message = None):发件人= User.o ..
发布时间:2021-04-21 19:58:20 Python

让Celery广播所有员工的返回结果

有没有办法从芹菜广播任务中的每个工作人员那里获得所有结果?我想监视所有工人是否一切正常.发送任务的工作人员清单也将不胜感激. 解决方案 否,这不容易实现. 但是您不必局限于内置的amqp结果后端,您可以使用Kombu( http://kombu.readthedocs.org )发送您自己的结果,这是Celery使用的消息传递库: 从芹菜进口芹菜从海带进口交易所results_ex ..
发布时间:2021-04-21 19:58:17 Python

REST API或“直接"远程Celery/Django工作者的数据库访问权限?

我正在做一个项目,该项目将在美国不同位置的机器上有多位芹菜工人,这些机器将通过Internet进行通信. 我最好将Django项目分配到每台计算机上,并使用数据库凭据将它们配置到我的数据库主机上,还是应该有一个“主要" Django/数据库主机来为远程芹菜任务和工作人员提供一个REST API,以便命中数据库访问权限? 主要在寻找优点/缺点以及我没有想到的任何因素. 我可以提供一 ..
发布时间:2021-04-21 19:58:11 其他开发