Combining chains, groups and chunks with Celery

Problem description

I want to use Celery for a URL grabber.

I have a list of URLs, and I must make an HTTP request to every URL and write the results to a file (the same file for the whole list).

My first idea was to put this code in the task that Celery beat calls every n minutes:

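from celery import chain, group

# app, get_url_content, write_result_on_disk and urls are defined elsewhere.
# The task is not bound (no bind=True), so it must not take a `self` argument.
@app.task
def get_urls():
    results = [get_url_content.si(
        url=url
    ) for url in urls]

    ch = chain(
        group(*results),
        write_result_on_disk.s()
    )

    return ch()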

This code works pretty well, but there is one problem: I have about a thousand URLs to grab, and if a single get_url_content task fails, write_result_on_disk is not called and all the previously grabbed contents are lost.

What I want to do is to chunk the tasks by splitting the URL list, grab the results of each chunk and write them to disk, so that, for example, the contents of 20 URLs are written to disk at a time.

Do you have an idea, please? I tried the chunks() function but did not get really useful results.

Recommended answer

Using CeleryBeat for cron-like tasks is a good idea.

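I would catch exceptions inside your get_url_content micro-tasks and just return something else (for example an error marker) when you catch them. That way a single failure no longer prevents the chained task from running, and you can evaluate the failures (e.g. count, list, inspect them) in a summarize task.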
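A minimal plain-Python sketch of that idea, runnable without a broker. The names get_url_content_safe, summarize_results and the fetch parameter are hypothetical; in a real Celery task you would decorate the function with @app.task and call your HTTP client directly instead of passing fetch in (a callable cannot be sent as a task argument with the default JSON serializer).

```python
from typing import Callable, Dict, List


def get_url_content_safe(url: str, fetch: Callable[[str], str]) -> Dict[str, object]:
    """Fetch one URL but never raise: a failure becomes a result dict.

    `fetch` is a hypothetical stand-in for the real HTTP call, injected only
    so this sketch runs without network access.
    """
    try:
        return {"url": url, "ok": True, "content": fetch(url)}
    except Exception as exc:  # catch everything so the downstream task still runs
        return {"url": url, "ok": False, "error": repr(exc)}


def summarize_results(results: List[Dict[str, object]]) -> Dict[str, object]:
    """Count and list the failed URLs among the collected results."""
    failed = [r["url"] for r in results if not r["ok"]]
    return {"total": len(results), "failed": len(failed), "failed_urls": failed}
```

Because every micro-task now returns a value instead of raising, the task chained after the group always receives a full list containing both successful contents and failure markers.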

How to use chunks and chain them with another task:

Step 1: Convert the chunks into a group:

As described in http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks, .group() transforms an object of type celery.canvas.chunks into a group, which is a much more common type in Celery.
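To see what chunks does with its arguments, here is a sequential plain-Python model (not Celery's implementation): the iterable of argument tuples is cut into parts of at most n items, each part corresponds to one task that calls the function on its items in order, and the group's result is therefore a list of per-chunk result lists. Because chunks expects argument tuples, a single-argument task needs its inputs wrapped, e.g. [(url,) for url in urls]. The helper names below are hypothetical.

```python
from itertools import islice
from typing import Any, Callable, Iterable, List, Tuple

ArgTuple = Tuple[Any, ...]


def split_into_chunks(arg_tuples: Iterable[ArgTuple], n: int) -> List[List[ArgTuple]]:
    """Split an iterable of argument tuples into lists of at most n tuples."""
    it = iter(arg_tuples)
    parts = []
    while True:
        part = list(islice(it, n))
        if not part:
            return parts
        parts.append(part)


def run_chunk(func: Callable[..., Any], part: List[ArgTuple]) -> List[Any]:
    """What one chunk task does: call func(*args) for each tuple, in order."""
    return [func(*args) for args in part]


def run_chunks_as_group(func: Callable[..., Any], arg_tuples: Iterable[ArgTuple], n: int) -> List[List[Any]]:
    """Sequential stand-in for func.chunks(arg_tuples, n).group(): one list per chunk."""
    return [run_chunk(func, part) for part in split_into_chunks(arg_tuples, n)]
```

For example, run_chunks_as_group(lambda x: x * 2, zip(range(5)), 2) gives [[0, 2], [4, 6], [8]]: three chunks, the last one shorter.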

Step 2: Chain the group with the task

As mentioned in http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives:

Chaining a group together with another task will automatically upgrade it to be a chord
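The resulting chord can be modelled sequentially in plain Python (a sketch, not Celery's implementation; run_chord is a hypothetical name): the body is called once with the list of header results in header order, and if any header call raises, the body never runs. That last point is exactly the failure mode from the question, which is why the micro-tasks should catch their own exceptions.

```python
from typing import Any, Callable, List


def run_chord(header: List[Callable[[], Any]], body: Callable[[List[Any]], Any]) -> Any:
    """Sequential model of chain(group(...), body): body gets all header results.

    If any header call raises, the exception propagates and body is never
    called, mirroring the behaviour described in the question.
    """
    results = [call() for call in header]  # results keep the header order
    return body(results)
```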

Here is some code with the two tasks and how I usually call them:

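from typing import List

from celery import chain

# app is your Celery application, defined elsewhere
@app.task
def solve_micro_task(arg: str) -> dict:
    ...

@app.task
def summarize(items: List[List[dict]]):
    # items holds one result list per chunk, so flatten it first
    flat_list = [item for sublist in items for item in sublist]
    for report in flat_list:
        ...

# chunks() expects an iterable of argument tuples; `urls` stands for your list of inputs
chunk_tasks = solve_micro_task.chunks([(url,) for url in urls], 10)  # type: celery.canvas.chunks
summarize_task = summarize.s()
# a group chained with a task is upgraded to a chord: summarize receives all chunk results
chain(chunk_tasks.group(), summarize_task)()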
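The nested List[List[dict]] that summarize receives has one inner list per chunk. Below is a minimal sketch of a summarize body that flattens it and appends every report to a single file, assuming JSON-lines output; summarize_to_file and its path parameter are hypothetical and not part of the answer.

```python
import json
from typing import Dict, List


def summarize_to_file(items: List[List[Dict]], path: str) -> int:
    """Flatten per-chunk result lists and append every report to one file.

    Mirrors the flattening step in summarize(); the JSON-lines format and the
    `path` parameter are assumptions for illustration. Returns the number of
    reports written.
    """
    flat_list = [item for sublist in items for item in sublist]
    with open(path, "a", encoding="utf-8") as fh:
        for report in flat_list:
            fh.write(json.dumps(report) + "\n")
    return len(flat_list)
```

Writing once in the chord body keeps all file access in a single task, so parallel chunk tasks never write to the same file concurrently.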
