Celery parallel distributed task with multiprocessing

Question

I have a CPU-intensive Celery task. I would like to use all the processing power (cores) across lots of EC2 instances to get this job done faster (a Celery parallel distributed task with multiprocessing, I think).

Threading, multiprocessing, distributed computing, and distributed parallel processing are all terms I'm trying to understand better.

Example task:

  @app.task
  def process_all_ids():
      for item in list_of_millions_of_ids:
          id = item  # do some long, complicated, very CPU-heavy equation here
          database.objects(newid=id).save()

Using the code above (with an example if possible), how would one go about distributing this task using Celery, so that this one task is split up to use all the CPU power across all available machines in the cloud?

Recommended answer

Your goals are:

  1. Distribute your work to many machines (distributed computing/distributed parallel processing)
  2. Distribute the work on a given machine across all CPUs (multiprocessing/threading)

Celery can do both of these for you fairly easily. The first thing to understand is that each Celery worker is configured by default to run as many tasks as there are CPU cores available on a system:

Concurrency is the number of prefork worker process used to process your tasks concurrently, when all of these are busy doing work new tasks will have to wait for one of the tasks to finish before it can be processed.

The default concurrency number is the number of CPU’s on that machine (including cores), you can specify a custom number using -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it, experimentation has shown that adding more than twice the number of CPU’s is rarely effective, and likely to degrade performance instead.

This means each individual task doesn't need to worry about using multiprocessing/threading to make use of multiple CPUs/cores. Instead, Celery will run enough tasks concurrently to use each available CPU.
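As a rough illustration of the concurrency setting described above, the sketch below is a small settings module that pins the worker's process count to the number of CPUs. The module name `celeryconfig.py` and the variable `cpu_count` are assumptions made for this example; `worker_concurrency` is the Celery setting that corresponds to the `-c` option.

```python
# celeryconfig.py  (hypothetical module name, chosen for this sketch)
import os

# Number of CPUs reported by the operating system; fall back to 1 if unknown.
cpu_count = os.cpu_count() or 1

# One prefork worker process per CPU. This mirrors Celery's default, so
# setting it explicitly mainly documents the intent for CPU-bound tasks.
worker_concurrency = cpu_count
```

A module like this can be loaded with `app.config_from_object('celeryconfig')`. The same value can instead be passed on the command line with the `-c` option when starting the worker on each EC2 instance.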

With that out of the way, the next step is to create a task that handles processing some subset of your list_of_millions_of_ids. You have a couple of options here. One is to have each task handle a single ID, so you run N tasks, where N == len(list_of_millions_of_ids). This will guarantee that work is evenly distributed amongst all your tasks, since there will never be a case where one worker finishes early and is just waiting around; if it needs work, it can pull an id off the queue. You can do this (as mentioned by John Doe) using a Celery group.

tasks.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

And to execute the tasks:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

Another option is to break the list into smaller pieces, and distribute the pieces to your workers. This approach runs the risk of wasting some cycles, because you may end up with some workers waiting around while others are still doing work. However, the Celery documentation notes that this concern is often unfounded:

Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster and in practice since you are avoiding the overhead of messaging it may considerably increase performance.

So, you may find that chunking the list and distributing the chunks to each task performs better, because of the reduced messaging overhead. You can probably also lighten the load on the database a bit this way, by calculating each id, storing it in a list, and then adding the whole list into the DB once you're done, rather than doing it one id at a time. The chunking approach would look something like this:

tasks.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
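The batching idea mentioned above (compute every id in the chunk first, then write once) could be sketched roughly as follows. This is plain Python with no Celery or database dependency: `expensive_equation` and `bulk_save` are hypothetical stand-ins for the real calculation and for whatever bulk-insert call your database layer offers, and in a real tasks.py the function would also carry the `@app.task` decorator.

```python
def expensive_equation(item):
    """Hypothetical stand-in for the long, CPU-heavy calculation."""
    return item * item


def process_ids(items, bulk_save):
    """Compute all new ids for one chunk, then store them in a single call.

    `bulk_save` is a hypothetical callable that accepts a list of new ids;
    it is passed in here only so the sketch runs without a real database.
    """
    new_ids = [expensive_equation(item) for item in items]
    bulk_save(new_ids)  # one write per chunk instead of one write per id
    return len(new_ids)


# Usage, with an in-memory list standing in for the database:
saved = []
count = process_ids([1, 2, 3], saved.extend)
```

Whether a single bulk write is actually faster depends on the database and driver, so it is worth measuring both variants.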

And to start the `process_ids` tasks:

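from celery import group
from tasks import process_ids

chunk_size = 30  # ids per task. Experiment with what number works best here.
ids = list_of_millions_of_ids
# Slice by hand: task.chunks(it, n) calls the task once per item of `it` (each item an argument tuple), so it would not pass a list to process_ids.
jobs = group(process_ids.s(ids[i:i + chunk_size]) for i in range(0, len(ids), chunk_size))
jobs.apply_async()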

You can experiment a bit with what chunking size gives you the best result. You want to find a sweet spot where you're cutting down messaging overhead while also keeping the size small enough that you don't end up with workers finishing their chunk much faster than another worker, and then just waiting around with nothing to do.
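To make that trade-off a little more concrete, here is a small back-of-the-envelope sketch. It only does arithmetic: the helper names and the figure of 64 worker processes are assumptions made for illustration, not measurements.

```python
import math


def tasks_needed(total_ids, chunk_size):
    """Number of task messages sent for a given chunk size."""
    return math.ceil(total_ids / chunk_size)


def chunks_per_slot(total_ids, chunk_size, worker_slots):
    """Average number of chunks available to each worker process.

    A value well above 1 leaves room for load balancing; a value below 1
    means some worker processes never receive any work at all.
    """
    return tasks_needed(total_ids, chunk_size) / worker_slots


total_ids = 1_000_000
worker_slots = 64  # hypothetical: for example 8 instances with 8 cores each

for chunk_size in (1, 30, 1_000, 50_000):
    print(
        chunk_size,
        tasks_needed(total_ids, chunk_size),
        chunks_per_slot(total_ids, chunk_size, worker_slots),
    )
```

Very small chunks maximise the message count, while very large chunks can leave fewer chunks than worker processes. The sweet spot lies somewhere in between and is best found by timing real runs.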
