Can I use luigi with Python Celery?

Question

I am using Celery for my web application. Celery executes parent tasks, which then execute a further pipeline of tasks.

Problems with Celery

  1. I can't get the dependency graph and visualizer that luigi provides, so I can't see the status of my parent task.

  2. Celery does not provide a mechanism to restart a failed pipeline and resume from where it failed.

I can easily get both of these from luigi.
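The "resume from where it failed" behaviour comes from each luigi task declaring an output and being treated as complete once that output exists. The following is a plain-Python illustration of that idea only, not luigi's code: `run_pipeline`, the `.done` marker files, and the three step functions are all hypothetical names made up for this sketch.

```python
import tempfile
from pathlib import Path


def run_pipeline(steps, workdir):
    """Run (name, func) steps in order, skipping any whose marker file exists."""
    executed = []
    for name, func in steps:
        marker = Path(workdir) / f"{name}.done"
        if marker.exists():
            continue  # finished in an earlier run, so skip it
        func()  # may raise; the marker is then never written
        marker.write_text("ok")
        executed.append(name)
    return executed


attempts = {"transform": 0}


def extract():
    pass


def transform():
    # Fails once to simulate a pipeline that breaks halfway through.
    attempts["transform"] += 1
    if attempts["transform"] == 1:
        raise RuntimeError("simulated failure on the first attempt")


def load():
    pass


steps = [("extract", extract), ("transform", transform), ("load", load)]

with tempfile.TemporaryDirectory() as workdir:
    try:
        run_pipeline(steps, workdir)
    except RuntimeError:
        pass  # first run stops at "transform"; "extract" is already marked done
    second_run = run_pipeline(steps, workdir)

print(second_run)  # ['transform', 'load']
```

The second run skips `extract` because its marker already exists, which is the property a plain chain of Celery tasks does not give you unless you build it yourself.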

So I was thinking that once Celery runs the parent task, I would execute the luigi pipeline inside that task.

Would there be any issue with that? For example, I need to autoscale the Celery workers based on queue size. Will that affect any luigi workers across multiple machines?

Recommended answer

I have never tried it, but I think it should be possible to call a luigi task from inside a Celery task, the same way you do it from Python code in general:

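from foobar import MyTask
from luigi import scheduler, worker

task = MyTask(123, 'another parameter value')
sch = scheduler.CentralPlannerScheduler()  # in-process scheduler (luigi < 2.7.0)
w = worker.Worker(scheduler=sch)
w.add(task)  # register the task and its dependencies
w.run()      # run whatever is not already complete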

About scaling your queue and Celery workers: if you have many Celery workers calling luigi tasks, you will of course need to scale your luigi scheduler/daemon so that it can handle the number of API requests. Every time you submit a task for execution you hit the luigi scheduler API; every N seconds (depending on your config) your tasks hit the scheduler API to say "I'm alive"; every time a task finishes, with an error or with success, you hit the scheduler API again; and so on.

So yes, take a close look at your scheduler to see whether it is receiving too many HTTP requests or whether its database is becoming a bottleneck (luigi's optional task history is stored through a database connection string; if that points at SQLite, you can switch it to MySQL or Postgres).
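To get a rough feel for the load described above, a back-of-the-envelope estimate can help. This is only a sketch under stated assumptions: the function name, the figure of two scheduler calls per task (one to register it, one to report the result), and the example numbers are all made up for illustration, and real traffic depends on your luigi configuration.

```python
def scheduler_requests_per_second(workers, ping_interval_s, tasks_per_second,
                                  calls_per_task=2):
    """Rough estimate of HTTP requests per second reaching the luigi scheduler.

    calls_per_task is an assumption: one call to register a task and one to
    report its final status. Real workers make additional calls.
    """
    keep_alive = workers / ping_interval_s          # periodic "I'm alive" pings
    task_calls = tasks_per_second * calls_per_task  # submit + status report
    return keep_alive + task_calls


# Hypothetical load: 50 concurrent luigi workers pinging once per second,
# starting 10 tasks per second between them.
estimate = scheduler_requests_per_second(workers=50, ping_interval_s=1.0,
                                         tasks_per_second=10)
print(estimate)  # 70.0
```

Because the keep-alive term grows with the number of concurrently running workers, autoscaling Celery up also scales the baseline traffic to the scheduler, even when few tasks are finishing.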

Update:

Since version 2.7.0, luigi.scheduler.CentralPlannerScheduler has been renamed to luigi.scheduler.Scheduler, so the above code should now be:

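from foobar import MyTask
from luigi import scheduler, worker

task = MyTask(123, 'another parameter value')
sch = scheduler.Scheduler()  # renamed from CentralPlannerScheduler in 2.7.0
w = worker.Worker(scheduler=sch)
w.add(task)  # register the task and its dependencies
w.run()      # run whatever is not already complete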
