对使用 concurrent.futures.ProcessPoolExecutor 动态创建的函数的限制 [英] Restrictions on dynamically created functions with concurrent.futures.ProcessPoolExecutor
问题描述
我正在尝试使用我在其他函数中动态创建的函数进行一些多处理.如果提供给 ProcessPoolExecutor 的函数是模块级的,我似乎可以运行这些:
I am trying to do some multiprocessing with functions that I dynamically create within other functions. It seems I can run these if the function fed to ProcessPoolExecutor is module-level:
def make_func(a):
def dynamic_func(i):
return i, i**2 + a
return dynamic_func
f_dyns = [make_func(a) for a in range(10)]
def loopfunc(i):
return f_dyns[i](i)
with concurrent.futures.ProcessPoolExecutor(3) as executor:
for i,r in executor.map(loopfunc, range(10)):
print(i,":",r)
输出:
0 : 0
1 : 2
2 : 6
3 : 12
4 : 20
5 : 30
6 : 42
7 : 56
8 : 72
9 : 90
但是,如果多处理是由类函数启动的,我就做不到:
However I can't do it if the multiprocessing is launched by a class function:
class Test:
def __init__(self,myfunc):
self.f = myfunc
def loopfunc(self,i):
return self.f(i)
def run(self):
with concurrent.futures.ProcessPoolExecutor(3) as executor:
for i,r in executor.map(self.loopfunc, range(10)):
print(i,":",r)
o2 = Test(make_func(1))
o2.run()
输出:
Traceback (most recent call last):
File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'make_func.<locals>.dynamic_func'
另一方面,如果我不使用动态生成的函数,我可以在类函数上运行多处理就好了.有没有办法解决这个问题?我尝试将动态生成的函数添加到全局"字典中,但这似乎没有帮助:
On the other hand, I can run the multiprocessing on a class function just fine if I don't use a dynamically generated function in there. Is there some way around this? I tried adding the dynamically generated function to the 'globals' dictionary but that didn't seem to help:
def make_func_glob(a):
def dynamic_func(i):
return i, i**2 + a
globals()['my_func_{0}'.format(a)] = dynamic_func
make_func_glob(1)
print("test:", my_func_1(3))
o3 = Test(my_func_1)
o3.run()
输出:
test: (3, 10)
Traceback (most recent call last):
File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'make_func_glob.<locals>.dynamic_func'
所以即使我将它添加到全局"字典中,python 仍然认为它是一个本地对象.像这种全球"的想法会很好,我不需要任何花哨的东西.为了方便起见,我只是动态地创建这些函数.如果它们成为全局对象,我会非常高兴.它们将始终由模块定义,只有一堆具有几乎相同的定义,因此以编程方式定义它们比手动将它们全部写出来更方便.所以我认为有可能以某种方式让python将它们识别为真正的"函数,就像我通过'exec'定义它们一样.或者至少足够接近我可以在我的并行代码中使用它们.
So python still thinks it is a local object even though I added it to the 'globals' dict. Something like this 'globals' idea would be fine, I don't need anything fancy. I am only dynamically creating these functions for the sake of convenience. I would be perfectly happy for them to be global objects. They will always be defined by the module, there are just a bunch of them with almost the same definition so it is more convenient to define them programmatically rather than writing them all out manually. So I would have thought it was possible to somehow get python to recognise them as "true" functions, like if I defined them via 'exec'. Or at least close enough that I could use them in my parallelised code.
推荐答案
正如错误消息所暗示的那样,它更多地与酸洗而不是动态生成的函数有关.来自 https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
As the error messages suggest, it's more to do with the pickling rather than dynamically generated functions. From https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
只能执行和返回picklable对象.
only picklable objects can be executed and returned.
来自https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled,可以腌制的函数种类:
and from https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled, the sorts of functions which can be pickled:
在模块顶层定义的函数(使用 def,而不是 lambda)
functions defined at the top level of a module (using def, not lambda)
这表明其他类型的函数不能被腌制.从问题中的代码中跳出一个不符合此要求的函数: dynamic_func
from...
which suggests other sorts of functions can't be pickled. From the code in the question a function that doesn't adhere to this leaps out: dynamic_func
from...
def make_func(a):
def dynamic_func(i):
return i, i**2 + a
return dynamic_func
...你暗示这是问题....
...and you hint that this is the issue....
所以我认为有可能以某种方式让 python 将它们识别为真正的"函数
So I would have thought it was possible to somehow get python to recognise them as "true" functions
你可以!您可以将 dynamic_func
放在顶层,并使用 partial
而不是闭包...
You can! You can put dynamic_func
on the top level, and use partial
rather than a closure...
from functools import partial
def dynamic_func(a, i):
return i, i**2 + a
def make_func(a):
return partial(dynamic_func, a)
那么完整...
import concurrent.futures
from functools import partial
def dynamic_func(a, i):
return i, i**2 + a
def make_func(a):
return partial(dynamic_func, a)
f_dyns = [make_func(a) for a in range(10)]
def loopfunc(i):
return f_dyns[i](i)
class Test:
def __init__(self, myfunc):
self.f = myfunc
def loopfunc(self, i):
return self.f(i)
def run(self):
with concurrent.futures.ProcessPoolExecutor(3) as executor:
for i,r in executor.map(self.loopfunc, range(10)):
print(i,":",r)
o2 = Test(make_func(1))
o2.run()
但是...为什么没有类的原始表单有效,我不知道.根据我的理解,它会试图腌制一个非顶级函数,所以我认为我的理解是有缺陷的.
But... why the original form without classes worked, I don't know. By my understanding, it would be trying to pickle a non-top level function, and so I think my understanding is flawed.
这篇关于对使用 concurrent.futures.ProcessPoolExecutor 动态创建的函数的限制的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!