芹菜:守护进程不允许有孩子 [英] celery: daemonic processes are not allowed to have children

查看:401
本文介绍了芹菜:守护进程不允许有孩子的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Python(2.7)中,我尝试在celery任务(celery 3.1.17)中创建进程(带有多处理),但它给出了错误:

In Python (2.7) I try to create processes (with multiprocessing) in a celery task (celery 3.1.17) but it gives the error:

daemonic processes are not allowed to have children

使用Google搜索,我发现台球的最新版本修复了错误,但我安装了最新版本(3.3.0.20),并且该错误仍在发生。我还尝试在我的celery任务中实现此替代方法,但它给出了相同的错误。

Googling it, I found that most recent versions of billiard fix the "bug" but I have the most recent version (3.3.0.20) and the error is still happening. I also tried to implement this workaround in my celery task but it gives the same error.

有人知道怎么做吗?
感谢您的帮助,
Patrick

Does anybody know how to do it? Any help is appreciated, Patrick

编辑:代码段

任务:

from __future__ import absolute_import
from celery import shared_task
from embedder.models import Embedder

@shared_task
def embedder_update_task(embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.test()

人工测试功能(从此处):

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = mp.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test(self):
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

我的 real 函数:

import mulitprocessing as mp

def test(self):
    self.init()
    for saveindex in range(self.start_index,self.start_index+self.nsaves):
        self.create_storage(saveindex)
        # process creation:
        procs = [mp.Process(name="Process-"+str(i),target=getattr(self,self.training_method),args=(saveindex,)) for i in range(self.nproc)]
        for p in procs: p.start()
        for p in procs: p.join()
    print "End of task"

init函数定义了一个多处理数组, d一个共享相同内存的对象,以便我所有的进程都可以同时更新同一数组:

The init function defines a multiprocessing array and an object that share the same memory so that all my processes can update this same array at the same time:

mp_arr = mp.Array(c.c_double, np.random.rand(1000000)) # example
self.V = numpy.frombuffer(mp_arr.get_obj()) #all the processes can update V

调用任务时生成错误:

[2015-06-04 09:47:46,659: INFO/MainProcess] Received task: embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]
[2015-06-04 09:47:47,674: WARNING/Worker-5] Creating 5 (non-daemon) workers and jobs in main process.
[2015-06-04 09:47:47,789: ERROR/MainProcess] Task embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]     raised unexpected: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
   R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
   return self.run(*args, **kwargs)
  File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/tasks.py", line 21, in embedder_update_task
    embedder.test()
  File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/models.py", line 475, in test
    pool = MyPool(5)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
self._repopulate_pool()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
    w.start()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children


推荐答案

台球多处理是不同的库-台球是Celery项目自己的 multiprocessing 分叉。您将需要导入台球并使用它来代替 multiprocessing

billiard and multiprocessing are different libraries - billiard is the Celery project's own fork of multiprocessing. You will need to import billiard and use it instead of multiprocessing

但是,更好的答案可能是您应该重构代码,以便产生更多的Celery任务,而不是使用两种不同的方式分配工作。

However the better answer is probably that you should refactor your code so that you spawn more Celery tasks instead of using two different ways of distributing your work.

您可以使用Celery 画布

You can do this using Celery canvas

from celery import group

@app.task
def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    return group(sleepawhile.s(randint(1, 5)) for x in range(num_procs)])

def test(self):
    my_group = group(work(randint(1, 5)) for x in range(5))
    result = my_group.apply_async()
    result.get()

我试图制作使用canvas primit的代码的有效版本ives而不是多重处理。但是,由于您的示例是虚构的,因此提出有意义的内容并不容易。

I've attempted to make a working version of your code that uses canvas primitives instead of multiprocessing. However since your example was quite artificial it's not easy to come up with something that makes sense.

更新:

以下是使用Celery画布的真实代码的翻译:

Here is a translation of your real code that uses Celery canvas:

tasks.py

@shared_task
run_training_method(saveindex, embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.training_method(saveindex)

models.py

from tasks import run_training_method
from celery import group

class Embedder(Model):

    def embedder_update_task(self):
        my_group = []

        for saveindex in range(self.start_index, self.start_index + self.nsaves):
            self.create_storage(saveindex)
            # Add to list
            my_group.extend([run_training_method.subtask((saveindex, self.id)) 
                         for i in range(self.nproc)])

        result = group(my_group).apply_async()

这篇关于芹菜:守护进程不允许有孩子的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆