django/celery: Best practices to run tasks on 150k Django objects?
Question
I have to run tasks on approximately 150k Django objects. What is the best way to do this? I am using the Django ORM as the broker. The database backend is MySQL, and it chokes and dies while task.delay() is called for all of the tasks. Relatedly, I also wanted to kick this off from a form submission, but the resulting request had such a long response time that it timed out.
Solution
I would also consider using something other than the database as the "broker". It really isn't suitable for this kind of work.
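As a sketch of the settings change this implies (django-celery-era settings; the broker URLs below are placeholder examples assuming a locally running RabbitMQ or Redis server, not values from the question):

```python
# settings.py -- hypothetical broker configuration; assumes a local
# RabbitMQ (or Redis) server. Replace credentials/hosts with your own.
BROKER_URL = "amqp://guest:guest@localhost:5672//"  # RabbitMQ
# BROKER_URL = "redis://localhost:6379/0"           # or Redis
```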
Though, you can move some of this overhead out of the request/response cycle by launching a task to create the other tasks:
```python
from celery.task import TaskSet, task

from myapp.models import MyModel


@task
def process_object(pk):
    obj = MyModel.objects.get(pk=pk)
    # do something with obj


@task
def process_lots_of_items(ids_to_process):
    return TaskSet(process_object.subtask((id, ))
                       for id in ids_to_process).apply_async()
```
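To see why this helps without needing a broker, here is a toy, Celery-free stand-in for the same pattern (the queue, names, and worker loop here are illustrative only, not Celery API): the "view" enqueues a single dispatcher job and returns immediately, and the fan-out into per-object jobs happens later, on the worker.

```python
# Toy stand-in for the dispatcher pattern above -- not Celery API.
# The request only enqueues one job; the worker expands it into many.
queue = []
processed = []

def process_object(pk):
    processed.append(pk)

def process_lots_of_items(ids_to_process):
    # runs on the worker, so the fan-out cost is off the request path
    for pk in ids_to_process:
        queue.append((process_object, (pk,)))

def run_worker():
    while queue:
        fn, args = queue.pop(0)
        fn(*args)

# the "view": one cheap enqueue, then the response can be returned
queue.append((process_lots_of_items, ([1, 2, 3],)))
run_worker()
print(processed)  # [1, 2, 3]
```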
Also, since you probably don't have 150,000 processors to process all of these objects in parallel, you could split the objects into chunks of, say, 100 or 1000:
```python
from itertools import islice

from celery.task import TaskSet, task

from myapp.models import MyModel


def chunks(it, n):
    for first in it:
        yield [first] + list(islice(it, n - 1))


@task
def process_chunk(pks):
    objs = MyModel.objects.filter(pk__in=pks)
    for obj in objs:
        pass  # do something with obj


@task
def process_lots_of_items(ids_to_process):
    return TaskSet(process_chunk.subtask((chunk, ))
                       for chunk in chunks(iter(ids_to_process),
                                           1000)).apply_async()
```
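As a quick sanity check of the chunks() helper (plain Python, runnable as-is): it must be fed an iterator, not a list, because each islice() call continues consuming the same underlying iterator — which is why the answer passes iter(ids_to_process).

```python
from itertools import islice

def chunks(it, n):
    # each islice() picks up where the for-loop's last draw stopped,
    # so successive chunks never overlap -- provided `it` is an iterator
    for first in it:
        yield [first] + list(islice(it, n - 1))

result = list(chunks(iter(range(10)), 3))
print(result)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

If you passed a plain list instead of iter(...), each islice() call would restart from the head of the list and the chunks would overlap.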